jx/internal/resource/http.go

536 lines
13 KiB
Go
Raw Normal View History

// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package resource
import (
"context"
"errors"
"fmt"
"gopkg.in/yaml.v3"
2024-04-05 17:22:17 +00:00
"io"
"net/url"
2024-04-05 17:22:17 +00:00
"net/http"
_ "os"
2024-04-10 19:38:12 +00:00
"encoding/json"
"strings"
"log/slog"
2024-05-06 00:48:54 +00:00
"gitea.rosskeen.house/rosskeen.house/machine"
"decl/internal/codec"
"decl/internal/data"
"decl/internal/transport"
"decl/internal/folio"
"decl/internal/iofilter"
"crypto/sha256"
"encoding/hex"
)
const (
HTTPTypeName TypeName = "http"
)
var (
ErrUndefinedContentSource error = errors.New("Cannot create reader: no content defined")
)
func init() {
ResourceTypes.Register([]string{"http", "https"}, HTTPFactory)
}
func HTTPFactory(u *url.URL) data.Resource {
var err error
h := NewHTTP()
(&h.Endpoint).SetURL(u)
h.parsedURI = u
if h.reader, err = transport.NewReader(u); err != nil {
panic(err)
}
if h.writer, err = transport.NewWriter(u); err != nil {
panic(err)
}
return h
}
2024-04-10 19:38:12 +00:00
type HTTPHeader struct {
Name string `yaml:"name" json:"name"`
2024-04-10 20:27:34 +00:00
Value string `yaml:"value" json:"value"`
2024-04-10 19:38:12 +00:00
}
// Manage the state of an HTTP endpoint
type HTTP struct {
*Common `yaml:",inline" json:",inline"`
parsedURI *url.URL `yaml:"-" json:"-"`
2024-05-09 07:39:45 +00:00
stater machine.Stater `yaml:"-" json:"-"`
2024-04-10 19:38:12 +00:00
client *http.Client `yaml:"-" json:"-"`
Endpoint folio.URI `yaml:"endpoint" json:"endpoint"`
2024-04-10 20:27:34 +00:00
Headers []HTTPHeader `yaml:"headers,omitempty" json:"headers,omitempty"`
Content string `yaml:"body,omitempty" json:"body,omitempty"`
ContentSourceRef folio.ResourceReference `json:"sourceref,omitempty" yaml:"sourceref,omitempty"`
Status string `yaml:"status,omitempty" json:"status,omitempty"`
StatusCode int `yaml:"statuscode,omitempty" json:"statuscode,omitempty"`
Sha256 string `yaml:"sha256,omitempty" json:"sha256,omitempty"`
SerializeContent bool `json:"serializecontent,omitempty" yaml:"serializecontent,omitempty"`
Size int64 `yaml:"size,omitempty" json:"size,omitempty"`
SignatureValue string `yaml:"signature,omitempty" json:"signature,omitempty"`
config data.ConfigurationValueGetter
Resources data.ResourceMapper `yaml:"-" json:"-"`
reader *transport.Reader `yaml:"-" json:"-"`
writer *transport.Writer `yaml:"-" json:"-"`
}
func NewHTTP() *HTTP {
2024-04-10 19:38:12 +00:00
return &HTTP{ client: &http.Client{} }
}
func (h *HTTP) SetResourceMapper(resources data.ResourceMapper) {
2024-07-17 08:34:57 +00:00
h.Resources = resources
}
func (h *HTTP) Clone() data.Resource {
2024-04-19 07:52:10 +00:00
return &HTTP {
Common: &Common{ includeQueryParamsInURI: true, resourceType: HTTPTypeName },
2024-04-19 07:52:10 +00:00
client: h.client,
Endpoint: h.Endpoint,
Headers: h.Headers,
Content: h.Content,
reader: h.reader,
writer: h.writer,
2024-04-19 07:52:10 +00:00
}
}
2024-05-06 00:48:54 +00:00
func (h *HTTP) StateMachine() machine.Stater {
2024-05-09 07:39:45 +00:00
if h.stater == nil {
h.stater = StorageMachine(h)
}
return h.stater
}
func (h *HTTP) Notify(m *machine.EventMessage) {
ctx := context.Background()
slog.Info("Notify()", "http", h, "m", m)
2024-05-09 07:39:45 +00:00
switch m.On {
case machine.ENTERSTATEEVENT:
switch m.Dest {
case "start_read":
if _,readErr := h.Read(ctx); readErr == nil {
if triggerErr := h.StateMachine().Trigger("state_read"); triggerErr == nil {
return
} else {
h.Common.State = "absent"
panic(triggerErr)
}
} else {
h.Common.State = "absent"
panic(readErr)
}
2024-05-09 07:39:45 +00:00
case "start_create":
2024-05-13 17:13:20 +00:00
if e := h.Create(ctx); e == nil {
if triggerErr := h.stater.Trigger("created"); triggerErr == nil {
return
2024-05-13 05:41:12 +00:00
}
2024-05-09 07:39:45 +00:00
}
h.Common.State = "absent"
case "start_delete":
if deleteErr := h.Delete(ctx); deleteErr == nil {
if triggerErr := h.StateMachine().Trigger("deleted"); triggerErr == nil {
return
} else {
h.Common.State = "present"
panic(triggerErr)
}
} else {
h.Common.State = "present"
panic(deleteErr)
}
case "absent":
h.Common.State = "absent"
case "present", "created", "read":
h.Common.State = "present"
2024-05-09 07:39:45 +00:00
}
case machine.EXITSTATEEVENT:
}
2024-05-06 00:48:54 +00:00
}
func (h *HTTP) URI() string {
return string(h.Endpoint)
}
func (h *HTTP) setParsedURI(uri folio.URI) error {
if parsed := uri.Parse(); parsed == nil {
return folio.ErrInvalidURI
} else {
h.parsedURI = parsed
}
return nil
}
func (h *HTTP) SetURI(uri string) (err error) {
v := folio.URI(uri)
if err = h.setParsedURI(v); err == nil {
h.Endpoint = v
}
return
}
func (h *HTTP) UseConfig(config data.ConfigurationValueGetter) {
h.config = config
}
2024-04-10 19:38:12 +00:00
func (h *HTTP) JSON() ([]byte, error) {
return json.Marshal(h)
}
2024-04-09 19:30:05 +00:00
func (h *HTTP) Validate() error {
2024-04-10 19:38:12 +00:00
s := NewSchema(h.Type())
jsonDoc, jsonErr := h.JSON()
if jsonErr == nil {
return s.Validate(string(jsonDoc))
}
return jsonErr
2024-04-09 19:30:05 +00:00
}
func (h *HTTP) Apply() error {
switch h.Common.State {
case "absent":
case "present":
}
2024-04-10 19:38:12 +00:00
_,e := h.Read(context.Background())
if e == nil {
h.Common.State = "present"
2024-04-10 19:38:12 +00:00
}
return e
}
func (h *HTTP) Load(docData []byte, f codec.Format) (err error) {
if err = f.StringDecoder(string(docData)).Decode(h); err == nil {
err = h.setParsedURI(h.Endpoint)
}
return
}
func (h *HTTP) LoadReader(r io.ReadCloser, f codec.Format) (err error) {
if err = f.Decoder(r).Decode(h); err == nil {
err = h.setParsedURI(h.Endpoint)
}
return
}
func (h *HTTP) LoadString(docData string, f codec.Format) (err error) {
if err = f.StringDecoder(docData).Decode(h); err == nil {
err = h.setParsedURI(h.Endpoint)
}
return
2024-04-05 17:22:17 +00:00
}
func (h *HTTP) LoadDecl(yamlResourceDeclaration string) error {
return h.LoadString(yamlResourceDeclaration, codec.FormatYaml)
}
func (h *HTTP) ResolveId(ctx context.Context) string {
return h.Endpoint.String()
}
func (h *HTTP) Signature() folio.Signature {
var s folio.Signature
if e := (&s).SetHexString(h.SignatureValue); e != nil {
panic(e)
}
return s
}
func (h *HTTP) Hash() (shaBytes []byte) {
shaBytes, _ = hex.DecodeString(h.Sha256)
return
}
func (h *HTTP) HashHexString() string {
return h.Sha256
}
func (h *HTTP) Create(ctx context.Context) (err error) {
slog.Error("HTTP.Create()", "http", h)
var contentReader io.ReadCloser
h.writer, err = transport.NewWriterWithContext(h.parsedURI, ctx)
if err != nil {
slog.Error("HTTP.Create()", "http", h, "error", err)
//panic(err)
return
}
slog.Error("HTTP.Create() content", "http", h)
// create should post to the named resource using the resource content or sourceref.
contentReader, err = h.contentSourceReader()
if err != nil {
return
}
contentReader = h.UpdateContentAttributesFromReader(contentReader)
if err != nil {
return
}
defer func () {
h.writer.Close()
contentReader.Close()
}()
err = h.AddAuthorizationTokenFromConfigToTransport()
if err != nil {
return
}
if len(h.Headers) > 0 {
for _, header := range h.Headers {
h.writer.AddHeader(header.Name, header.Value)
}
}
slog.Error("HTTP.Create()", "http", h)
copyBuffer := make([]byte, 32 * 1024)
_, writeErr := io.CopyBuffer(h.writer, contentReader, copyBuffer)
if writeErr != nil {
return fmt.Errorf("Http.Create(): CopyBuffer failed %v %v: %w", h.writer, contentReader, writeErr)
}
h.Status = h.writer.Status()
h.StatusCode = h.writer.StatusCode()
return
/*
body := strings.NewReader(h.Content)
2024-04-10 19:38:12 +00:00
req, reqErr := http.NewRequest("POST", h.Endpoint, body)
if reqErr != nil {
return reqErr
}
if tokenErr := h.ReadAuthorizationTokenFromConfig(req); tokenErr != nil {
slog.Error("ReadAuthorizationTokenFromConfig()", "error", tokenErr)
}
2024-04-10 19:38:12 +00:00
for _,header := range h.Headers {
req.Header.Add(header.Name, header.Value)
}
2024-04-10 19:38:12 +00:00
resp, err := h.client.Do(req)
h.Status = resp.Status
h.StatusCode = resp.StatusCode
2024-04-10 19:38:12 +00:00
if err != nil {
return err
}
2024-04-10 20:27:34 +00:00
defer resp.Body.Close()
2024-04-10 19:38:12 +00:00
return err
*/
}
func (h *HTTP) Update(ctx context.Context) error {
return h.Create(ctx)
2024-04-10 19:38:12 +00:00
}
func (h *HTTP) ReadAuthorizationTokenFromConfig(req *http.Request) error {
if h.config != nil {
token, tokenErr := h.config.GetValue("authorization_token")
if tokenErr == nil {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token))
}
slog.Info("ReadAuthorizationTokenFromConfig()", "error", tokenErr)
return tokenErr
}
return nil
}
func (h *HTTP) AddAuthorizationTokenFromConfigToTransport() (err error) {
if h.config != nil {
token, tokenErr := h.config.GetValue("authorization_token")
if tokenErr == nil {
if h.reader != nil {
h.reader.AddHeader("Authorization", fmt.Sprintf("Bearer %s", token))
}
if h.writer != nil {
h.writer.AddHeader("Authorization", fmt.Sprintf("Bearer %s", token))
}
} else {
err = tokenErr
}
}
return
}
func (h *HTTP) UpdateContentAttributes() {
h.Size = int64(len(h.Content))
h.Sha256 = fmt.Sprintf("%x", sha256.Sum256([]byte(h.Content)))
}
func (h *HTTP) UpdateContentAttributesFromReader(reader io.ReadCloser) io.ReadCloser {
var content strings.Builder
hash := sha256.New()
h.Size = 0
h.Content = ""
h.Sha256 = ""
return iofilter.NewReader(reader, func(p []byte, readn int, readerr error) (n int, err error) {
hash.Write(p[:readn])
h.Sha256 = fmt.Sprintf("%x", hash.Sum(nil))
h.Size += int64(readn)
if len(h.ContentSourceRef) == 0 || h.SerializeContent {
content.Write(p[:readn])
h.Content = content.String()
}
return readn, readerr
})
}
func (h *HTTP) SetContent(r io.Reader) error {
fileContent, ioErr := io.ReadAll(r)
h.Content = string(fileContent)
h.UpdateContentAttributes()
return ioErr
}
func (h *HTTP) GetContent(w io.Writer) (contentReader io.ReadCloser, err error) {
slog.Info("Http.GetContent()", "content", len(h.Content), "sourceref", h.ContentSourceRef)
contentReader, err = h.readThru(context.Background())
if w != nil {
copyBuffer := make([]byte, 32 * 1024)
_, writeErr := io.CopyBuffer(w, contentReader, copyBuffer)
if writeErr != nil {
return nil, fmt.Errorf("Http.GetContent(): CopyBuffer failed %v %v: %w", w, contentReader, writeErr)
}
return nil, nil
}
if v, ok := contentReader.(*transport.Reader); ok {
h.SignatureValue = v.Signature()
}
return
}
/*
func (h *HTTP) writeThru(ctx context.Context) (contentWriter io.WriteCloser, err error) {
if len(h.ContentSourceRef) != 0 {
contentReader, err = h.ContentSourceRef.Lookup(nil).ContentReaderStream()
contentReader.(*transport.Reader).SetGzip(false)
} else {
if len(h.Content) != 0 {
contentReader = io.NopCloser(strings.NewReader(h.Content))
} else {
//contentReader, err = os.Open(f.Path)
contentReader = transport.NewReaderWithContext(u, ctx)
contentReader.(*transport.Reader).SetGzip(false)
}
}
contentReader = h.UpdateContentAttributesFromReader(contentReader)
return
}
*/
func (h *HTTP) hasContentSource() bool {
return len(h.ContentSourceRef) > 0 || len(h.Content) > 0
}
func (h *HTTP) contentSourceReader() (contentReader io.ReadCloser, err error) {
if len(h.ContentSourceRef) != 0 {
contentReader, err = h.ContentSourceRef.Lookup(nil).ContentReaderStream()
contentReader.(*transport.Reader).SetGzip(false)
} else {
if len(h.Content) != 0 {
contentReader = io.NopCloser(strings.NewReader(h.Content))
} else {
err = ErrUndefinedContentSource
}
}
return
}
// set up reader for source content
func (h *HTTP) readThru(ctx context.Context) (contentReader io.ReadCloser, err error) {
if h.hasContentSource() {
contentReader, err = h.contentSourceReader()
} else {
if h.reader == nil {
h.reader, err = transport.NewReaderWithContext(h.parsedURI, ctx)
h.reader.SetGzip(false)
}
contentReader = h.reader
}
contentReader = h.UpdateContentAttributesFromReader(contentReader)
return
}
func (h *HTTP) Read(ctx context.Context) (yamlData []byte, err error) {
var contentReader io.ReadCloser
contentReader, err = h.readThru(ctx)
if err != nil {
return
}
defer contentReader.Close()
err = h.AddAuthorizationTokenFromConfigToTransport()
if err != nil {
return
}
if len(h.Headers) > 0 {
for _, header := range h.Headers {
h.reader.AddHeader(header.Name, header.Value)
}
}
slog.Info("HTTP.Read()", "reader", h.reader)
/*
copyBuffer := make([]byte, 32 * 1024)
_, writeErr := io.CopyBuffer(w, h.reader, copyBuffer)
if writeErr != nil {
return nil, fmt.Errorf("Http.GetContent(): CopyBuffer failed %v %v: %w", w, contentReader, writeErr)
}
*/
if err = h.SetContent(contentReader); err != nil {
return
}
h.Status = h.reader.Status()
h.StatusCode = h.reader.StatusCode()
return yaml.Marshal(h)
/*
2024-04-10 19:38:12 +00:00
req, reqErr := http.NewRequestWithContext(ctx, "GET", h.Endpoint, nil)
if reqErr != nil {
return nil, reqErr
}
slog.Info("HTTP.Read() ", "request", req, "err", reqErr)
tokenErr := h.ReadAuthorizationTokenFromConfig(req)
if tokenErr != nil {
slog.Error("ReadAuthorizationTokenFromConfig()", "error", tokenErr)
}
2024-04-10 19:38:12 +00:00
if len(h.Headers) > 0 {
for _,header := range h.Headers {
req.Header.Add(header.Name, header.Value)
}
}
resp, err := h.client.Do(req)
slog.Info("Http.Read()", "response", resp, "error", err)
h.Status = resp.Status
h.StatusCode = resp.StatusCode
2024-04-05 17:22:17 +00:00
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, errReadBody := io.ReadAll(resp.Body)
if errReadBody != nil {
return nil, errReadBody
}
h.Body = string(body)
return yaml.Marshal(h)
*/
}
func (h *HTTP) Delete(ctx context.Context) error {
return nil
}
func (h *HTTP) Type() string {
2024-04-10 19:38:12 +00:00
return "http"
}