add protocol transport
This commit is contained in:
parent
611e80f73c
commit
e3881c7df2
@ -76,7 +76,7 @@ func (d *Document) Apply() error {
|
|||||||
}
|
}
|
||||||
slog.Info("Document.Apply()", "declarations", d)
|
slog.Info("Document.Apply()", "declarations", d)
|
||||||
for i := range d.ResourceDecls {
|
for i := range d.ResourceDecls {
|
||||||
if e := d.ResourceDecls[i].Resource().Apply(); e != nil {
|
if e := d.ResourceDecls[i].Apply(); e != nil {
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -114,6 +114,7 @@ func (f *File) StateMachine() machine.Stater {
|
|||||||
|
|
||||||
func (f *File) Notify(m *machine.EventMessage) {
|
func (f *File) Notify(m *machine.EventMessage) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
slog.Info("Notify()", "file", f, "m", m)
|
||||||
switch m.On {
|
switch m.On {
|
||||||
case machine.ENTERSTATEEVENT:
|
case machine.ENTERSTATEEVENT:
|
||||||
switch m.Dest {
|
switch m.Dest {
|
||||||
@ -276,6 +277,7 @@ func (f *File) Create(ctx context.Context) error {
|
|||||||
if chownErr := os.Chown(f.Path, uid, gid); chownErr != nil {
|
if chownErr := os.Chown(f.Path, uid, gid); chownErr != nil {
|
||||||
return chownErr
|
return chownErr
|
||||||
}
|
}
|
||||||
|
f.State = "present"
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,8 +10,9 @@ _ "gopkg.in/yaml.v3"
|
|||||||
"net/url"
|
"net/url"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"decl/internal/resource"
|
"decl/internal/resource"
|
||||||
|
"decl/internal/transport"
|
||||||
"regexp"
|
"regexp"
|
||||||
"os"
|
_ "os"
|
||||||
"io"
|
"io"
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"errors"
|
"errors"
|
||||||
@ -20,6 +21,7 @@ _ "gopkg.in/yaml.v3"
|
|||||||
|
|
||||||
type DeclFile struct {
|
type DeclFile struct {
|
||||||
Path string `yaml:"path" json:"path"`
|
Path string `yaml:"path" json:"path"`
|
||||||
|
transport *transport.Reader `yaml:"-" json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDeclFile() *DeclFile {
|
func NewDeclFile() *DeclFile {
|
||||||
@ -30,6 +32,7 @@ func init() {
|
|||||||
SourceTypes.Register([]string{"decl"}, func(u *url.URL) DocSource {
|
SourceTypes.Register([]string{"decl"}, func(u *url.URL) DocSource {
|
||||||
t := NewDeclFile()
|
t := NewDeclFile()
|
||||||
t.Path,_ = filepath.Abs(filepath.Join(u.Hostname(), u.RequestURI()))
|
t.Path,_ = filepath.Abs(filepath.Join(u.Hostname(), u.RequestURI()))
|
||||||
|
t.transport,_ = transport.NewReader(u)
|
||||||
return t
|
return t
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -41,6 +44,7 @@ func init() {
|
|||||||
} else {
|
} else {
|
||||||
t.Path = filepath.Join(u.Hostname(), u.Path)
|
t.Path = filepath.Join(u.Hostname(), u.Path)
|
||||||
}
|
}
|
||||||
|
t.transport,_ = transport.NewReader(u)
|
||||||
return t
|
return t
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -51,26 +55,24 @@ func (d *DeclFile) Type() string { return "decl" }
|
|||||||
|
|
||||||
func (d *DeclFile) ExtractResources(filter ResourceSelector) ([]*resource.Document, error) {
|
func (d *DeclFile) ExtractResources(filter ResourceSelector) ([]*resource.Document, error) {
|
||||||
documents := make([]*resource.Document, 0, 100)
|
documents := make([]*resource.Document, 0, 100)
|
||||||
//documents = append(documents, resource.NewDocument())
|
|
||||||
|
|
||||||
GzipFileName := regexp.MustCompile(`^.*\.gz$`)
|
GzipFileName := regexp.MustCompile(`^.*\.gz$`)
|
||||||
|
|
||||||
file, fileErr := os.Open(d.Path)
|
defer d.transport.Close()
|
||||||
if fileErr != nil {
|
|
||||||
return documents, fileErr
|
|
||||||
}
|
|
||||||
var fileReader io.Reader
|
var fileReader io.Reader
|
||||||
|
|
||||||
if GzipFileName.FindString(d.Path) == d.Path {
|
if GzipFileName.FindString(d.Path) == d.Path {
|
||||||
slog.Info("decompressing gzip", "path", d.Path)
|
slog.Info("decompressing gzip", "path", d.Path)
|
||||||
zr, err := gzip.NewReader(file)
|
zr, err := gzip.NewReader(d.transport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return documents, err
|
return documents, err
|
||||||
}
|
}
|
||||||
fileReader = zr
|
fileReader = zr
|
||||||
} else {
|
} else {
|
||||||
fileReader = file
|
fileReader = d.transport
|
||||||
}
|
}
|
||||||
|
|
||||||
decoder := resource.NewYAMLDecoder(fileReader)
|
decoder := resource.NewYAMLDecoder(fileReader)
|
||||||
slog.Info("ExtractResources()", "documents", documents)
|
slog.Info("ExtractResources()", "documents", documents)
|
||||||
index := 0
|
index := 0
|
||||||
|
@ -5,14 +5,15 @@ package source
|
|||||||
import (
|
import (
|
||||||
_ "context"
|
_ "context"
|
||||||
_ "encoding/json"
|
_ "encoding/json"
|
||||||
"fmt"
|
_ "fmt"
|
||||||
_ "gopkg.in/yaml.v3"
|
_ "gopkg.in/yaml.v3"
|
||||||
"net/url"
|
"net/url"
|
||||||
"net/http"
|
_ "net/http"
|
||||||
_ "path/filepath"
|
_ "path/filepath"
|
||||||
"decl/internal/resource"
|
"decl/internal/resource"
|
||||||
"decl/internal/iofilter"
|
"decl/internal/iofilter"
|
||||||
"decl/internal/signature"
|
"decl/internal/signature"
|
||||||
|
"decl/internal/transport"
|
||||||
_ "os"
|
_ "os"
|
||||||
"io"
|
"io"
|
||||||
"errors"
|
"errors"
|
||||||
@ -21,6 +22,7 @@ _ "os"
|
|||||||
|
|
||||||
type HTTP struct {
|
type HTTP struct {
|
||||||
Endpoint string `yaml:"endpoint" json:"endpoint"`
|
Endpoint string `yaml:"endpoint" json:"endpoint"`
|
||||||
|
transport *transport.Reader `yaml:"-" json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHTTP() *HTTP {
|
func NewHTTP() *HTTP {
|
||||||
@ -31,6 +33,7 @@ func init() {
|
|||||||
SourceTypes.Register([]string{"http","https"}, func(u *url.URL) DocSource {
|
SourceTypes.Register([]string{"http","https"}, func(u *url.URL) DocSource {
|
||||||
t := NewHTTP()
|
t := NewHTTP()
|
||||||
t.Endpoint = u.String()
|
t.Endpoint = u.String()
|
||||||
|
t.transport,_ = transport.NewReader(u)
|
||||||
return t
|
return t
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -40,25 +43,11 @@ func (d *HTTP) Type() string { return "http" }
|
|||||||
func (h *HTTP) ExtractResources(filter ResourceSelector) ([]*resource.Document, error) {
|
func (h *HTTP) ExtractResources(filter ResourceSelector) ([]*resource.Document, error) {
|
||||||
documents := make([]*resource.Document, 0, 100)
|
documents := make([]*resource.Document, 0, 100)
|
||||||
|
|
||||||
resp, err := http.Get(h.Endpoint)
|
defer h.transport.Close()
|
||||||
if err != nil {
|
documentSignature := h.transport.Signature()
|
||||||
return documents, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
documentSignature := resp.Header.Get("Signature")
|
|
||||||
if documentSignature == "" {
|
|
||||||
signatureResp, signatureErr := http.Get(fmt.Sprintf("%s.sig", h.Endpoint))
|
|
||||||
if signatureErr == nil {
|
|
||||||
defer signatureResp.Body.Close()
|
|
||||||
readSignatureBody, readSignatureErr := io.ReadAll(resp.Body)
|
|
||||||
if readSignatureErr == nil {
|
|
||||||
documentSignature = string(readSignatureBody)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
hash := sha256.New()
|
hash := sha256.New()
|
||||||
sumReadData := iofilter.NewReader(resp.Body, func(p []byte, readn int, readerr error) (n int, err error) {
|
sumReadData := iofilter.NewReader(h.transport, func(p []byte, readn int, readerr error) (n int, err error) {
|
||||||
hash.Write(p)
|
hash.Write(p)
|
||||||
return
|
return
|
||||||
})
|
})
|
||||||
|
@ -10,15 +10,17 @@ _ "gopkg.in/yaml.v3"
|
|||||||
"net/url"
|
"net/url"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"decl/internal/resource"
|
"decl/internal/resource"
|
||||||
|
"decl/internal/transport"
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"archive/tar"
|
"archive/tar"
|
||||||
"regexp"
|
"regexp"
|
||||||
"os"
|
_ "os"
|
||||||
"io"
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Tar struct {
|
type Tar struct {
|
||||||
Path string `yaml:"path" json:"path"`
|
Path string `yaml:"path" json:"path"`
|
||||||
|
transport *transport.Reader `yaml:"-" json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTar() *Tar {
|
func NewTar() *Tar {
|
||||||
@ -29,6 +31,7 @@ func init() {
|
|||||||
SourceTypes.Register([]string{"tar"}, func(u *url.URL) DocSource {
|
SourceTypes.Register([]string{"tar"}, func(u *url.URL) DocSource {
|
||||||
t := NewTar()
|
t := NewTar()
|
||||||
t.Path,_ = filepath.Abs(filepath.Join(u.Hostname(), u.Path))
|
t.Path,_ = filepath.Abs(filepath.Join(u.Hostname(), u.Path))
|
||||||
|
t.transport,_ = transport.NewReader(u)
|
||||||
return t
|
return t
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -40,6 +43,7 @@ func init() {
|
|||||||
} else {
|
} else {
|
||||||
t.Path = filepath.Join(u.Hostname(), u.Path)
|
t.Path = filepath.Join(u.Hostname(), u.Path)
|
||||||
}
|
}
|
||||||
|
t.transport,_ = transport.NewReader(u)
|
||||||
return t
|
return t
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -56,14 +60,12 @@ func (t *Tar) ExtractResources(filter ResourceSelector) ([]*resource.Document, e
|
|||||||
TarGzipFileName := regexp.MustCompile(`^.*\.(tar\.gz|tgz)$`)
|
TarGzipFileName := regexp.MustCompile(`^.*\.(tar\.gz|tgz)$`)
|
||||||
TarFileName := regexp.MustCompile(`^.*\.tar$`)
|
TarFileName := regexp.MustCompile(`^.*\.tar$`)
|
||||||
|
|
||||||
file, fileErr := os.Open(t.Path)
|
defer t.transport.Close()
|
||||||
if fileErr != nil {
|
|
||||||
return documents, fileErr
|
|
||||||
}
|
|
||||||
var gzipReader io.Reader
|
var gzipReader io.Reader
|
||||||
switch t.Path {
|
switch t.Path {
|
||||||
case TarGzipFileName.FindString(t.Path):
|
case TarGzipFileName.FindString(t.Path):
|
||||||
zr, err := gzip.NewReader(file)
|
zr, err := gzip.NewReader(t.transport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return documents, err
|
return documents, err
|
||||||
}
|
}
|
||||||
@ -72,7 +74,7 @@ func (t *Tar) ExtractResources(filter ResourceSelector) ([]*resource.Document, e
|
|||||||
case TarFileName.FindString(t.Path):
|
case TarFileName.FindString(t.Path):
|
||||||
var fileReader io.Reader
|
var fileReader io.Reader
|
||||||
if gzipReader == nil {
|
if gzipReader == nil {
|
||||||
fileReader = file
|
fileReader = t.transport
|
||||||
} else {
|
} else {
|
||||||
fileReader = gzipReader
|
fileReader = gzipReader
|
||||||
}
|
}
|
||||||
|
72
internal/transport/transport.go
Normal file
72
internal/transport/transport.go
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||||
|
|
||||||
|
package transport
|
||||||
|
|
||||||
|
import (
|
||||||
|
_ "errors"
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
"net/http"
|
||||||
|
_ "strings"
|
||||||
|
"path/filepath"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Reader struct {
|
||||||
|
uri *url.URL
|
||||||
|
handle any
|
||||||
|
stream io.ReadCloser
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewReader(u *url.URL) (*Reader, error) {
|
||||||
|
var e error
|
||||||
|
r := &Reader{ uri: u }
|
||||||
|
switch u.Scheme {
|
||||||
|
case "http", "https":
|
||||||
|
resp, err := http.Get(u.String())
|
||||||
|
r.handle = resp
|
||||||
|
r.stream = resp.Body
|
||||||
|
e = err
|
||||||
|
case "file":
|
||||||
|
fallthrough
|
||||||
|
default:
|
||||||
|
path := filepath.Join(u.Hostname(), u.RequestURI())
|
||||||
|
file, err := os.Open(path)
|
||||||
|
r.handle = file
|
||||||
|
r.stream = file
|
||||||
|
e = err
|
||||||
|
}
|
||||||
|
return r, e
|
||||||
|
}
|
||||||
|
|
||||||
|
type Writer struct {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWriter(url *url.URL) (*Writer, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Reader) Read(b []byte) (int, error) {
|
||||||
|
return r.stream.Read(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Reader) Close() error {
|
||||||
|
return r.stream.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Reader) Signature() string {
|
||||||
|
documentSignature := r.handle.(*http.Response).Header.Get("Signature")
|
||||||
|
if documentSignature == "" {
|
||||||
|
signatureResp, signatureErr := http.Get(fmt.Sprintf("%s.sig", r.uri.String()))
|
||||||
|
if signatureErr == nil {
|
||||||
|
defer signatureResp.Body.Close()
|
||||||
|
readSignatureBody, readSignatureErr := io.ReadAll(signatureResp.Body)
|
||||||
|
if readSignatureErr == nil {
|
||||||
|
documentSignature = string(readSignatureBody)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return documentSignature
|
||||||
|
}
|
40
internal/transport/transport_test.go
Normal file
40
internal/transport/transport_test.go
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||||
|
|
||||||
|
package transport
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"net/url"
|
||||||
|
"testing"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"log"
|
||||||
|
)
|
||||||
|
|
||||||
|
var TempDir string
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
var err error
|
||||||
|
TempDir, err = os.MkdirTemp("", "testtransportfile")
|
||||||
|
if err != nil || TempDir == "" {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rc := m.Run()
|
||||||
|
|
||||||
|
os.RemoveAll(TempDir)
|
||||||
|
os.Exit(rc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewTransportReader(t *testing.T) {
|
||||||
|
path := fmt.Sprintf("%s/foo", TempDir)
|
||||||
|
u, e := url.Parse(fmt.Sprintf("file://%s", path))
|
||||||
|
assert.Nil(t, e)
|
||||||
|
|
||||||
|
writeErr := os.WriteFile(path, []byte("test"), 0644)
|
||||||
|
assert.Nil(t, writeErr)
|
||||||
|
|
||||||
|
reader, err := NewReader(u)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.NotNil(t, reader)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user