// Copyright 2024 Matthew Rich . All rights reserved. package resource import ( "context" _ "errors" "fmt" "gopkg.in/yaml.v3" "io" "net/url" "net/http" _ "os" "encoding/json" "strings" "log/slog" "gitea.rosskeen.house/rosskeen.house/machine" "decl/internal/codec" "decl/internal/data" "decl/internal/transport" "decl/internal/folio" "decl/internal/iofilter" "crypto/sha256" "encoding/hex" ) const ( HTTPTypeName TypeName = "http" ) func init() { ResourceTypes.Register([]string{"http", "https"}, HTTPFactory) } func HTTPFactory(u *url.URL) data.Resource { var err error h := NewHTTP() (&h.Endpoint).SetURL(u) h.parsedURI = u if h.reader, err = transport.NewReader(u); err != nil { panic(err) } if h.writer, err = transport.NewWriter(u); err != nil { panic(err) } return h } type HTTPHeader struct { Name string `yaml:"name" json:"name"` Value string `yaml:"value" json:"value"` } // Manage the state of an HTTP endpoint type HTTP struct { *Common `yaml:",inline" json:",inline"` parsedURI *url.URL `yaml:"-" json:"-"` stater machine.Stater `yaml:"-" json:"-"` client *http.Client `yaml:"-" json:"-"` Endpoint folio.URI `yaml:"endpoint" json:"endpoint"` Headers []HTTPHeader `yaml:"headers,omitempty" json:"headers,omitempty"` Content string `yaml:"body,omitempty" json:"body,omitempty"` ContentSourceRef folio.ResourceReference `json:"sourceref,omitempty" yaml:"sourceref,omitempty"` Status string `yaml:"status,omitempty" json:"status,omitempty"` StatusCode int `yaml:"statuscode,omitempty" json:"statuscode,omitempty"` Sha256 string `yaml:"sha256,omitempty" json:"sha256,omitempty"` SerializeContent bool `json:"serializecontent,omitempty" yaml:"serializecontent,omitempty"` Size int64 `yaml:"size,omitempty" json:"size,omitempty"` SignatureValue string `yaml:"signature,omitempty" json:"signature,omitempty"` config data.ConfigurationValueGetter Resources data.ResourceMapper `yaml:"-" json:"-"` reader *transport.Reader `yaml:"-" json:"-"` writer *transport.Writer `yaml:"-" json:"-"` } func NewHTTP() *HTTP { return &HTTP{ client: &http.Client{} } } func (h *HTTP) SetResourceMapper(resources data.ResourceMapper) { h.Resources = resources } func (h *HTTP) Clone() data.Resource { return &HTTP { Common: &Common{ includeQueryParamsInURI: true, resourceType: HTTPTypeName }, client: h.client, Endpoint: h.Endpoint, Headers: h.Headers, Content: h.Content, reader: h.reader, writer: h.writer, } } func (h *HTTP) StateMachine() machine.Stater { if h.stater == nil { h.stater = StorageMachine(h) } return h.stater } func (h *HTTP) Notify(m *machine.EventMessage) { ctx := context.Background() slog.Info("Notify()", "http", h, "m", m) switch m.On { case machine.ENTERSTATEEVENT: switch m.Dest { case "start_read": if _,readErr := h.Read(ctx); readErr == nil { if triggerErr := h.StateMachine().Trigger("state_read"); triggerErr == nil { return } else { h.Common.State = "absent" panic(triggerErr) } } else { h.Common.State = "absent" panic(readErr) } case "start_create": if e := h.Create(ctx); e == nil { if triggerErr := h.stater.Trigger("created"); triggerErr == nil { return } } h.Common.State = "absent" case "start_delete": if deleteErr := h.Delete(ctx); deleteErr == nil { if triggerErr := h.StateMachine().Trigger("deleted"); triggerErr == nil { return } else { h.Common.State = "present" panic(triggerErr) } } else { h.Common.State = "present" panic(deleteErr) } case "absent": h.Common.State = "absent" case "present", "created", "read": h.Common.State = "present" } case machine.EXITSTATEEVENT: } } func (h *HTTP) URI() string { return string(h.Endpoint) } func (h *HTTP) setParsedURI(uri folio.URI) error { if parsed := uri.Parse(); parsed == nil { return folio.ErrInvalidURI } else { h.parsedURI = parsed } return nil } func (h *HTTP) SetURI(uri string) (err error) { v := folio.URI(uri) if err = h.setParsedURI(v); err == nil { h.Endpoint = v } return } func (h *HTTP) UseConfig(config data.ConfigurationValueGetter) { h.config = config } func (h *HTTP) JSON() ([]byte, error) { return json.Marshal(h) } func (h *HTTP) Validate() error { s := NewSchema(h.Type()) jsonDoc, jsonErr := h.JSON() if jsonErr == nil { return s.Validate(string(jsonDoc)) } return jsonErr } func (h *HTTP) Apply() error { switch h.Common.State { case "absent": case "present": } _,e := h.Read(context.Background()) if e == nil { h.Common.State = "present" } return e } func (h *HTTP) Load(docData []byte, f codec.Format) (err error) { if err = f.StringDecoder(string(docData)).Decode(h); err == nil { err = h.setParsedURI(h.Endpoint) } return } func (h *HTTP) LoadReader(r io.ReadCloser, f codec.Format) (err error) { if err = f.Decoder(r).Decode(h); err == nil { err = h.setParsedURI(h.Endpoint) } return } func (h *HTTP) LoadString(docData string, f codec.Format) (err error) { if err = f.StringDecoder(docData).Decode(h); err == nil { err = h.setParsedURI(h.Endpoint) } return } func (h *HTTP) LoadDecl(yamlResourceDeclaration string) error { return h.LoadString(yamlResourceDeclaration, codec.FormatYaml) } func (h *HTTP) ResolveId(ctx context.Context) string { return h.Endpoint.String() } func (h *HTTP) Signature() folio.Signature { var s folio.Signature if e := (&s).SetHexString(h.SignatureValue); e != nil { panic(e) } return s } func (h *HTTP) Hash() (shaBytes []byte) { shaBytes, _ = hex.DecodeString(h.Sha256) return } func (h *HTTP) HashHexString() string { return h.Sha256 } func (h *HTTP) Create(ctx context.Context) (err error) { slog.Error("HTTP.Create()", "http", h) var contentReader io.ReadCloser h.writer, err = transport.NewWriterWithContext(h.parsedURI, ctx) if err != nil { slog.Error("HTTP.Create()", "http", h, "error", err) //panic(err) return } slog.Error("HTTP.Create() content", "http", h) contentReader, err = h.contentSourceReader() if err != nil { return } contentReader = h.UpdateContentAttributesFromReader(contentReader) if err != nil { return } defer func () { h.writer.Close() contentReader.Close() }() err = h.AddAuthorizationTokenFromConfigToTransport() if err != nil { return } if len(h.Headers) > 0 { for _, header := range h.Headers { h.writer.AddHeader(header.Name, header.Value) } } slog.Error("HTTP.Create()", "http", h) copyBuffer := make([]byte, 32 * 1024) _, writeErr := io.CopyBuffer(h.writer, contentReader, copyBuffer) if writeErr != nil { return fmt.Errorf("Http.Create(): CopyBuffer failed %v %v: %w", h.writer, contentReader, writeErr) } h.Status = h.writer.Status() h.StatusCode = h.writer.StatusCode() return /* body := strings.NewReader(h.Content) req, reqErr := http.NewRequest("POST", h.Endpoint, body) if reqErr != nil { return reqErr } if tokenErr := h.ReadAuthorizationTokenFromConfig(req); tokenErr != nil { slog.Error("ReadAuthorizationTokenFromConfig()", "error", tokenErr) } for _,header := range h.Headers { req.Header.Add(header.Name, header.Value) } resp, err := h.client.Do(req) h.Status = resp.Status h.StatusCode = resp.StatusCode if err != nil { return err } defer resp.Body.Close() return err */ } func (h *HTTP) Update(ctx context.Context) error { return h.Create(ctx) } func (h *HTTP) ReadAuthorizationTokenFromConfig(req *http.Request) error { if h.config != nil { token, tokenErr := h.config.GetValue("authorization_token") if tokenErr == nil { req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token)) } slog.Info("ReadAuthorizationTokenFromConfig()", "error", tokenErr) return tokenErr } return nil } func (h *HTTP) AddAuthorizationTokenFromConfigToTransport() (err error) { if h.config != nil { token, tokenErr := h.config.GetValue("authorization_token") if tokenErr == nil { if h.reader != nil { h.reader.AddHeader("Authorization", fmt.Sprintf("Bearer %s", token)) } if h.writer != nil { h.writer.AddHeader("Authorization", fmt.Sprintf("Bearer %s", token)) } } else { err = tokenErr } } return } func (h *HTTP) UpdateContentAttributes() { h.Size = int64(len(h.Content)) h.Sha256 = fmt.Sprintf("%x", sha256.Sum256([]byte(h.Content))) } func (h *HTTP) UpdateContentAttributesFromReader(reader io.ReadCloser) io.ReadCloser { var content strings.Builder hash := sha256.New() h.Size = 0 h.Content = "" h.Sha256 = "" return iofilter.NewReader(reader, func(p []byte, readn int, readerr error) (n int, err error) { hash.Write(p[:readn]) h.Sha256 = fmt.Sprintf("%x", hash.Sum(nil)) h.Size += int64(readn) if len(h.ContentSourceRef) == 0 || h.SerializeContent { content.Write(p[:readn]) h.Content = content.String() } return readn, readerr }) } func (h *HTTP) SetContent(r io.Reader) error { fileContent, ioErr := io.ReadAll(r) h.Content = string(fileContent) h.UpdateContentAttributes() return ioErr } func (h *HTTP) GetContent(w io.Writer) (contentReader io.ReadCloser, err error) { slog.Info("Http.GetContent()", "content", len(h.Content), "sourceref", h.ContentSourceRef) contentReader, err = h.readThru(context.Background()) if w != nil { copyBuffer := make([]byte, 32 * 1024) _, writeErr := io.CopyBuffer(w, contentReader, copyBuffer) if writeErr != nil { return nil, fmt.Errorf("Http.GetContent(): CopyBuffer failed %v %v: %w", w, contentReader, writeErr) } return nil, nil } if v, ok := contentReader.(*transport.Reader); ok { h.SignatureValue = v.Signature() } return } /* func (h *HTTP) writeThru(ctx context.Context) (contentWriter io.WriteCloser, err error) { if len(h.ContentSourceRef) != 0 { contentReader, err = h.ContentSourceRef.Lookup(nil).ContentReaderStream() contentReader.(*transport.Reader).SetGzip(false) } else { if len(h.Content) != 0 { contentReader = io.NopCloser(strings.NewReader(h.Content)) } else { //contentReader, err = os.Open(f.Path) contentReader = transport.NewReaderWithContext(u, ctx) contentReader.(*transport.Reader).SetGzip(false) } } contentReader = h.UpdateContentAttributesFromReader(contentReader) return } */ func (h *HTTP) contentSourceReader() (contentReader io.ReadCloser, err error) { if len(h.ContentSourceRef) != 0 { contentReader, err = h.ContentSourceRef.Lookup(nil).ContentReaderStream() contentReader.(*transport.Reader).SetGzip(false) } else { if len(h.Content) != 0 { contentReader = io.NopCloser(strings.NewReader(h.Content)) } else { err = fmt.Errorf("Cannot create reader: no content defined") } } return } // set up reader for source content func (h *HTTP) readThru(ctx context.Context) (contentReader io.ReadCloser, err error) { if contentReader, err = h.contentSourceReader(); err != nil { if h.reader == nil { h.reader, err = transport.NewReaderWithContext(h.parsedURI, ctx) h.reader.SetGzip(false) } contentReader = h.reader } contentReader = h.UpdateContentAttributesFromReader(contentReader) return } func (h *HTTP) Read(ctx context.Context) (yamlData []byte, err error) { var contentReader io.ReadCloser contentReader, err = h.readThru(ctx) if err != nil { return } defer contentReader.Close() err = h.AddAuthorizationTokenFromConfigToTransport() if err != nil { return } if len(h.Headers) > 0 { for _, header := range h.Headers { h.reader.AddHeader(header.Name, header.Value) } } slog.Info("HTTP.Read()", "reader", h.reader) /* copyBuffer := make([]byte, 32 * 1024) _, writeErr := io.CopyBuffer(w, h.reader, copyBuffer) if writeErr != nil { return nil, fmt.Errorf("Http.GetContent(): CopyBuffer failed %v %v: %w", w, contentReader, writeErr) } */ if err = h.SetContent(contentReader); err != nil { return } h.Status = h.reader.Status() h.StatusCode = h.reader.StatusCode() return yaml.Marshal(h) /* req, reqErr := http.NewRequestWithContext(ctx, "GET", h.Endpoint, nil) if reqErr != nil { return nil, reqErr } slog.Info("HTTP.Read() ", "request", req, "err", reqErr) tokenErr := h.ReadAuthorizationTokenFromConfig(req) if tokenErr != nil { slog.Error("ReadAuthorizationTokenFromConfig()", "error", tokenErr) } if len(h.Headers) > 0 { for _,header := range h.Headers { req.Header.Add(header.Name, header.Value) } } resp, err := h.client.Do(req) slog.Info("Http.Read()", "response", resp, "error", err) h.Status = resp.Status h.StatusCode = resp.StatusCode if err != nil { return nil, err } defer resp.Body.Close() body, errReadBody := io.ReadAll(resp.Body) if errReadBody != nil { return nil, errReadBody } h.Body = string(body) return yaml.Marshal(h) */ } func (h *HTTP) Delete(ctx context.Context) error { return nil } func (h *HTTP) Type() string { return "http" }