630 lines
15 KiB
Go
630 lines
15 KiB
Go
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
|
|
|
package resource
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"gopkg.in/yaml.v3"
|
|
"io"
|
|
"io/fs"
|
|
"net/url"
|
|
"net/http"
|
|
_ "os"
|
|
"encoding/json"
|
|
"strings"
|
|
"log/slog"
|
|
"gitea.rosskeen.house/rosskeen.house/machine"
|
|
"decl/internal/codec"
|
|
"decl/internal/data"
|
|
"decl/internal/transport"
|
|
"decl/internal/folio"
|
|
"decl/internal/iofilter"
|
|
"decl/internal/ext"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"time"
|
|
)
|
|
|
|
/* HTTP resource
|
|
Lifecycle
|
|
|
|
transitions:
|
|
|
|
Create
|
|
* Stat request
|
|
* if the resource is absent then execute a POST request to create it.
|
|
* if the resource is present then throw an error.
|
|
Read
|
|
* Stat request
|
|
* if the resource is present then execute a GET request to retrieve the content.
|
|
Update
|
|
* Stat request
|
|
* execute a PUT request to update the resource.
|
|
Delete
|
|
* Stat request
|
|
* DELETE request
|
|
*/
|
|
|
|
const (
|
|
HTTPTypeName TypeName = "http"
|
|
)
|
|
|
|
var (
|
|
ErrUndefinedContentSource error = errors.New("Cannot create reader: no content defined")
|
|
)
|
|
|
|
func init() {
|
|
folio.DocumentRegistry.ResourceTypes.Register([]string{"http", "https"}, HTTPFactory)
|
|
}
|
|
|
|
func HTTPFactory(u *url.URL) data.Resource {
|
|
var err error
|
|
h := NewHTTP()
|
|
|
|
slog.Info("HTTP.Factory", "http", h, "url", u)
|
|
if err = h.SetParsedURI(u); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if err = h.Open(); err != nil {
|
|
panic(err)
|
|
}
|
|
return h
|
|
}
|
|
|
|
type HTTPHeader struct {
|
|
Name string `yaml:"name" json:"name"`
|
|
Value string `yaml:"value" json:"value"`
|
|
}
|
|
|
|
// Manage the state of an HTTP endpoint
|
|
type HTTP struct {
|
|
*Common `yaml:",inline" json:",inline"`
|
|
stater machine.Stater `yaml:"-" json:"-"`
|
|
client *http.Client `yaml:"-" json:"-"`
|
|
Endpoint folio.URI `yaml:"endpoint" json:"endpoint"`
|
|
|
|
Headers []HTTPHeader `yaml:"headers,omitempty" json:"headers,omitempty"`
|
|
Content string `yaml:"body,omitempty" json:"body,omitempty"`
|
|
ContentSourceRef folio.ResourceReference `json:"sourceref,omitempty" yaml:"sourceref,omitempty"`
|
|
Status string `yaml:"status,omitempty" json:"status,omitempty"`
|
|
StatusCode int `yaml:"statuscode,omitempty" json:"statuscode,omitempty"`
|
|
Sha256 string `yaml:"sha256,omitempty" json:"sha256,omitempty"`
|
|
SerializeContent bool `json:"serializecontent,omitempty" yaml:"serializecontent,omitempty"`
|
|
LastModified time.Time `json:"lastmodified,omitempty" yaml:"lastmodified,omitempty"`
|
|
Size int64 `yaml:"size,omitempty" json:"size,omitempty"`
|
|
SignatureValue string `yaml:"signature,omitempty" json:"signature,omitempty"`
|
|
Resources data.ResourceMapper `yaml:"-" json:"-"`
|
|
reader *transport.Reader `yaml:"-" json:"-"`
|
|
writer *transport.ReadWriter `yaml:"-" json:"-"`
|
|
}
|
|
|
|
func NewHTTP() *HTTP {
|
|
h := &HTTP{ client: &http.Client{} }
|
|
h.Common = NewCommon(HTTPTypeName, true)
|
|
h.Common.SchemeCheck = h.SchemeCheck
|
|
h.Common.NormalizePath = h.NormalizePath
|
|
slog.Info("NewHTTP()", "http", h)
|
|
return h
|
|
}
|
|
|
|
func (h *HTTP) SchemeCheck(scheme string) bool {
|
|
switch scheme {
|
|
case "http", "https":
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (h *HTTP) NormalizePath() error {
|
|
return nil
|
|
}
|
|
|
|
func (h *HTTP) SetResourceMapper(resources data.ResourceMapper) {
|
|
h.Resources = resources
|
|
}
|
|
|
|
func (h *HTTP) Open() (err error) {
|
|
u := h.Common.parsedURI
|
|
if u == nil {
|
|
err = fmt.Errorf("HTTP parsed URI is not set: %s", h.Endpoint)
|
|
}
|
|
/*
|
|
else {
|
|
err = h.OpenGetter()
|
|
}
|
|
*/
|
|
return
|
|
}
|
|
|
|
func (h *HTTP) OpenGetter() (err error) {
|
|
h.reader, err = transport.NewReader(h.Common.parsedURI)
|
|
return
|
|
}
|
|
|
|
func (h *HTTP) OpenPoster() (err error) {
|
|
h.writer, err = transport.NewReadWriter(h.Common.parsedURI)
|
|
return
|
|
}
|
|
|
|
func (h *HTTP) Clone() data.Resource {
|
|
return &HTTP {
|
|
Common: h.Common.Clone(),
|
|
client: h.client,
|
|
Endpoint: h.Endpoint,
|
|
Headers: h.Headers,
|
|
Content: h.Content,
|
|
reader: h.reader,
|
|
writer: h.writer,
|
|
}
|
|
}
|
|
|
|
func (h *HTTP) StateMachine() machine.Stater {
|
|
if h.stater == nil {
|
|
h.stater = StorageMachine(h)
|
|
}
|
|
return h.stater
|
|
}
|
|
|
|
func (h *HTTP) Notify(m *machine.EventMessage) {
|
|
ctx := context.Background()
|
|
slog.Info("Notify()", "http", h, "m", m)
|
|
switch m.On {
|
|
case machine.ENTERSTATEEVENT:
|
|
switch m.Dest {
|
|
case "start_stat":
|
|
if statErr := h.ReadStat(); statErr == nil {
|
|
if triggerErr := h.StateMachine().Trigger("exists"); triggerErr == nil {
|
|
return
|
|
}
|
|
} else {
|
|
if triggerErr := h.StateMachine().Trigger("notexists"); triggerErr == nil {
|
|
return
|
|
}
|
|
}
|
|
case "start_read":
|
|
if _,readErr := h.Read(ctx); readErr == nil {
|
|
if triggerErr := h.StateMachine().Trigger("state_read"); triggerErr == nil {
|
|
return
|
|
} else {
|
|
h.Common.State = "absent"
|
|
panic(triggerErr)
|
|
}
|
|
} else {
|
|
h.Common.State = "absent"
|
|
panic(readErr)
|
|
}
|
|
case "start_create":
|
|
if e := h.Create(ctx); e == nil {
|
|
if triggerErr := h.stater.Trigger("created"); triggerErr == nil {
|
|
return
|
|
}
|
|
}
|
|
h.Common.State = "absent"
|
|
case "start_delete":
|
|
if deleteErr := h.Delete(ctx); deleteErr == nil {
|
|
if triggerErr := h.StateMachine().Trigger("deleted"); triggerErr == nil {
|
|
return
|
|
} else {
|
|
h.Common.State = "present"
|
|
panic(triggerErr)
|
|
}
|
|
} else {
|
|
h.Common.State = "present"
|
|
panic(deleteErr)
|
|
}
|
|
case "absent":
|
|
h.Common.State = "absent"
|
|
case "present", "created", "read":
|
|
h.Common.State = "present"
|
|
}
|
|
case machine.EXITSTATEEVENT:
|
|
}
|
|
}
|
|
|
|
func (h *HTTP) URI() string {
|
|
return h.Endpoint.String()
|
|
}
|
|
|
|
func (h *HTTP) SetURI(uri string) (err error) {
|
|
if err = h.Common.SetURI(uri); err == nil {
|
|
h.Endpoint = h.Common.Uri
|
|
}
|
|
return
|
|
}
|
|
|
|
func (h *HTTP) SetParsedURI(u *url.URL) (err error) {
|
|
if err = h.Common.SetParsedURI(u); err == nil {
|
|
h.Endpoint = h.Common.Uri
|
|
}
|
|
return
|
|
}
|
|
|
|
func (h *HTTP) SetFileInfo(info fs.FileInfo) error {
|
|
if info != nil {
|
|
h.LastModified = info.ModTime()
|
|
h.Size = info.Size()
|
|
|
|
contentType := info.Sys().(*transport.HTTPFileInfo).ContentType
|
|
if contentType != "" {
|
|
contentTypeHeader := &HTTPHeader{ Name: "Content-Type", Value: contentType }
|
|
for _, h := range h.Headers {
|
|
if h.Name == "Content-Type" {
|
|
h.Value = contentType
|
|
contentTypeHeader = nil
|
|
}
|
|
}
|
|
if contentTypeHeader != nil {
|
|
h.Headers = append(h.Headers, *contentTypeHeader)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
return ErrInvalidFileInfo
|
|
}
|
|
|
|
func (h *HTTP) ContentSourceRefStat() (info fs.FileInfo) {
|
|
if len(h.ContentSourceRef) > 0 {
|
|
rs, _ := h.ContentReaderStream()
|
|
info, _ = rs.Stat()
|
|
rs.Close()
|
|
}
|
|
return
|
|
}
|
|
|
|
func (h *HTTP) ReadStat() (err error) {
|
|
|
|
if h.reader == nil {
|
|
if err = h.OpenGetter(); err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
var info fs.FileInfo
|
|
info, err = h.reader.Stat()
|
|
|
|
if err == nil {
|
|
_ = h.SetFileInfo(info)
|
|
} else {
|
|
if refStat := h.ContentSourceRefStat(); refStat != nil {
|
|
_ = h.SetFileInfo(refStat)
|
|
err = nil
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
h.State = "absent"
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (h *HTTP) ContentReaderStream() (*transport.Reader, error) {
|
|
if len(h.Content) == 0 && len(h.ContentSourceRef) != 0 {
|
|
return h.ContentSourceRef.Lookup(nil).ContentReaderStream()
|
|
}
|
|
return nil, fmt.Errorf("Cannot provide transport reader for string content")
|
|
}
|
|
|
|
func (h *HTTP) ContentWriterStream() (*transport.Writer, error) {
|
|
if len(h.Content) == 0 && len(h.ContentSourceRef) != 0 {
|
|
return h.ContentSourceRef.Lookup(nil).ContentWriterStream()
|
|
}
|
|
return nil, fmt.Errorf("Cannot provide transport writer for string content")
|
|
}
|
|
|
|
func (h *HTTP) JSON() ([]byte, error) {
|
|
return json.Marshal(h)
|
|
}
|
|
|
|
func (h *HTTP) Validate() error {
|
|
s := NewSchema(h.Type())
|
|
jsonDoc, jsonErr := h.JSON()
|
|
if jsonErr == nil {
|
|
return s.Validate(string(jsonDoc))
|
|
}
|
|
return jsonErr
|
|
}
|
|
|
|
func (h *HTTP) Apply() error {
|
|
switch h.Common.State {
|
|
case "absent":
|
|
case "present":
|
|
}
|
|
_,e := h.Read(context.Background())
|
|
if e == nil {
|
|
h.Common.State = "present"
|
|
}
|
|
return e
|
|
}
|
|
|
|
func (h *HTTP) Load(docData []byte, f codec.Format) (err error) {
|
|
if err = f.StringDecoder(string(docData)).Decode(h); err == nil {
|
|
err = h.Common.SetURI(string(h.Endpoint))
|
|
}
|
|
return
|
|
}
|
|
|
|
func (h *HTTP) LoadReader(r io.ReadCloser, f codec.Format) (err error) {
|
|
if err = f.Decoder(r).Decode(h); err == nil {
|
|
err = h.Common.SetURI(string(h.Endpoint))
|
|
//err = h.setParsedURI(h.Endpoint)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (h *HTTP) LoadString(docData string, f codec.Format) (err error) {
|
|
if err = f.StringDecoder(docData).Decode(h); err == nil {
|
|
err = h.Common.SetURI(string(h.Endpoint))
|
|
//err = h.setParsedURI(h.Endpoint)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (h *HTTP) LoadDecl(yamlResourceDeclaration string) error {
|
|
return h.LoadString(yamlResourceDeclaration, codec.FormatYaml)
|
|
}
|
|
|
|
func (h *HTTP) ResolveId(ctx context.Context) string {
|
|
_ = h.Common.SetURI(h.Endpoint.String())
|
|
slog.Info("HTTP.ResolveId()", "uri", h.Endpoint.String())
|
|
return h.Endpoint.String()
|
|
}
|
|
|
|
func (h *HTTP) Signature() folio.Signature {
|
|
var s folio.Signature
|
|
if e := (&s).SetHexString(h.SignatureValue); e != nil {
|
|
panic(e)
|
|
}
|
|
return s
|
|
}
|
|
|
|
func (h *HTTP) Hash() (shaBytes []byte) {
|
|
shaBytes, _ = hex.DecodeString(h.Sha256)
|
|
return
|
|
}
|
|
|
|
func (h *HTTP) HashHexString() string {
|
|
return h.Sha256
|
|
}
|
|
|
|
func (h *HTTP) Create(ctx context.Context) (err error) {
|
|
slog.Info("HTTP.Create()", "http", h)
|
|
|
|
if err = h.OpenPoster(); err != nil {
|
|
return
|
|
}
|
|
|
|
var contentReader io.ReadCloser
|
|
h.writer, err = transport.NewReadWriterWithContext(h.Common.parsedURI, ctx)
|
|
if err != nil {
|
|
slog.Error("HTTP.Create()", "http", h, "error", err)
|
|
//panic(err)
|
|
return
|
|
}
|
|
|
|
slog.Info("HTTP.Create() content", "http", h)
|
|
|
|
// create should post to the named resource using the resource content or sourceref.
|
|
contentReader, err = h.contentSourceReader()
|
|
if err != nil {
|
|
slog.Error("HTTP.Create()", "error", err)
|
|
return
|
|
}
|
|
contentReader = h.UpdateContentAttributesFromReader(contentReader)
|
|
srcRead := ext.NewReadCloser(contentReader)
|
|
if _, ok := srcRead.(io.WriterTo); ok {
|
|
panic("reader has writerto interface")
|
|
}
|
|
if _, ok := any(h.writer).(io.ReaderFrom); !ok {
|
|
panic("writer is missing io.ReaderFrom")
|
|
}
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
defer func () {
|
|
h.writer.Close()
|
|
contentReader.Close()
|
|
}()
|
|
|
|
err = h.AddAuthorizationTokenFromConfigToTransport()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if len(h.Headers) > 0 {
|
|
for _, header := range h.Headers {
|
|
h.writer.AddHeader(header.Name, header.Value)
|
|
}
|
|
}
|
|
|
|
slog.Error("HTTP.Create()", "http", h, "reader", h.reader)
|
|
|
|
copyBuffer := make([]byte, 32 * 1024)
|
|
_, writeErr := io.CopyBuffer(h.writer, srcRead, copyBuffer)
|
|
if writeErr != nil {
|
|
return fmt.Errorf("Http.Create(): CopyBuffer failed %v %v: %w", h.writer, contentReader, writeErr)
|
|
}
|
|
h.Status = h.writer.Status()
|
|
h.StatusCode = h.writer.StatusCode()
|
|
|
|
return
|
|
}
|
|
|
|
func (h *HTTP) Update(ctx context.Context) error {
|
|
return h.Create(ctx)
|
|
}
|
|
|
|
func (h *HTTP) ReadAuthorizationTokenFromConfig(req *http.Request) error {
|
|
if h.Common.config != nil {
|
|
token, tokenErr := h.Common.config.GetValue("authorization_token")
|
|
if tokenErr == nil {
|
|
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token))
|
|
}
|
|
slog.Info("ReadAuthorizationTokenFromConfig()", "error", tokenErr)
|
|
return tokenErr
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (h *HTTP) AddAuthorizationTokenFromConfigToTransport() (err error) {
|
|
if h.Common.config != nil {
|
|
token, tokenErr := h.Common.config.GetValue("authorization_token")
|
|
slog.Info("HTTP.AddAuthorizationTokenFromConfigToTransport()", "error", tokenErr)
|
|
if tokenErr == nil {
|
|
if h.reader != nil {
|
|
h.reader.AddHeader("Authorization", fmt.Sprintf("Bearer %s", token))
|
|
}
|
|
if h.writer != nil {
|
|
h.writer.AddHeader("Authorization", fmt.Sprintf("Bearer %s", token))
|
|
}
|
|
} else {
|
|
err = tokenErr
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (h *HTTP) UpdateContentAttributes() {
|
|
h.Size = int64(len(h.Content))
|
|
h.Sha256 = fmt.Sprintf("%x", sha256.Sum256([]byte(h.Content)))
|
|
}
|
|
|
|
func (h *HTTP) UpdateContentAttributesFromReader(reader io.ReadCloser) io.ReadCloser {
|
|
var content strings.Builder
|
|
hash := sha256.New()
|
|
h.Size = 0
|
|
h.Content = ""
|
|
h.Sha256 = ""
|
|
return iofilter.NewReader(reader, func(p []byte, readn int, readerr error) (n int, err error) {
|
|
hash.Write(p[:readn])
|
|
h.Sha256 = fmt.Sprintf("%x", hash.Sum(nil))
|
|
h.Size += int64(readn)
|
|
if len(h.ContentSourceRef) == 0 || h.SerializeContent {
|
|
content.Write(p[:readn])
|
|
h.Content = content.String()
|
|
}
|
|
return readn, readerr
|
|
})
|
|
}
|
|
|
|
func (h *HTTP) SetContent(r io.Reader) error {
|
|
fileContent, ioErr := io.ReadAll(r)
|
|
h.Content = string(fileContent)
|
|
h.UpdateContentAttributes()
|
|
return ioErr
|
|
}
|
|
|
|
func (h *HTTP) GetContent(w io.Writer) (contentReader io.ReadCloser, err error) {
|
|
slog.Info("Http.GetContent()", "content", len(h.Content), "sourceref", h.ContentSourceRef)
|
|
contentReader, err = h.readThru(context.Background())
|
|
|
|
if w != nil {
|
|
copyBuffer := make([]byte, 32 * 1024)
|
|
_, writeErr := io.CopyBuffer(w, contentReader, copyBuffer)
|
|
if writeErr != nil {
|
|
return nil, fmt.Errorf("Http.GetContent(): CopyBuffer failed %v %v: %w", w, contentReader, writeErr)
|
|
}
|
|
return nil, nil
|
|
}
|
|
if v, ok := contentReader.(*transport.Reader); ok {
|
|
h.SignatureValue = v.Signature()
|
|
}
|
|
return
|
|
}
|
|
|
|
/*
|
|
func (h *HTTP) writeThru(ctx context.Context) (contentWriter io.WriteCloser, err error) {
|
|
if len(h.ContentSourceRef) != 0 {
|
|
contentReader, err = h.ContentSourceRef.Lookup(nil).ContentReaderStream()
|
|
contentReader.(*transport.Reader).SetGzip(false)
|
|
} else {
|
|
if len(h.Content) != 0 {
|
|
contentReader = io.NopCloser(strings.NewReader(h.Content))
|
|
} else {
|
|
//contentReader, err = os.Open(f.Path)
|
|
contentReader = transport.NewReaderWithContext(u, ctx)
|
|
contentReader.(*transport.Reader).SetGzip(false)
|
|
}
|
|
}
|
|
contentReader = h.UpdateContentAttributesFromReader(contentReader)
|
|
return
|
|
}
|
|
*/
|
|
|
|
func (h *HTTP) hasContentSource() bool {
|
|
return len(h.ContentSourceRef) > 0 || len(h.Content) > 0
|
|
}
|
|
|
|
func (h *HTTP) contentSourceReader() (contentReader io.ReadCloser, err error) {
|
|
if len(h.ContentSourceRef) != 0 {
|
|
contentReader, err = h.ContentSourceRef.Lookup(nil).ContentReaderStream()
|
|
contentReader.(*transport.Reader).SetGzip(false)
|
|
} else {
|
|
if len(h.Content) != 0 {
|
|
contentReader = io.NopCloser(ext.NewStringReader(h.Content))
|
|
} else {
|
|
err = ErrUndefinedContentSource
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// set up reader for source content
|
|
func (h *HTTP) readThru(ctx context.Context) (contentReader io.ReadCloser, err error) {
|
|
if h.hasContentSource() {
|
|
contentReader, err = h.contentSourceReader()
|
|
} else {
|
|
if h.reader == nil {
|
|
h.reader, err = transport.NewReaderWithContext(h.Common.parsedURI, ctx)
|
|
h.reader.SetGzip(false)
|
|
}
|
|
contentReader = h.reader
|
|
}
|
|
contentReader = h.UpdateContentAttributesFromReader(contentReader)
|
|
return
|
|
}
|
|
|
|
func (h *HTTP) Read(ctx context.Context) (yamlData []byte, err error) {
|
|
var contentReader io.ReadCloser
|
|
contentReader, err = h.readThru(ctx)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
defer contentReader.Close()
|
|
err = h.AddAuthorizationTokenFromConfigToTransport()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if len(h.Headers) > 0 {
|
|
for _, header := range h.Headers {
|
|
h.reader.AddHeader(header.Name, header.Value)
|
|
}
|
|
}
|
|
|
|
slog.Info("HTTP.Read()", "reader", h.reader, "http", h)
|
|
if err = h.SetContent(contentReader); err != nil {
|
|
return
|
|
}
|
|
|
|
if h.writer == nil {
|
|
h.Status = h.reader.Status()
|
|
h.StatusCode = h.reader.StatusCode()
|
|
}
|
|
return yaml.Marshal(h)
|
|
}
|
|
|
|
func (h *HTTP) Delete(ctx context.Context) error {
|
|
return nil
|
|
}
|
|
|
|
func (h *HTTP) Type() string {
|
|
return "http"
|
|
}
|