collect resource/doc errors and add to result. Add readwritecloser support for HTTP transport reading response data
All checks were successful
Lint / golangci-lint (push) Successful in 10m38s
Declarative Tests / test (push) Successful in 38s

This commit is contained in:
Matthew Rich 2024-10-02 20:26:02 +00:00
parent 0da6c3db75
commit eaaf0f8931
27 changed files with 1119 additions and 274 deletions

View File

@ -239,3 +239,28 @@ resources:
assert.NotEqual(t, "", string(yaml))
assert.Greater(t, len(yaml), 0)
}
func TestFailedResources(t *testing.T) {
if _, e := os.Stat("./jx"); errors.Is(e, os.ErrNotExist) {
t.Skip("cli not built")
}
os.Unsetenv("JX_DEBUG")
resources := `
resources:
- type: package
transition: create
attributes:
name: foobarbaz
`
assert.Nil(t, TempDir.CreateFile("err.jx.yaml", resources))
yaml, cliErr := exec.Command("./jx", "apply", TempDir.FilePath("err.jx.yaml")).Output()
if cliErr != nil {
slog.Info("Debug CLI error", "error", cliErr, "stderr", cliErr.(*exec.ExitError).Stderr)
}
assert.NotNil(t, cliErr)
assert.NotEqual(t, "", string(yaml))
assert.Contains(t, string(cliErr.(*exec.ExitError).Stderr), "Document errors: 1")
}

2
go.mod
View File

@ -19,6 +19,8 @@ require google.golang.org/protobuf v1.33.0
require (
gitea.rosskeen.house/pylon/luaruntime v0.0.0-20240924031921-4d00743b53e1 // indirect
github.com/Microsoft/go-winio v0.4.14 // indirect
github.com/ProtonMail/go-crypto v1.0.0 // indirect
github.com/cloudflare/circl v1.3.3 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/distribution/reference v0.5.0 // indirect

36
go.sum
View File

@ -8,8 +8,13 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU=
github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
github.com/ProtonMail/go-crypto v1.0.0 h1:LRuvITjQWX+WIfr930YHG2HNfjR1uOfyf5vE0kC2U78=
github.com/ProtonMail/go-crypto v1.0.0/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0=
github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cloudflare/circl v1.3.3 h1:fE/Qz0QdIGqeWfnwq0RE0R7MI51s0M2E4Ga9kq5AEMs=
github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -94,6 +99,7 @@ github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k=
@ -113,30 +119,58 @@ go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7e
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
@ -145,6 +179,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@ -20,6 +20,7 @@ _ "decl/internal/resource"
var (
ErrFailedResources error = errors.New("Failed Resources")
ErrFailedDocuments error = errors.New("Document errors")
)
type App struct {
@ -163,6 +164,7 @@ func (a *App) Import(docs []string) (err error) {
}
func (a *App) Apply(ctx context.Context, deleteResources bool) (err error) {
var errorsCount int = 0
for _, d := range a.Documents {
d.SetConfig(a.Config)
@ -176,6 +178,8 @@ func (a *App) Apply(ctx context.Context, deleteResources bool) (err error) {
if ! d.CheckConstraints() {
slog.Info("Client.Apply() document constraints failed", "requires", d)
d.AddError(fmt.Errorf("%w: %s", folio.ErrConstraintFailure, d.GetURI()))
errorsCount++
continue
}
@ -185,9 +189,14 @@ func (a *App) Apply(ctx context.Context, deleteResources bool) (err error) {
return e
}
if d.Failures() > 0 {
err = fmt.Errorf("%w: %d, %w", ErrFailedResources, d.Failures(), err)
d.AddError(fmt.Errorf("%w: %d, %w", ErrFailedResources, d.Failures(), err))
errorsCount++
}
}
if errorsCount > 0 {
return fmt.Errorf("%w: %d", ErrFailedDocuments, errorsCount)
}
return
}
@ -221,6 +230,7 @@ func (a *App) ImportCmd(ctx context.Context, docs []string, resourceURI string,
}
func (a *App) ApplyCmd(ctx context.Context, docs []string, quiet bool, deleteResources bool) (err error) {
var failedResources error
if err = a.Import(docs); err != nil {
return
}
@ -229,10 +239,10 @@ func (a *App) ApplyCmd(ctx context.Context, docs []string, quiet bool, deleteRes
return
}
if err = a.Apply(ctx, deleteResources); err != nil {
slog.Info("Client.ApplyCmd()", "client", a, "error", err)
if ! errors.Is(err, ErrFailedResources) {
return
if failedResources = a.Apply(ctx, deleteResources); failedResources != nil {
slog.Info("Client.ApplyCmd()", "client", a, "error", failedResources)
if ! errors.Is(failedResources, ErrFailedResources) && ! errors.Is(failedResources, ErrFailedDocuments) {
return failedResources
}
}
@ -240,10 +250,16 @@ func (a *App) ApplyCmd(ctx context.Context, docs []string, quiet bool, deleteRes
err = a.Quiet()
} else {
err = a.Emit()
}
if failedResources != nil {
if err != nil {
return
return fmt.Errorf("%w %w", failedResources, err)
} else {
return failedResources
}
}
return
}

View File

@ -13,7 +13,7 @@
"type": {
"type": "string",
"description": "Config type name.",
"enum": [ "generic", "exec", "certificate" ]
"enum": [ "system", "generic", "exec", "certificate" ]
},
"values": {
"oneOf": [

View File

@ -73,4 +73,5 @@ type Document interface {
Diff(with Document, output io.Writer) (returnOutput string, diffErr error)
DiffState(output io.Writer) (returnOutput string, diffErr error)
Clone() Document
AddError(error)
}

View File

@ -57,6 +57,17 @@ func NewDeclarationFromDocument(document *Document) *Declaration {
return &Declaration{ document: document, ResourceTypes: document.Types() }
}
func (n *ConfigName) Exists() bool {
return DocumentRegistry.ConfigNameMap.Has(string(*n))
}
func (n *ConfigName) GetBlock() *Block {
if v, ok := DocumentRegistry.ConfigNameMap.Get(string(*n)); ok {
return v
}
return nil
}
func (d *Declaration) SetDocument(newDocument *Document) {
slog.Info("Declaration.SetDocument()", "declaration", d)
d.document = newDocument
@ -176,7 +187,7 @@ func (d *Declaration) Apply(stateTransition string) (result error) {
}
stater := d.Attributes.StateMachine()
slog.Info("Declaration.Apply()", "machine", stater, "machine.state", stater.CurrentState(), "uri", d.Attributes.URI())
slog.Info("Declaration.Apply()", "stateTransition", stateTransition, "machine", stater, "machine.state", stater.CurrentState(), "uri", d.Attributes.URI())
switch stateTransition {
case "construct":
if doc, ok := DocumentRegistry.DeclarationMap[d]; ok {
@ -231,6 +242,10 @@ func (d *Declaration) SetConfig(configDoc data.Document) {
if v, ok := DocumentRegistry.ConfigNameMap.Get(string(d.Config)); ok {
d.configBlock = v
d.Attributes.UseConfig(d.configBlock)
return
}
if d.Config != "" { // XXX
panic("failed setting config")
}
}
@ -284,11 +299,13 @@ func (d *Declaration) UnmarshalValue(value *DeclarationType) error {
d.Error = value.Error
d.Requires = value.Requires
newResource, resourceErr := d.ResourceTypes.New(fmt.Sprintf("%s://", value.Type))
slog.Info("Declaration.UnmarshalValue", "value", value, "error", resourceErr, "type", value.Type, "resource", newResource, "resourcetypes", d.ResourceTypes)
if resourceErr != nil {
slog.Info("Declaration.UnmarshalValue", "value", value, "error", resourceErr)
return resourceErr
}
d.Attributes = newResource
d.configBlock = d.Config.GetBlock()
return nil
}

View File

@ -8,17 +8,6 @@ import (
// Dependencies describe a requirement on a given system property
// system properties:
// loaded from config
//
// deps assigned to decl
// match values in document configurations
// match values in all configurations?
//
// documents/facter.jx.yaml -> facts -> Get(key)
//
// ConfigMapper? -> DocumentRegistry
// requires:
// arch: amd64
// foo: bar

View File

@ -27,6 +27,7 @@ type DocumentType struct {
Format codec.Format `json:"format,omitempty" yaml:"format,omitempty"`
Requires Dependencies `json:"requires,omitempty" yaml:"requires,omitempty"`
Imports []URI `json:"imports,omitempty" yaml:"imports,omitempty"`
Errors []string `json:"error,omitempty" yaml:"error,omitempty"`
}
type Document struct {
@ -35,6 +36,7 @@ type Document struct {
Format codec.Format `json:"format,omitempty" yaml:"format,omitempty"`
Requires Dependencies `json:"requires,omitempty" yaml:"requires,omitempty"`
Imports []URI `json:"imports,omitempty" yaml:"imports,omitempty"`
Errors []string `json:"error,omitempty" yaml:"error,omitempty"`
uris mapper.Store[string, data.Declaration]
ResourceDeclarations []*Declaration `json:"resources,omitempty" yaml:"resources,omitempty"`
configNames mapper.Store[string, data.Block] `json:"-" yaml:"-"`
@ -269,10 +271,9 @@ func (d *Document) Apply(state string) error {
idx := i - start
if idx < 0 { idx = - idx }
slog.Info("Document.Apply() applying resource", "index", idx, "uri", d.ResourceDeclarations[idx].Resource().URI(), "resource", d.ResourceDeclarations[idx].Resource())
d.ResourceDeclarations[idx].SetConfig(d.config)
slog.Info("Document.Apply() applying resource", "index", idx, "uri", d.ResourceDeclarations[idx].Resource().URI(), "resource", d.ResourceDeclarations[idx].Resource())
slog.Info("Document.Apply() applying resource", "index", idx, "uri", d.ResourceDeclarations[idx].Resource().URI(), "state", state, "resource", d.ResourceDeclarations[idx].Resource())
if d.ResourceDeclarations[idx].Requires.Check() {
if e := d.ResourceDeclarations[idx].Apply(state); e != nil {
@ -572,3 +573,6 @@ func (d *Document) UnmarshalJSON(data []byte) error {
return d.loadImports()
}
func (d *Document) AddError(e error) {
d.Errors = append(d.Errors, e.Error())
}

View File

@ -92,8 +92,10 @@ func init() {
ResourceTypes.Register([]string{"container"}, func(u *url.URL) data.Resource {
c := NewContainer(nil)
c.Name = filepath.Join(u.Hostname(), u.Path)
c.Common.SetParsedURI(u)
return c
if err := c.Common.SetParsedURI(u); err == nil {
return c
}
return nil
})
}
@ -425,16 +427,23 @@ func (c *Container) URI() string {
func (c *Container) Type() string { return "container" }
func (c *Container) ResolveId(ctx context.Context) string {
c.Common.SetURI(c.URI())
var err error
if err = c.Common.SetURI(c.URI()); err != nil {
triggerErr := c.StateMachine().Trigger("notexists")
panic(fmt.Errorf("%w: %s %s, %w", err, c.Type(), c.Name, triggerErr))
}
filterArgs := filters.NewArgs()
filterArgs.Add("name", "/"+c.Name)
containers, err := c.apiClient.ContainerList(ctx, container.ListOptions{
containers, listErr := c.apiClient.ContainerList(ctx, container.ListOptions{
All: true,
Filters: filterArgs,
})
if err != nil {
if listErr != nil {
triggerErr := c.StateMachine().Trigger("notexists")
panic(fmt.Errorf("%w: %s %s, %w", err, c.Type(), c.Name, triggerErr))
panic(fmt.Errorf("%w: %s %s, %w", listErr, c.Type(), c.Name, triggerErr))
}
slog.Info("Container.ResolveId()", "containers", containers)

View File

@ -96,6 +96,8 @@ type ContainerImage struct {
Resources data.ResourceMapper `json:"-" yaml:"-"`
contextDocument data.Document `json:"-" yaml:"-"`
ConverterTypes data.TypesRegistry[data.Converter] `json:"-" yaml:"-"`
imageStat types.ImageInspect `json:"-" yaml:"-"`
}
func init() {
@ -126,19 +128,19 @@ func NewContainerImage(containerClientApi ContainerImageClient) *ContainerImage
}
func (c *ContainerImage) RegistryAuthConfig() (authConfig registry.AuthConfig, err error) {
if c.config != nil {
if c.Common.config != nil {
var configValue any
if configValue, err = c.config.GetValue("repo_username"); err != nil {
if configValue, err = c.Common.config.GetValue("repo_username"); err != nil {
return
} else {
authConfig.Username = configValue.(string)
}
if configValue, err = c.config.GetValue("repo_password"); err != nil {
if configValue, err = c.Common.config.GetValue("repo_password"); err != nil {
return
} else {
authConfig.Password = configValue.(string)
}
if configValue, err = c.config.GetValue("repo_server"); err != nil {
if configValue, err = c.Common.config.GetValue("repo_server"); err != nil {
return authConfig, nil
} else {
authConfig.ServerAddress = configValue.(string)
@ -279,6 +281,16 @@ func (c *ContainerImage) Notify(m *machine.EventMessage) {
switch m.On {
case machine.ENTERSTATEEVENT:
switch m.Dest {
case "start_stat":
if statErr := c.ReadStat(ctx); statErr == nil {
if triggerErr := c.StateMachine().Trigger("exists"); triggerErr == nil {
return
}
} else {
if triggerErr := c.StateMachine().Trigger("notexists"); triggerErr == nil {
return
}
}
case "start_read":
if _,readErr := c.Read(ctx); readErr == nil {
if triggerErr := c.stater.Trigger("state_read"); triggerErr == nil {
@ -720,6 +732,13 @@ func (c *ContainerImage) Inspect(ctx context.Context) (imageInspect types.ImageI
return
}
func (c *ContainerImage) ReadStat(ctx context.Context) (err error) {
if c.imageStat, _, err = c.apiClient.ImageInspectWithRaw(ctx, c.Name); err != nil || c.imageStat.ID == "" {
return fmt.Errorf("%w: %s %s", err, c.Type(), c.Name)
}
return
}
func (c *ContainerImage) Read(ctx context.Context) (resourceYaml []byte, err error) {
defer func() {
if r := recover(); r != nil {
@ -781,7 +800,6 @@ fmt.Printf("Untagged image: %s\n", img.Untagged)
func (c *ContainerImage) Type() string { return "container-image" }
func (c *ContainerImage) ResolveId(ctx context.Context) string {
slog.Info("ContainerImage.ResolveId()", "name", c.Name, "machine.state", c.StateMachine().CurrentState())
imageInspect, _, err := c.apiClient.ImageInspectWithRaw(ctx, c.Name)
if err != nil {
triggerResult := c.StateMachine().Trigger("notexists")

View File

@ -8,6 +8,7 @@ import (
"fmt"
"gopkg.in/yaml.v3"
"io"
"io/fs"
"net/url"
"net/http"
_ "os"
@ -20,10 +21,32 @@ _ "os"
"decl/internal/transport"
"decl/internal/folio"
"decl/internal/iofilter"
"decl/internal/ext"
"crypto/sha256"
"encoding/hex"
"time"
)
/* HTTP resource
Lifecycle
transitions:
Create
* Stat request
* if the resource is absent then execute a POST request to create it.
* if the resource is present then throw an error.
Read
* Stat request
* if the resource is present then execute a GET request to retrieve the content.
Update
* Stat request
* execute a PUT request to update the resource.
Delete
* Stat request
* DELETE request
*/
const (
HTTPTypeName TypeName = "http"
)
@ -70,12 +93,12 @@ type HTTP struct {
StatusCode int `yaml:"statuscode,omitempty" json:"statuscode,omitempty"`
Sha256 string `yaml:"sha256,omitempty" json:"sha256,omitempty"`
SerializeContent bool `json:"serializecontent,omitempty" yaml:"serializecontent,omitempty"`
LastModified time.Time `json:"lastmodified,omitempty" yaml:"lastmodified,omitempty"`
Size int64 `yaml:"size,omitempty" json:"size,omitempty"`
SignatureValue string `yaml:"signature,omitempty" json:"signature,omitempty"`
config data.ConfigurationValueGetter
Resources data.ResourceMapper `yaml:"-" json:"-"`
reader *transport.Reader `yaml:"-" json:"-"`
writer *transport.Writer `yaml:"-" json:"-"`
writer *transport.ReadWriter `yaml:"-" json:"-"`
}
func NewHTTP() *HTTP {
@ -96,16 +119,24 @@ func (h *HTTP) SetResourceMapper(resources data.ResourceMapper) {
func (h *HTTP) Open() (err error) {
u := h.Common.parsedURI
if u != nil {
if h.reader, err = transport.NewReader(u); err != nil {
return
}
if h.writer, err = transport.NewWriter(u); err != nil {
return
}
} else {
if u == nil {
err = fmt.Errorf("HTTP parsed URI is not set: %s", h.Endpoint)
}
/*
else {
err = h.OpenGetter()
}
*/
return
}
func (h *HTTP) OpenGetter() (err error) {
h.reader, err = transport.NewReader(h.Common.parsedURI)
return
}
func (h *HTTP) OpenPoster() (err error) {
h.writer, err = transport.NewReadWriter(h.Common.parsedURI)
return
}
@ -202,11 +233,78 @@ func (h *HTTP) SetParsedURI(u *url.URL) (err error) {
return
}
func (h *HTTP) ReadStat() (err error) {
err = h.Open()
func (h *HTTP) SetFileInfo(info fs.FileInfo) error {
if info != nil {
h.LastModified = info.ModTime()
h.Size = info.Size()
contentType := info.Sys().(*transport.HTTPFileInfo).ContentType
if contentType != "" {
contentTypeHeader := &HTTPHeader{ Name: "Content-Type", Value: contentType }
for _, h := range h.Headers {
if h.Name == "Content-Type" {
h.Value = contentType
contentTypeHeader = nil
}
}
if contentTypeHeader != nil {
h.Headers = append(h.Headers, *contentTypeHeader)
}
}
return nil
}
return ErrInvalidFileInfo
}
func (h *HTTP) ContentSourceRefStat() (info fs.FileInfo) {
if len(h.ContentSourceRef) > 0 {
rs, _ := h.ContentReaderStream()
info, _ = rs.Stat()
rs.Close()
}
return
}
func (h *HTTP) ReadStat() (err error) {
if h.reader == nil {
if err = h.OpenGetter(); err != nil {
return
}
}
var info fs.FileInfo
info, err = h.reader.Stat()
if err == nil {
_ = h.SetFileInfo(info)
} else {
if refStat := h.ContentSourceRefStat(); refStat != nil {
_ = h.SetFileInfo(refStat)
err = nil
}
}
if err != nil {
h.State = "absent"
return
}
return
}
func (h *HTTP) ContentReaderStream() (*transport.Reader, error) {
if len(h.Content) == 0 && len(h.ContentSourceRef) != 0 {
return h.ContentSourceRef.Lookup(nil).ContentReaderStream()
}
return nil, fmt.Errorf("Cannot provide transport reader for string content")
}
func (h *HTTP) ContentWriterStream() (*transport.Writer, error) {
if len(h.Content) == 0 && len(h.ContentSourceRef) != 0 {
return h.ContentSourceRef.Lookup(nil).ContentWriterStream()
}
return nil, fmt.Errorf("Cannot provide transport writer for string content")
}
func (h *HTTP) JSON() ([]byte, error) {
return json.Marshal(h)
}
@ -283,25 +381,36 @@ func (h *HTTP) HashHexString() string {
}
func (h *HTTP) Create(ctx context.Context) (err error) {
slog.Error("HTTP.Create()", "http", h)
slog.Info("HTTP.Create()", "http", h)
if err = h.OpenPoster(); err != nil {
return
}
var contentReader io.ReadCloser
h.writer, err = transport.NewWriterWithContext(h.Common.parsedURI, ctx)
h.writer, err = transport.NewReadWriterWithContext(h.Common.parsedURI, ctx)
if err != nil {
slog.Error("HTTP.Create()", "http", h, "error", err)
//panic(err)
return
}
slog.Error("HTTP.Create() content", "http", h)
slog.Info("HTTP.Create() content", "http", h)
// create should post to the named resource using the resource content or sourceref.
contentReader, err = h.contentSourceReader()
if err != nil {
slog.Error("HTTP.Create()", "error", err)
return
}
contentReader = h.UpdateContentAttributesFromReader(contentReader)
srcRead := ext.NewReadCloser(contentReader)
if _, ok := srcRead.(io.WriterTo); ok {
panic("reader has writerto interface")
}
if _, ok := any(h.writer).(io.ReaderFrom); !ok {
panic("writer is missing io.ReaderFrom")
}
if err != nil {
return
}
@ -325,12 +434,13 @@ func (h *HTTP) Create(ctx context.Context) (err error) {
slog.Error("HTTP.Create()", "http", h, "reader", h.reader)
copyBuffer := make([]byte, 32 * 1024)
_, writeErr := io.CopyBuffer(h.writer, contentReader, copyBuffer)
_, writeErr := io.CopyBuffer(h.writer, srcRead, copyBuffer)
if writeErr != nil {
return fmt.Errorf("Http.Create(): CopyBuffer failed %v %v: %w", h.writer, contentReader, writeErr)
}
h.Status = h.writer.Status()
h.StatusCode = h.writer.StatusCode()
return
}
@ -339,8 +449,8 @@ func (h *HTTP) Update(ctx context.Context) error {
}
func (h *HTTP) ReadAuthorizationTokenFromConfig(req *http.Request) error {
if h.config != nil {
token, tokenErr := h.config.GetValue("authorization_token")
if h.Common.config != nil {
token, tokenErr := h.Common.config.GetValue("authorization_token")
if tokenErr == nil {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token))
}
@ -351,8 +461,9 @@ func (h *HTTP) ReadAuthorizationTokenFromConfig(req *http.Request) error {
}
func (h *HTTP) AddAuthorizationTokenFromConfigToTransport() (err error) {
if h.config != nil {
token, tokenErr := h.config.GetValue("authorization_token")
if h.Common.config != nil {
token, tokenErr := h.Common.config.GetValue("authorization_token")
slog.Info("HTTP.AddAuthorizationTokenFromConfigToTransport()", "error", tokenErr)
if tokenErr == nil {
if h.reader != nil {
h.reader.AddHeader("Authorization", fmt.Sprintf("Bearer %s", token))
@ -444,7 +555,7 @@ func (h *HTTP) contentSourceReader() (contentReader io.ReadCloser, err error) {
contentReader.(*transport.Reader).SetGzip(false)
} else {
if len(h.Content) != 0 {
contentReader = io.NopCloser(strings.NewReader(h.Content))
contentReader = io.NopCloser(ext.NewStringReader(h.Content))
} else {
err = ErrUndefinedContentSource
}
@ -480,7 +591,7 @@ func (h *HTTP) Read(ctx context.Context) (yamlData []byte, err error) {
return
}
if len(h.Headers) > 0 {
if len(h.Headers) > 0 {
for _, header := range h.Headers {
h.reader.AddHeader(header.Name, header.Value)
}
@ -490,9 +601,11 @@ func (h *HTTP) Read(ctx context.Context) (yamlData []byte, err error) {
if err = h.SetContent(contentReader); err != nil {
return
}
h.Status = h.reader.Status()
h.StatusCode = h.reader.StatusCode()
if h.writer == nil {
h.Status = h.reader.Status()
h.StatusCode = h.reader.StatusCode()
}
return yaml.Marshal(h)
}

View File

@ -120,6 +120,16 @@ func (p *Package) Notify(m *machine.EventMessage) {
switch m.On {
case machine.ENTERSTATEEVENT:
switch m.Dest {
case "start_stat":
if statErr := p.ReadStat(ctx); statErr == nil {
if triggerErr := p.StateMachine().Trigger("exists"); triggerErr == nil {
return
}
} else {
if triggerErr := p.StateMachine().Trigger("notexists"); triggerErr == nil {
return
}
}
case "start_read":
if _,readErr := p.Read(ctx); readErr == nil {
if triggerErr := p.StateMachine().Trigger("state_read"); triggerErr == nil {
@ -170,6 +180,13 @@ func (p *Package) Notify(m *machine.EventMessage) {
}
}
func (p *Package) ReadStat(ctx context.Context) (err error) {
if p.ReadCommand.Exists() {
_, err = p.ReadCommand.Execute(p)
}
return
}
func (p *Package) URI() string {
return fmt.Sprintf("package://%s?version=%s&type=%s", p.Name, url.QueryEscape(p.Version), p.PackageType)
}
@ -220,43 +237,12 @@ func (p *Package) Validate() error {
func (p *Package) ResolveId(ctx context.Context) string {
slog.Info("Package.ResolveId()", "name", p.Name, "machine.state", p.StateMachine().CurrentState())
/*
imageInspect, _, err := p.apiClient.ImageInspectWithRaw(ctx, p.Name)
if err != nil {
triggerResult := p.StateMachine().Trigger("notexists")
slog.Info("ContainerImage.ResolveId()", "name", p.Name, "machine.state", p.StateMachine().CurrentState(), "resource.state", p.State, "trigger.error", triggerResult)
panic(fmt.Errorf("%w: %s %s", err, p.Type(), p.Name))
}
slog.Info("ContainerImage.ResolveId()", "name", c.Name, "machine.state", c.StateMachine().CurrentState(), "resource.state", c.State)
c.Id = imageInspect.ID
if c.Id != "" {
if triggerErr := c.StateMachine().Trigger("exists"); triggerErr != nil {
panic(fmt.Errorf("%w: %s %s", triggerErr, c.Type(), c.Name))
}
slog.Info("ContainerImage.ResolveId() trigger created", "machine", c.StateMachine(), "state", c.StateMachine().CurrentState())
} else {
if triggerErr := c.StateMachine().Trigger("notexists"); triggerErr != nil {
panic(fmt.Errorf("%w: %s %s", triggerErr, c.Type(), c.Name))
}
slog.Info("ContainerImage.ResolveId()", "name", c.Name, "machine.state", c.StateMachine().CurrentState(), "resource.state", c.State)
}
return c.Id
*/
if p.ReadCommand.Exists() {
if _, err := p.ReadCommand.Execute(p); err != nil {
if triggerResult := p.StateMachine().Trigger("notexists"); triggerResult != nil {
panic(fmt.Errorf("%w: %s %s", err, p.Type(), p.Name))
}
}
/*
exErr := p.ReadCommand.Extractor(out, p)
if exErr != nil {
return nil, exErr
}
return yaml.Marshal(p)
} else {
return nil, ErrUnsupportedPackageType
*/
}
return p.Name
}
@ -266,7 +252,7 @@ func (p *Package) Create(ctx context.Context) (err error) {
p.Version = ""
}
slog.Info("Package.Create()")
if source := p.SourceRef.Lookup(p.Resources); source != nil {
r, _ := source.ContentReaderStream()
if p.CreateCommand.StdinAvailable {
@ -284,6 +270,7 @@ func (p *Package) Create(ctx context.Context) (err error) {
}
if _, err = p.CreateCommand.Execute(p); err != nil {
slog.Info("Package.Create() ERROR", "error", err)
msg := err.Error()
lenMsg := len(msg) - 1
lenErr := len(ErrRpmPackageInstalled.Error())
@ -292,6 +279,7 @@ func (p *Package) Create(ctx context.Context) (err error) {
}
}
_, err = p.Read(ctx)
slog.Info("Package.Create()", "package", p, "readerr", err)
return
}

View File

@ -8,26 +8,14 @@ _ "errors"
_ "os"
"net/url"
"net/http"
"strings"
"fmt"
"context"
"path/filepath"
"log/slog"
"io/fs"
"decl/internal/identifier"
"strconv"
)
type Pipe struct {
Reader io.ReadCloser
Writer io.WriteCloser
}
type HTTPConnection struct {
stream *Pipe
request *http.Request
response *http.Response
Client *http.Client
}
type HTTP struct {
uri *url.URL
path string
@ -37,97 +25,17 @@ type HTTP struct {
ctx context.Context
get *HTTPConnection
post *HTTPConnection
Client *http.Client
}
func HTTPExists(u *url.URL) bool {
resp, err := http.Head(u.String())
if err == nil && resp.StatusCode == 200 {
return true
}
return false
}
func NewPipe() *Pipe {
r,w := io.Pipe()
return &Pipe{ Reader: r, Writer: w }
}
func NewHTTPConnection(client *http.Client) *HTTPConnection {
return &HTTPConnection {
Client: client,
}
}
func (h *HTTPConnection) NewPostRequest(ctx context.Context, uri string) (err error) {
h.stream = NewPipe()
h.request, err = http.NewRequestWithContext(ctx, "POST", uri, h.Reader())
return
}
func (h *HTTPConnection) NewGetRequest(ctx context.Context, uri string) (err error) {
h.request, err = http.NewRequestWithContext(ctx, "GET", uri, nil)
return
}
func (h *HTTPConnection) Request() *http.Request {
return h.request
}
func (h *HTTPConnection) Response() *http.Response {
return h.response
}
func (h *HTTPConnection) Writer() io.WriteCloser {
return h.stream.Writer
}
func (h *HTTPConnection) Reader() io.ReadCloser {
return h.stream.Reader
}
func (h *HTTPConnection) Do() (err error) {
slog.Info("transport.HTTPConnection.Do()", "connection", h, "request", h.request)
h.response, err = h.Client.Do(h.request)
return
}
func (h *HTTPConnection) Read(p []byte) (n int, err error) {
if h.response == nil {
if err = h.Do(); err != nil {
return
}
}
return h.response.Body.Read(p)
}
func (h *HTTPConnection) Write(p []byte) (n int, err error) {
if h.response == nil {
if err = h.Do(); err != nil {
return
}
}
slog.Info("transport.HTTPConnection.Write()", "data", p, "connection", h)
return h.Writer().Write(p)
}
func (h *HTTPConnection) ReadFrom(r io.Reader) (n int64, err error) {
h.request.Body = r.(io.ReadCloser)
if h.response == nil {
if err = h.Do(); err != nil {
return
}
}
return h.request.ContentLength, nil
}
func (h *HTTPConnection) Close() (err error) {
if h.response != nil {
defer h.response.Body.Close()
}
if h.stream != nil {
err = h.Writer().Close()
}
return
}
func NewHTTP(u *url.URL, ctx context.Context) (h *HTTP, err error) {
h = &HTTP {
ctx: ctx,
@ -142,13 +50,8 @@ func NewHTTP(u *url.URL, ctx context.Context) (h *HTTP, err error) {
}
func (h *HTTP) extension() {
elements := strings.Split(h.path, ".")
numberOfElements := len(elements)
if numberOfElements > 2 {
h.exttype = elements[numberOfElements - 2]
h.fileext = elements[numberOfElements - 1]
}
h.exttype = elements[numberOfElements - 1]
id := identifier.ID(h.path)
h.exttype, h.fileext = id.Extension()
}
func (h *HTTP) DetectGzip() {
@ -181,7 +84,25 @@ func (h *HTTP) Signature() (documentSignature string) {
}
func (h *HTTP) Stat() (info fs.FileInfo, err error) {
return
uri := h.uri.String()
var response *http.Response
if response, err = http.Head(uri); err != nil {
return
}
defer response.Body.Close()
fi := NewHTTPFileInfo(uri)
contentLength := response.Header.Get("Content-Length")
if contentLength != "" {
if fi.ContentLength, err = strconv.ParseInt(contentLength, 10, 64); err != nil {
return
}
}
if response.Header.Get("Last-Modified") != "" {
fi.LastModified, _ = http.ParseTime(response.Header.Get("Last-Modified"))
}
fi.ContentType = response.Header.Get("Content-Type")
return fi, err
}
func (h *HTTP) ContentType() (contenttype string) {
@ -201,44 +122,3 @@ func (h *HTTP) SetGzip(gzip bool) {
func (h *HTTP) Gzip() bool {
return h.gzip
}
func (h *HTTP) Reader() io.ReadCloser {
if h.get == nil {
h.get = NewHTTPConnection(h.Client)
if err := h.get.NewGetRequest(h.ctx, h.uri.String()); err != nil {
panic(err)
}
}
return h.get
}
func (h *HTTP) Writer() io.WriteCloser {
if h.post == nil {
h.post = NewHTTPConnection(h.Client)
if err := h.post.NewPostRequest(h.ctx, h.uri.String()); err != nil {
panic(err)
}
}
return h.post
}
func (h *HTTP) ReadWriter() io.ReadWriteCloser {
return nil
}
func (h *HTTP) GetRequest() *http.Request {
return h.get.Request()
}
func (h *HTTP) GetResponse() *http.Response {
return h.get.Response()
}
func (h *HTTP) PostRequest() *http.Request {
return h.post.Request()
}
func (h *HTTP) PostResponse() *http.Response {
return h.post.Response()
}

View File

@ -0,0 +1,75 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package transport
import (
_ "errors"
"io"
_ "os"
"net/url"
"net/http"
"context"
"path/filepath"
"fmt"
)
type HTTPReader struct {
*HTTP
get *HTTPConnection
}
func NewHTTPReader(u *url.URL, ctx context.Context) (h *HTTPReader, err error) {
h = &HTTPReader {
HTTP: &HTTP {
ctx: ctx,
uri: u,
path: filepath.Join(u.Hostname(), u.RequestURI()),
Client: http.DefaultClient,
},
get: NewHTTPConnection(http.DefaultClient),
}
h.extension()
h.DetectGzip()
err = h.get.NewGetRequest(h.ctx, h.uri.String())
return
}
func (h *HTTPReader) Signature() (documentSignature string) {
if h.get.Response() != nil {
documentSignature = h.get.Response().Header.Get("Signature")
if documentSignature == "" {
signatureResp, signatureErr := h.Client.Get(fmt.Sprintf("%s.sig", h.uri.String()))
if signatureErr == nil {
defer signatureResp.Body.Close()
readSignatureBody, readSignatureErr := io.ReadAll(signatureResp.Body)
if readSignatureErr == nil {
documentSignature = string(readSignatureBody)
}
}
}
}
return documentSignature
}
func (h *HTTPReader) ContentType() (contenttype string) {
contenttype = h.get.Response().Header.Get("Content-Type")
switch contenttype {
case "application/octet-stream":
return h.exttype
default:
}
return
}
func (h *HTTPReader) Reader() io.ReadCloser {
return h.get
}
func (h *HTTPReader) GetRequest() *http.Request {
return h.get.Request()
}
func (h *HTTPReader) GetResponse() *http.Response {
return h.get.Response()
}

View File

@ -0,0 +1,75 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package transport
import (
_ "errors"
"io"
_ "os"
"net/url"
"net/http"
"context"
"path/filepath"
)
// An HTTPReadWriter supports reading the HTTP response using the io.ReadWriteCloser interface.
type HTTPReadWriter struct {
*HTTP
post *HTTPConnection
}
func NewHTTPReadWriter(u *url.URL, ctx context.Context) (h *HTTPReadWriter, err error) {
h = &HTTPReadWriter {
HTTP: &HTTP {
ctx: ctx,
uri: u,
path: filepath.Join(u.Hostname(), u.RequestURI()),
},
post: NewHTTPConnection(http.DefaultClient),
}
h.extension()
h.DetectGzip()
err = h.post.NewPostRequest(h.ctx, h.uri.String())
return
}
/*
func (h *HTTP) Signature() (documentSignature string) {
if h.get.Response() != nil {
documentSignature = h.get.Response().Header.Get("Signature")
if documentSignature == "" {
signatureResp, signatureErr := h.Client.Get(fmt.Sprintf("%s.sig", h.uri.String()))
if signatureErr == nil {
defer signatureResp.Body.Close()
readSignatureBody, readSignatureErr := io.ReadAll(signatureResp.Body)
if readSignatureErr == nil {
documentSignature = string(readSignatureBody)
}
}
}
}
return documentSignature
}
func (h *HTTP) ContentType() (contenttype string) {
contenttype = h.get.Response().Header.Get("Content-Type")
switch contenttype {
case "application/octet-stream":
return h.exttype
default:
}
return
}
*/
func (h *HTTPReadWriter) ReadWriter() io.ReadWriteCloser {
return h.post
}
func (h *HTTPReadWriter) PostRequest() *http.Request {
return h.post.Request()
}
func (h *HTTPReadWriter) PostResponse() *http.Response {
return h.post.Response()
}

View File

@ -0,0 +1,91 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package transport
import (
"github.com/stretchr/testify/assert"
"testing"
"fmt"
_ "os"
"io"
"net/url"
_ "path/filepath"
"context"
"net/http"
"net/http/httptest"
"log/slog"
_ "strings"
"decl/internal/ext"
)
/*
func TestNewTransportHTTPReader(t *testing.T) {
//ctx := context.Background()
body := []byte(`
type: "user"
attributes:
name: "foo"
gecos: "foo user"
`)
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, req.URL.String(), "/resource/user")
n,e := rw.Write(body)
assert.Nil(t, e)
assert.Greater(t, n, 0)
assert.Equal(t, "bar", req.Header.Get("foo"))
}))
defer server.Close()
u, urlErr := url.Parse(fmt.Sprintf("%s/resource/user", server.URL))
assert.Nil(t, urlErr)
h, err := NewHTTPReader(u, context.Background())
assert.Nil(t, err)
assert.NotNil(t, h)
h.Reader()
h.GetRequest().Header.Add("foo", "bar")
resData, readErr := io.ReadAll(h.Reader())
assert.Nil(t, readErr)
assert.Greater(t, len(resData), 0)
assert.Equal(t, body, resData)
}
*/
func TestNewTransportHTTPReadWriter(t *testing.T) {
body := []byte(`
type: "user"
attributes:
name: "foo"
gecos: "foo user"
`)
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, req.URL.String(), "/resource/user")
postBody, readErr := io.ReadAll(req.Body)
assert.Nil(t, readErr)
assert.Greater(t, len(postBody), 0)
assert.Equal(t, "bar", req.Header.Get("foo"))
assert.Equal(t, body, postBody)
}))
defer server.Close()
u, urlErr := url.Parse(fmt.Sprintf("%s/resource/user", server.URL))
assert.Nil(t, urlErr)
h, err := NewHTTPReadWriter(u, context.Background())
assert.Nil(t, err)
assert.NotNil(t, h)
rw := h.ReadWriter()
h.PostRequest().Header.Add("foo", "bar")
slog.Info("TestNewTransportHTTPReadWriter()", "http", h, "rw", rw)
_, ok := rw.(io.ReaderFrom)
assert.True(t, ok)
bodyReader := ext.NewStringReader(string(body))
copyBuffer := make([]byte, 32 * 1024)
_, writeErr := io.CopyBuffer(rw, io.NopCloser(bodyReader), copyBuffer)
assert.Nil(t, writeErr)
}

View File

@ -13,6 +13,7 @@ _ "path/filepath"
"context"
"net/http"
"net/http/httptest"
"decl/internal/identifier"
)
func TestNewTransportHTTPReader(t *testing.T) {
@ -35,7 +36,7 @@ attributes:
u, urlErr := url.Parse(fmt.Sprintf("%s/resource/user", server.URL))
assert.Nil(t, urlErr)
h, err := NewHTTP(u, context.Background())
h, err := NewHTTPReader(u, context.Background())
assert.Nil(t, err)
assert.NotNil(t, h)
h.Reader()
@ -66,7 +67,7 @@ attributes:
u, urlErr := url.Parse(fmt.Sprintf("%s/resource/user", server.URL))
assert.Nil(t, urlErr)
h, err := NewHTTP(u, context.Background())
h, err := NewHTTPWriter(u, context.Background())
assert.Nil(t, err)
assert.NotNil(t, h)
h.Writer()
@ -76,31 +77,23 @@ attributes:
// assert.Nil(t, writeErr)
}
func TestNewHTTPConnection(t *testing.T) {
ctx := context.Background()
h := NewHTTPConnection(http.DefaultClient)
assert.NotNil(t, h)
body := []byte(`
type: "user"
attributes:
name: "foo"
gecos: "foo user"
`)
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, req.URL.String(), "/resource/user")
n,e := rw.Write(body)
assert.Nil(t, e)
assert.Greater(t, n, 0)
assert.Equal(t, "bar", req.Header.Get("foo"))
}))
defer server.Close()
uri := fmt.Sprintf("%s/resource/user", server.URL)
assert.Nil(t, h.NewGetRequest(ctx, uri))
h.Request().Header.Add("foo", "bar")
responseData, responseErr := io.ReadAll(h)
assert.Nil(t, responseErr)
assert.Equal(t, body, responseData)
func TestHTTPExt(t *testing.T) {
for _, v := range []struct { File identifier.ID
ExpectedExt string
ExpectedType string }{
{ File: "file:///tmp/foo/bar/baz.txt", ExpectedExt: "", ExpectedType: "txt" },
{ File: "file:///tmp/foo/bar/baz.txt.gz", ExpectedExt: "gz", ExpectedType: "txt" },
{ File: "file:///tmp/foo/bar/baz.quuz.txt.gz", ExpectedExt: "gz", ExpectedType: "txt" },
} {
u := v.File.Parse()
assert.Equal(t, "file", u.Scheme)
filetype, fileext := v.File.Extension()
assert.Equal(t, v.ExpectedType, filetype)
assert.Equal(t, v.ExpectedExt, fileext)
h, err := NewHTTP(u, context.Background())
assert.Nil(t, err)
assert.Equal(t, v.ExpectedType, h.exttype)
assert.Equal(t, v.ExpectedExt, h.fileext)
}
}

View File

@ -0,0 +1,74 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package transport
import (
_ "errors"
"io"
_ "os"
"net/url"
"net/http"
"context"
"path/filepath"
)
type HTTPWriter struct {
*HTTP
post *HTTPConnection
}
func NewHTTPWriter(u *url.URL, ctx context.Context) (h *HTTPWriter, err error) {
h = &HTTPWriter {
HTTP: &HTTP {
ctx: ctx,
uri: u,
path: filepath.Join(u.Hostname(), u.RequestURI()),
},
post: NewHTTPConnection(http.DefaultClient),
}
h.extension()
h.DetectGzip()
err = h.post.NewPostRequest(h.ctx, h.uri.String())
return
}
/*
func (h *HTTP) Signature() (documentSignature string) {
if h.get.Response() != nil {
documentSignature = h.get.Response().Header.Get("Signature")
if documentSignature == "" {
signatureResp, signatureErr := h.Client.Get(fmt.Sprintf("%s.sig", h.uri.String()))
if signatureErr == nil {
defer signatureResp.Body.Close()
readSignatureBody, readSignatureErr := io.ReadAll(signatureResp.Body)
if readSignatureErr == nil {
documentSignature = string(readSignatureBody)
}
}
}
}
return documentSignature
}
func (h *HTTP) ContentType() (contenttype string) {
contenttype = h.get.Response().Header.Get("Content-Type")
switch contenttype {
case "application/octet-stream":
return h.exttype
default:
}
return
}
*/
func (h *HTTPWriter) Writer() io.WriteCloser {
return h.post
}
func (h *HTTPWriter) PostRequest() *http.Request {
return h.post.Request()
}
func (h *HTTPWriter) PostResponse() *http.Response {
return h.post.Response()
}

View File

@ -0,0 +1,102 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package transport
import (
_ "errors"
"io"
_ "os"
"net/http"
"context"
"log/slog"
)
type HTTPConnection struct {
stream *Pipe
request *http.Request
response *http.Response
Client *http.Client
}
func NewHTTPConnection(client *http.Client) *HTTPConnection {
return &HTTPConnection {
Client: client,
}
}
func (h *HTTPConnection) NewPostRequest(ctx context.Context, uri string) (err error) {
h.stream = NewPipe()
h.request, err = http.NewRequestWithContext(ctx, "POST", uri, h.Reader())
slog.Info("transport.HTTPConnection.NewPostRequest()", "stream", h.stream, "request", h.request)
return
}
func (h *HTTPConnection) NewGetRequest(ctx context.Context, uri string) (err error) {
h.request, err = http.NewRequestWithContext(ctx, "GET", uri, nil)
return
}
func (h *HTTPConnection) Request() *http.Request {
return h.request
}
func (h *HTTPConnection) Response() *http.Response {
return h.response
}
func (h *HTTPConnection) Writer() io.WriteCloser {
return h.stream.Writer
}
func (h *HTTPConnection) Reader() io.ReadCloser {
return h.stream.Reader
}
func (h *HTTPConnection) Do() (err error) {
slog.Info("transport.HTTPConnection.Do()", "connection", h, "request", h.request)
h.response, err = h.Client.Do(h.request)
return
}
func (h *HTTPConnection) Read(p []byte) (n int, err error) {
slog.Info("transport.Read()", "request", h.request)
if h.response == nil {
if err = h.Do(); err != nil {
return
}
}
return h.response.Body.Read(p)
}
func (h *HTTPConnection) Write(p []byte) (n int, err error) {
if h.response == nil {
go func() { err = h.Do() }()
if err != nil {
return
}
}
slog.Info("transport.HTTPConnection.Write()", "data", p, "connection", h)
return h.Writer().Write(p)
}
func (h *HTTPConnection) ReadFrom(r io.Reader) (n int64, err error) {
slog.Info("transport.ReadFrom()", "request", h.request)
h.request.Body = r.(io.ReadCloser)
if h.response == nil {
if err = h.Do(); err != nil {
return
}
}
return h.request.ContentLength, nil
}
func (h *HTTPConnection) Close() (err error) {
if h.response != nil {
defer h.response.Body.Close()
}
if h.stream != nil {
err = h.Writer().Close()
}
return
}

View File

@ -0,0 +1,44 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package transport
import (
"github.com/stretchr/testify/assert"
"testing"
"fmt"
_ "os"
"io"
_ "path/filepath"
"context"
"net/http"
"net/http/httptest"
)
func TestNewHTTPConnection(t *testing.T) {
ctx := context.Background()
h := NewHTTPConnection(http.DefaultClient)
assert.NotNil(t, h)
body := []byte(`
type: "user"
attributes:
name: "foo"
gecos: "foo user"
`)
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, req.URL.String(), "/resource/user")
n,e := rw.Write(body)
assert.Nil(t, e)
assert.Greater(t, n, 0)
assert.Equal(t, "bar", req.Header.Get("foo"))
}))
defer server.Close()
uri := fmt.Sprintf("%s/resource/user", server.URL)
assert.Nil(t, h.NewGetRequest(ctx, uri))
h.Request().Header.Add("foo", "bar")
responseData, responseErr := io.ReadAll(h)
assert.Nil(t, responseErr)
assert.Equal(t, body, responseData)
}

View File

@ -0,0 +1,36 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package transport
import (
"io/fs"
"decl/internal/identifier"
"net/url"
"path/filepath"
"time"
)
type HTTPFileInfo struct {
URI identifier.ID
parsedURI *url.URL
ContentLength int64
LastModified time.Time
ContentType string
Collection bool
}
func NewHTTPFileInfo(uri string) (h *HTTPFileInfo) {
h = &HTTPFileInfo{ URI: identifier.ID(uri) }
h.parsedURI = h.URI.Parse()
if h.parsedURI == nil {
return nil
}
return
}
func (h *HTTPFileInfo) Size() int64 { return h.ContentLength }
func (h *HTTPFileInfo) Name() string { return filepath.Base(h.parsedURI.Path) }
func (h *HTTPFileInfo) Mode() fs.FileMode { return 0777 }
func (h *HTTPFileInfo) ModTime() time.Time { return h.LastModified }
func (h *HTTPFileInfo) IsDir() bool { return h.Collection }
func (h *HTTPFileInfo) Sys() any { return any(h) }

View File

@ -0,0 +1,63 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package transport
import (
"github.com/stretchr/testify/assert"
"testing"
"fmt"
_ "os"
"io/fs"
_ "path/filepath"
"net/http"
"net/http/httptest"
"time"
)
func TestNewHTTPFileInfo(t *testing.T) {
for _, v := range []struct { Valid bool; URL string } {
{ Valid: true, URL: "https://localhost/test" },
{ Valid: false, URL: "a_b://localhost/test" },
} {
h := NewHTTPFileInfo(v.URL)
if v.Valid {
assert.NotNil(t, h)
assert.NotNil(t, h.parsedURI)
} else {
assert.Nil(t, h)
}
}
}
func TestHTTPFileInfo(t *testing.T) {
expectedTime := time.Now()
body := []byte(`
type: "user"
attributes:
name: "foo"
gecos: "foo user"
`)
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, req.URL.String(), "/resource/user")
rw.Header().Add("Content-Type", "application/yaml")
n,e := rw.Write(body)
assert.Nil(t, e)
assert.Greater(t, n, 0)
}))
defer server.Close()
uri := fmt.Sprintf("%s/resource/user", server.URL)
r, readerErr := NewReaderURI(uri)
assert.Nil(t, readerErr)
fi, statErr := r.Stat()
assert.Nil(t, statErr)
assert.Equal(t, "user", fi.Name())
assert.Equal(t, int64(len(body)), fi.Size())
assert.Equal(t, "application/yaml", fi.Sys().(*HTTPFileInfo).ContentType)
assert.Greater(t, expectedTime, fi.ModTime())
assert.False(t, fi.IsDir())
assert.Equal(t, fs.FileMode(0777), fi.Mode())
}

View File

@ -0,0 +1,17 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package transport
import (
"io"
)
type Pipe struct {
Reader io.ReadCloser
Writer io.WriteCloser
}
func NewPipe() *Pipe {
r,w := io.Pipe()
return &Pipe{ Reader: r, Writer: w }
}

View File

@ -0,0 +1,57 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package transport
import (
_ "errors"
_ "fmt"
_ "net/url"
_ "net/http"
_ "strings"
_ "path/filepath"
"io"
_ "os"
_ "context"
)
type ReadFilter func([]byte, int, error) (int, error)
var ReadBufferSize int = 32 * 1024
type readerFrom struct {
io.Writer
filter ReadFilter
}
func (w readerFrom) ReadFrom(r io.Reader) (n int64, err error) {
readBuffer := make([]byte, ReadBufferSize)
for {
var readBytes int
if readBytes, err = r.Read(readBuffer); readBytes > 0 {
if w.filter != nil {
readBytes, err = w.filter(readBuffer, readBytes, err)
}
writeBytes, writeErr := w.Write(readBuffer[0:readBytes])
if writeBytes > 0 {
n += int64(writeBytes)
}
if readBytes != writeBytes {
writeErr = io.ErrShortWrite
}
if writeErr != nil {
break
}
}
if err != nil {
if err == io.EOF {
err = nil
}
break
}
}
return
}
func NewReaderFrom(w io.Writer, filter ReadFilter) io.ReaderFrom {
return readerFrom{ Writer: w, filter: filter}
}

View File

@ -0,0 +1,112 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package transport
import (
_ "errors"
_ "fmt"
"net/url"
"net/http"
_ "strings"
_ "path/filepath"
"io"
_ "os"
"context"
"io/fs"
)
type HandlerReadWriter interface {
ReadWriter() io.ReadWriteCloser
}
type ReadWriter struct {
uri *url.URL
handle Handler
stream io.ReadWriteCloser
exists func() bool
}
func NewReadWriter(u *url.URL) (readWriter *ReadWriter, e error) {
return NewReadWriterWithContext(u, context.Background())
}
func NewReadWriterWithContext(u *url.URL, ctx context.Context) (readWriter *ReadWriter, e error) {
readWriter = &ReadWriter{ uri: u }
switch u.Scheme {
case "http", "https":
readWriter.handle, e = NewHTTPReadWriter(u, ctx)
case "file":
fallthrough
default:
readWriter.handle, e = NewFile(u)
readWriter.exists = func() bool { return FileExists(u) }
}
readWriter.SetStream(readWriter.handle.(HandlerReadWriter).ReadWriter())
return
}
func NewReadWriterURI(uri string) (readWriter *ReadWriter, e error) {
var u *url.URL
if u, e = url.Parse(uri); e == nil {
return NewReadWriter(u)
}
return
}
func (r *ReadWriter) Exists() bool { return r.exists() }
func (r *ReadWriter) Read(b []byte) (int, error) {
return r.stream.Read(b)
}
func (r *ReadWriter) Close() error {
return r.stream.Close()
}
func (r *ReadWriter) ContentType() string {
return r.handle.ContentType()
}
func (r *ReadWriter) SetGzip(value bool) {
r.handle.SetGzip(value)
}
func (r *ReadWriter) Gzip() bool {
return r.handle.Gzip()
}
func (r *ReadWriter) Signature() string {
return r.handle.Signature()
}
func (r *ReadWriter) SetStream(s io.ReadWriteCloser) {
r.stream = s
}
func (r *ReadWriter) Write(b []byte) (int, error) {
return r.stream.Write(b)
}
func (r *ReadWriter) ReadFrom(reader io.Reader) (n int64, e error) {
return r.stream.(io.ReaderFrom).ReadFrom(reader)
}
func (r *ReadWriter) AddHeader(name string, value string) {
r.handle.(*HTTPReadWriter).PostRequest().Header.Add(name, value)
}
func (r *ReadWriter) Status() string {
return r.handle.(*HTTPReadWriter).PostResponse().Status
}
func (r *ReadWriter) StatusCode() int {
return r.handle.(*HTTPReadWriter).PostResponse().StatusCode
}
func (r *ReadWriter) Response() *http.Response {
return r.handle.(*HTTPReadWriter).PostResponse()
}
func (r *ReadWriter) Stat() (info fs.FileInfo, err error) {
return r.handle.Stat()
}

View File

@ -6,7 +6,7 @@ import (
_ "errors"
_ "fmt"
"net/url"
_ "net/http"
"net/http"
_ "strings"
_ "path/filepath"
"io"
@ -47,7 +47,7 @@ func NewReaderWithContext(u *url.URL, ctx context.Context) (reader *Reader, e er
reader = &Reader{ uri: u }
switch u.Scheme {
case "http", "https":
reader.handle, e = NewHTTP(u, ctx)
reader.handle, e = NewHTTPReader(u, ctx)
case "file":
fallthrough
default:
@ -82,7 +82,7 @@ func NewWriterWithContext(u *url.URL, ctx context.Context) (writer *Writer, e er
switch u.Scheme {
case "http", "https":
writer.handle, e = NewHTTP(u, ctx)
writer.handle, e = NewHTTPWriter(u, ctx)
case "file":
fallthrough
default:
@ -155,15 +155,19 @@ func (r *Reader) SetStream(s io.ReadCloser) {
}
func (r *Reader) AddHeader(name string, value string) {
r.handle.(*HTTP).GetRequest().Header.Add(name, value)
r.handle.(*HTTPReader).GetRequest().Header.Add(name, value)
}
func (r *Reader) Status() string {
return r.handle.(*HTTP).GetResponse().Status
return r.handle.(*HTTPReader).GetResponse().Status
}
func (r *Reader) StatusCode() int {
return r.handle.(*HTTP).GetResponse().StatusCode
return r.handle.(*HTTPReader).GetResponse().StatusCode
}
func (r *Reader) Response() *http.Response {
return r.handle.(*HTTPReader).GetResponse()
}
func (w *Writer) Exists() bool { return w.exists() }
@ -205,13 +209,17 @@ func (w *Writer) SetStream(s io.WriteCloser) {
}
func (w *Writer) AddHeader(name string, value string) {
w.handle.(*HTTP).PostRequest().Header.Add(name, value)
w.handle.(*HTTPWriter).PostRequest().Header.Add(name, value)
}
func (w *Writer) Status() string {
return w.handle.(*HTTP).PostResponse().Status
return w.handle.(*HTTPWriter).PostResponse().Status
}
func (w *Writer) StatusCode() int {
return w.handle.(*HTTP).PostResponse().StatusCode
return w.handle.(*HTTPWriter).PostResponse().StatusCode
}
func (w *Writer) Response() *http.Response {
return w.handle.(*HTTPWriter).PostResponse()
}