// Copyright 2024 Matthew Rich . All rights reserved. package resource import ( "context" "errors" "fmt" "gopkg.in/yaml.v3" "io" "io/fs" "net/url" "net/http" _ "os" "encoding/json" "strings" "log/slog" "gitea.rosskeen.house/rosskeen.house/machine" "decl/internal/codec" "decl/internal/data" "decl/internal/transport" "decl/internal/folio" "decl/internal/iofilter" "decl/internal/ext" "crypto/sha256" "encoding/hex" "time" ) /* HTTP resource Lifecycle transitions: Create * Stat request * if the resource is absent then execute a POST request to create it. * if the resource is present then throw an error. Read * Stat request * if the resource is present then execute a GET request to retrieve the content. Update * Stat request * execute a PUT request to update the resource. Delete * Stat request * DELETE request */ const ( HTTPTypeName TypeName = "http" ) var ( ErrUndefinedContentSource error = errors.New("Cannot create reader: no content defined") ) func init() { folio.DocumentRegistry.ResourceTypes.Register([]string{"http", "https"}, HTTPFactory) } func HTTPFactory(u *url.URL) data.Resource { h := NewHTTP() if u != nil { if err := folio.CastParsedURI(u).ConstructResource(h); err != nil { panic(err) } } return h } type HTTPHeader struct { Name string `yaml:"name" json:"name"` Value string `yaml:"value" json:"value"` } // Manage the state of an HTTP endpoint type HTTP struct { *Common `yaml:",inline" json:",inline"` stater machine.Stater `yaml:"-" json:"-"` client *http.Client `yaml:"-" json:"-"` Endpoint folio.URI `yaml:"endpoint" json:"endpoint"` Headers []HTTPHeader `yaml:"headers,omitempty" json:"headers,omitempty"` Content string `yaml:"body,omitempty" json:"body,omitempty"` ContentSourceRef folio.ResourceReference `json:"sourceref,omitempty" yaml:"sourceref,omitempty"` Status string `yaml:"status,omitempty" json:"status,omitempty"` StatusCode int `yaml:"statuscode,omitempty" json:"statuscode,omitempty"` Sha256 string `yaml:"sha256,omitempty" json:"sha256,omitempty"` SerializeContent bool `json:"serializecontent,omitempty" yaml:"serializecontent,omitempty"` LastModified time.Time `json:"lastmodified,omitempty" yaml:"lastmodified,omitempty"` Size int64 `yaml:"size,omitempty" json:"size,omitempty"` SignatureValue string `yaml:"signature,omitempty" json:"signature,omitempty"` Resources data.ResourceMapper `yaml:"-" json:"-"` reader *transport.Reader `yaml:"-" json:"-"` writer *transport.ReadWriter `yaml:"-" json:"-"` } func NewHTTP() *HTTP { h := &HTTP{ client: &http.Client{} } h.Common = NewCommon(HTTPTypeName, true) h.Common.SchemeCheck = h.SchemeCheck h.Common.NormalizePath = h.NormalizePath slog.Info("NewHTTP()", "http", h) return h } func (h *HTTP) SchemeCheck(scheme string) bool { switch scheme { case "http", "https": return true } return false } func (h *HTTP) Init(u data.URIParser) (err error) { if u == nil { u = folio.URI(h.URI()).Parse() } if err = h.SetParsedURI(u); err == nil { err = h.Open() } return } func (h *HTTP) NormalizePath() error { return nil } func (h *HTTP) SetResourceMapper(resources data.ResourceMapper) { h.Resources = resources } func (h *HTTP) Open() (err error) { u := h.Common.parsedURI if u == nil { err = fmt.Errorf("HTTP parsed URI is not set: %s", h.Endpoint) } /* else { err = h.OpenGetter() } */ return } func (h *HTTP) OpenGetter() (err error) { h.reader, err = transport.NewReader(h.Common.parsedURI) return } func (h *HTTP) OpenPoster() (err error) { h.writer, err = transport.NewReadWriter(h.Common.parsedURI) return } func (h *HTTP) Clone() data.Resource { return &HTTP { Common: h.Common.Clone(), client: h.client, Endpoint: h.Endpoint, Headers: h.Headers, Content: h.Content, reader: h.reader, writer: h.writer, } } func (h *HTTP) StateMachine() machine.Stater { if h.stater == nil { h.stater = StorageMachine(h) } return h.stater } func (h *HTTP) Notify(m *machine.EventMessage) { ctx := context.Background() slog.Info("Notify()", "http", h, "m", m) switch m.On { case machine.ENTERSTATEEVENT: switch m.Dest { case "start_stat": if statErr := h.ReadStat(); statErr == nil { if triggerErr := h.StateMachine().Trigger("exists"); triggerErr == nil { return } } else { if triggerErr := h.StateMachine().Trigger("notexists"); triggerErr == nil { return } } case "start_read": if _,readErr := h.Read(ctx); readErr == nil { if triggerErr := h.StateMachine().Trigger("state_read"); triggerErr == nil { return } else { h.Common.State = "absent" panic(triggerErr) } } else { h.Common.State = "absent" panic(readErr) } case "start_create": if e := h.Create(ctx); e == nil { if triggerErr := h.stater.Trigger("created"); triggerErr == nil { return } } h.Common.State = "absent" case "start_delete": if deleteErr := h.Delete(ctx); deleteErr == nil { if triggerErr := h.StateMachine().Trigger("deleted"); triggerErr == nil { return } else { h.Common.State = "present" panic(triggerErr) } } else { h.Common.State = "present" panic(deleteErr) } case "absent": h.Common.State = "absent" case "present", "created", "read": h.Common.State = "present" } case machine.EXITSTATEEVENT: } } func (h *HTTP) URI() string { return h.Endpoint.String() } func (h *HTTP) SetParsedURI(u data.URIParser) (err error) { if err = h.Common.SetParsedURI(u); err == nil { h.Endpoint = h.Common.URI() } return } func (h *HTTP) SetFileInfo(info fs.FileInfo) error { if info != nil { h.LastModified = info.ModTime() h.Size = info.Size() contentType := info.Sys().(*transport.HTTPFileInfo).ContentType if contentType != "" { contentTypeHeader := &HTTPHeader{ Name: "Content-Type", Value: contentType } for _, h := range h.Headers { if h.Name == "Content-Type" { h.Value = contentType contentTypeHeader = nil } } if contentTypeHeader != nil { h.Headers = append(h.Headers, *contentTypeHeader) } } return nil } return ErrInvalidFileInfo } func (h *HTTP) ContentSourceRefStat() (info fs.FileInfo) { if len(h.ContentSourceRef) > 0 { rs, _ := h.ContentReaderStream() info, _ = rs.Stat() rs.Close() } return } func (h *HTTP) ReadStat() (err error) { if h.reader == nil { if err = h.OpenGetter(); err != nil { return } } var info fs.FileInfo info, err = h.reader.Stat() if err == nil { _ = h.SetFileInfo(info) } else { if refStat := h.ContentSourceRefStat(); refStat != nil { _ = h.SetFileInfo(refStat) err = nil } } if err != nil { h.State = "absent" return } return } func (h *HTTP) ContentReaderStream() (*transport.Reader, error) { if len(h.Content) == 0 && len(h.ContentSourceRef) != 0 { return h.ContentSourceRef.Lookup(nil).ContentReaderStream() } return nil, fmt.Errorf("Cannot provide transport reader for string content") } func (h *HTTP) ContentWriterStream() (*transport.Writer, error) { if len(h.Content) == 0 && len(h.ContentSourceRef) != 0 { return h.ContentSourceRef.Lookup(nil).ContentWriterStream() } return nil, fmt.Errorf("Cannot provide transport writer for string content") } func (h *HTTP) JSON() ([]byte, error) { return json.Marshal(h) } func (h *HTTP) Validate() error { s := NewSchema(h.Type()) jsonDoc, jsonErr := h.JSON() if jsonErr == nil { return s.Validate(string(jsonDoc)) } return jsonErr } func (h *HTTP) Apply() error { switch h.Common.State { case "absent": case "present": } _,e := h.Read(context.Background()) if e == nil { h.Common.State = "present" } return e } func (h *HTTP) Load(docData []byte, f codec.Format) (err error) { if err = f.StringDecoder(string(docData)).Decode(h); err == nil { err = h.Common.SetParsedURI(folio.URI(h.Endpoint).Parse()) } return } func (h *HTTP) LoadReader(r io.ReadCloser, f codec.Format) (err error) { if err = f.Decoder(r).Decode(h); err == nil { err = h.Common.SetParsedURI(folio.URI(h.Endpoint).Parse()) //err = h.setParsedURI(h.Endpoint) } return } func (h *HTTP) LoadString(docData string, f codec.Format) (err error) { if err = f.StringDecoder(docData).Decode(h); err == nil { err = h.Common.SetParsedURI(folio.URI(h.Endpoint).Parse()) //err = h.setParsedURI(h.Endpoint) } return } func (h *HTTP) LoadDecl(yamlResourceDeclaration string) error { return h.LoadString(yamlResourceDeclaration, codec.FormatYaml) } func (h *HTTP) ResolveId(ctx context.Context) string { _ = h.Common.SetParsedURI(folio.URI(h.Endpoint).Parse()) slog.Info("HTTP.ResolveId()", "uri", h.Endpoint.String()) return h.Endpoint.String() } func (h *HTTP) Signature() folio.Signature { var s folio.Signature if e := (&s).SetHexString(h.SignatureValue); e != nil { panic(e) } return s } func (h *HTTP) Hash() (shaBytes []byte) { shaBytes, _ = hex.DecodeString(h.Sha256) return } func (h *HTTP) HashHexString() string { return h.Sha256 } func (h *HTTP) Create(ctx context.Context) (err error) { slog.Info("HTTP.Create()", "http", h) if err = h.OpenPoster(); err != nil { return } var contentReader io.ReadCloser h.writer, err = transport.NewReadWriterWithContext(h.Common.parsedURI, ctx) if err != nil { slog.Error("HTTP.Create()", "http", h, "error", err) //panic(err) return } slog.Info("HTTP.Create() content", "http", h) // create should post to the named resource using the resource content or sourceref. contentReader, err = h.contentSourceReader() if err != nil { slog.Error("HTTP.Create()", "error", err) return } contentReader = h.UpdateContentAttributesFromReader(contentReader) srcRead := ext.NewReadCloser(contentReader) if _, ok := srcRead.(io.WriterTo); ok { panic("reader has writerto interface") } if _, ok := any(h.writer).(io.ReaderFrom); !ok { panic("writer is missing io.ReaderFrom") } if err != nil { return } defer func () { h.writer.Close() contentReader.Close() }() err = h.AddAuthorizationTokenFromConfigToTransport() if err != nil { return } if len(h.Headers) > 0 { for _, header := range h.Headers { h.writer.AddHeader(header.Name, header.Value) } } slog.Error("HTTP.Create()", "http", h, "reader", h.reader) copyBuffer := make([]byte, 32 * 1024) _, writeErr := io.CopyBuffer(h.writer, srcRead, copyBuffer) if writeErr != nil { return fmt.Errorf("Http.Create(): CopyBuffer failed %v %v: %w", h.writer, contentReader, writeErr) } h.Status = h.writer.Status() h.StatusCode = h.writer.StatusCode() return } func (h *HTTP) Update(ctx context.Context) error { return h.Create(ctx) } func (h *HTTP) ReadAuthorizationTokenFromConfig(req *http.Request) error { if h.Common.config != nil { token, tokenErr := h.Common.config.GetValue("authorization_token") if tokenErr == nil { req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token)) } slog.Info("ReadAuthorizationTokenFromConfig()", "error", tokenErr) return tokenErr } return nil } func (h *HTTP) AddAuthorizationTokenFromConfigToTransport() (err error) { if h.Common.config != nil { token, tokenErr := h.Common.config.GetValue("authorization_token") slog.Info("HTTP.AddAuthorizationTokenFromConfigToTransport()", "error", tokenErr) if tokenErr == nil { if h.reader != nil { h.reader.AddHeader("Authorization", fmt.Sprintf("Bearer %s", token)) } if h.writer != nil { h.writer.AddHeader("Authorization", fmt.Sprintf("Bearer %s", token)) } } else { err = tokenErr } } return } func (h *HTTP) UpdateContentAttributes() { h.Size = int64(len(h.Content)) h.Sha256 = fmt.Sprintf("%x", sha256.Sum256([]byte(h.Content))) } func (h *HTTP) UpdateContentAttributesFromReader(reader io.ReadCloser) io.ReadCloser { var content strings.Builder hash := sha256.New() h.Size = 0 h.Content = "" h.Sha256 = "" return iofilter.NewReader(reader, func(p []byte, readn int, readerr error) (n int, err error) { hash.Write(p[:readn]) h.Sha256 = fmt.Sprintf("%x", hash.Sum(nil)) h.Size += int64(readn) if len(h.ContentSourceRef) == 0 || h.SerializeContent { content.Write(p[:readn]) h.Content = content.String() } return readn, readerr }) } func (h *HTTP) SetContent(r io.Reader) error { fileContent, ioErr := io.ReadAll(r) h.Content = string(fileContent) h.UpdateContentAttributes() return ioErr } func (h *HTTP) GetContent(w io.Writer) (contentReader io.ReadCloser, err error) { slog.Info("Http.GetContent()", "content", len(h.Content), "sourceref", h.ContentSourceRef) contentReader, err = h.readThru(context.Background()) if w != nil { copyBuffer := make([]byte, 32 * 1024) _, writeErr := io.CopyBuffer(w, contentReader, copyBuffer) if writeErr != nil { return nil, fmt.Errorf("Http.GetContent(): CopyBuffer failed %v %v: %w", w, contentReader, writeErr) } return nil, nil } if v, ok := contentReader.(*transport.Reader); ok { h.SignatureValue = v.Signature() } return } /* func (h *HTTP) writeThru(ctx context.Context) (contentWriter io.WriteCloser, err error) { if len(h.ContentSourceRef) != 0 { contentReader, err = h.ContentSourceRef.Lookup(nil).ContentReaderStream() contentReader.(*transport.Reader).SetGzip(false) } else { if len(h.Content) != 0 { contentReader = io.NopCloser(strings.NewReader(h.Content)) } else { //contentReader, err = os.Open(f.Path) contentReader = transport.NewReaderWithContext(u, ctx) contentReader.(*transport.Reader).SetGzip(false) } } contentReader = h.UpdateContentAttributesFromReader(contentReader) return } */ func (h *HTTP) hasContentSource() bool { return len(h.ContentSourceRef) > 0 || len(h.Content) > 0 } func (h *HTTP) contentSourceReader() (contentReader io.ReadCloser, err error) { if len(h.ContentSourceRef) != 0 { contentReader, err = h.ContentSourceRef.Lookup(nil).ContentReaderStream() contentReader.(*transport.Reader).SetGzip(false) } else { if len(h.Content) != 0 { contentReader = io.NopCloser(ext.NewStringReader(h.Content)) } else { err = ErrUndefinedContentSource } } return } // set up reader for source content func (h *HTTP) readThru(ctx context.Context) (contentReader io.ReadCloser, err error) { if h.hasContentSource() { contentReader, err = h.contentSourceReader() } else { if h.reader == nil { h.reader, err = transport.NewReaderWithContext(h.Common.parsedURI, ctx) h.reader.SetGzip(false) } contentReader = h.reader } contentReader = h.UpdateContentAttributesFromReader(contentReader) return } func (h *HTTP) Read(ctx context.Context) (yamlData []byte, err error) { var contentReader io.ReadCloser contentReader, err = h.readThru(ctx) if err != nil { return } defer contentReader.Close() err = h.AddAuthorizationTokenFromConfigToTransport() if err != nil { return } if len(h.Headers) > 0 { for _, header := range h.Headers { h.reader.AddHeader(header.Name, header.Value) } } slog.Info("HTTP.Read()", "reader", h.reader, "http", h) if err = h.SetContent(contentReader); err != nil { return } if h.writer == nil { h.Status = h.reader.Status() h.StatusCode = h.reader.StatusCode() } return yaml.Marshal(h) } func (h *HTTP) Delete(ctx context.Context) error { return nil } func (h *HTTP) Type() string { return "http" }