jx/internal/fan/http.go
Matthew Rich c34a76981e
Some checks are pending
Lint / golangci-lint (push) Waiting to run
Declarative Tests / test (push) Waiting to run
Declarative Tests / build-fedora (push) Waiting to run
Declarative Tests / build-ubuntu-focal (push) Waiting to run
move source/target converters to fan pkg
2024-09-19 08:03:23 +00:00

198 lines
4.6 KiB
Go

// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package fan
import (
_ "context"
_ "encoding/json"
"fmt"
_ "gopkg.in/yaml.v3"
"net/url"
_ "net/http"
_ "path/filepath"
_ "decl/internal/resource"
"decl/internal/codec"
"decl/internal/data"
"decl/internal/folio"
_ "os"
"io"
"errors"
"log/slog"
)
// HTTP is the converter registered for the "http" and "https" URL schemes.
// It reads documents from an HTTP endpoint through a decoder that is created
// lazily on the first call to Extract.
type HTTP struct {
	// Endpoint is the URI of the remote source. Extract builds a resource
	// from it when no source resource is supplied.
	Endpoint folio.URI `yaml:"endpoint,omitempty" json:"endpoint,omitempty"`
	// url is the parsed URL handed to the factory registered in init.
	url *url.URL `yaml:"-" json:"-"`
	// Format is the codec format used for decoding. NewHTTP defaults it to
	// codec.FormatYaml and setdecoder updates it from the source content type.
	Format codec.Format `yaml:"format,omitempty" json:"format,omitempty"`

	// reader is the content stream opened by Extract and released by Close.
	reader io.ReadCloser `yaml:"-" json:"-"`
	// writer is released by Close when set.
	writer io.WriteCloser `yaml:"-" json:"-"`
	// decoder is created once by setdecoder and reused for every document.
	decoder codec.Decoder `yaml:"-" json:"-"`
	// encoder is released by Close when set.
	encoder codec.Encoder `yaml:"-" json:"-"`
	// closer is initialised to a no-op by NewHTTP.
	closer func() error `yaml:"-" json:"-"`
	// index counts the decode attempts made by Extract; it is also used to
	// build the per-document URI ("<source>?index=N").
	index int `yaml:"-" json:"-"`
	// signature is read from the source resource on the first Extract call.
	signature data.Signature `yaml:"-" json:"-"`
}
// NewHTTP returns an HTTP converter that decodes YAML by default, starts at
// document index zero and carries a no-op closer.
func NewHTTP() *HTTP {
	h := &HTTP{Format: codec.FormatYaml}
	h.closer = func() error { return nil }
	return h
}
func init() {
folio.DocumentRegistry.ConverterTypes.Register([]string{"http","https"}, func(u *url.URL) data.Converter {
t := NewHTTP()
t.Endpoint = folio.URI(u.String())
t.url = u
return t
})
}
// Type reports the converter type name, which is always "http".
func (h *HTTP) Type() data.TypeName {
	const name = "http"
	return name
}
/*
func (h *HTTP) setencoder(target data.ContentIdentifier) {
if formatErr := h.Format.Set(target.ContentType()); formatErr != nil {
h.Format = codec.FormatYaml
if format,ok := h.url.Query()["format"]; ok {
if queryFormatErr := h.Format.Set(format[0]); queryFormatErr != nil {
h.Format = codec.FormatYaml
}
}
}
if h.encoder == nil {
h.encoder = codec.NewEncoder(h.writer, h.Format)
}
}
*/
// setdecoder creates the decoder over h.reader the first time it is called and
// does nothing when a decoder already exists. The format is taken from the
// source's content type; an error from Format.Set is deliberately discarded, so
// the decoder is then built with whatever value h.Format holds afterwards.
func (h *HTTP) setdecoder(source data.ContentIdentifier) {
	if h.decoder != nil {
		return
	}
	_ = h.Format.Set(source.ContentType())
	h.decoder = codec.NewDecoder(h.reader, h.Format)
}
// Extract decodes the next document from the source and returns it.
//
// On the first call (h.index == 0) the source is opened: when sourceResource
// is nil a resource is created from h.Endpoint; the content reader and the
// signature are stored on h and the decoder is initialised from the source's
// content type. Every call then decodes one document named
// "<source URI>?index=N", increments h.index (also when decoding fails),
// validates the document and, when the stored signature is non-empty and the
// source implements data.ContentHasher, verifies the signature.
//
// The filter argument is not used.
//
// Errors:
//   - ErrInvalidSource when no source is given and h.Endpoint is empty, or
//     when sourceResource is nil on a later call.
//   - An error wrapping ErrInvalidSource when the source lacks one of the
//     interfaces required to read it (previously these cases panicked).
//   - Any error from creating the resource, opening its content, decoding,
//     validating or verifying. Decoder errors are returned unchanged;
//     ExtractMany relies on io.EOF from here to detect the end of the stream.
func (h *HTTP) Extract(sourceResource data.Resource, filter data.ElementSelector) (document data.Document, err error) {
	if h.index == 0 {
		if sourceResource == nil {
			if len(h.Endpoint) == 0 {
				return nil, ErrInvalidSource
			}
			sourceResource, err = h.Endpoint.NewResource(nil)
		}
		slog.Info("HTTP.Extract()", "source", sourceResource, "error", err)
		if err != nil {
			// Without a resource there is nothing to read from.
			return nil, err
		}

		// Check every capability up front so that an unsupported source is
		// rejected before any content is opened.
		jxSourceFile, ok := sourceResource.(data.FileResource)
		if !ok {
			return nil, fmt.Errorf("%w: %T is not a data.FileResource", ErrInvalidSource, sourceResource)
		}
		content, ok := jxSourceFile.(data.ContentGetter)
		if !ok {
			return nil, fmt.Errorf("%w: %T is not a data.ContentGetter", ErrInvalidSource, sourceResource)
		}
		signed, ok := sourceResource.(data.Signed)
		if !ok {
			return nil, fmt.Errorf("%w: %T is not a data.Signed", ErrInvalidSource, sourceResource)
		}
		identifier, ok := jxSourceFile.(data.ContentIdentifier)
		if !ok {
			return nil, fmt.Errorf("%w: %T is not a data.ContentIdentifier", ErrInvalidSource, sourceResource)
		}

		h.reader, err = content.GetContent(nil)
		slog.Info("HTTP.Extract()", "file", h, "error", err)
		if err != nil {
			return nil, err
		}
		h.signature = signed.Signature()
		h.setdecoder(identifier)
		slog.Info("HTTP.Extract()", "jx", h)
	}

	// The source is still needed on later calls to name the document and to
	// verify the signature; a resource created from the endpoint on the first
	// call is not retained, so the caller has to pass one.
	if sourceResource == nil {
		return nil, ErrInvalidSource
	}

	u := fmt.Sprintf("%s?index=%d", sourceResource.URI(), h.index)
	document = folio.DocumentRegistry.NewDocument(folio.URI(u))
	err = h.decoder.Decode(document)
	slog.Info("HTTP.Extract()", "doc", document, "http", h, "error", err)
	h.index++
	if err != nil {
		return document, err
	}
	if err = document.Validate(); err != nil {
		return document, err
	}
	if h.signature.String() != "" {
		if v, ok := sourceResource.(data.ContentHasher); ok {
			err = h.signature.Verify(v)
		}
	}
	return document, err
	/*
	defer h.Close()
	documentSignature := h.transport.Signature()
	hash := sha256.New()
	sumReadData := iofilter.NewReader(h.transport, func(p []byte, readn int, readerr error) (n int, err error) {
	hash.Write(p)
	return
	})
	decoder := codec.NewYAMLDecoder(sumReadData)
	index := 0
	for {
	doc = folio.DocumentRegistry.NewDocument(folio.URI(u))
	doc := resource.NewDocument()
	e := decoder.Decode(doc)
	if errors.Is(e, io.EOF) {
	break
	}
	if e != nil {
	return documents, e
	}
	if validationErr := doc.Validate(); validationErr != nil {
	return documents, validationErr
	}
	documents = append(documents, doc)
	index++
	}
	if documentSignature != "" {
	sig := &signature.Ident{}
	sigErr := sig.VerifySum(hash.Sum(nil), []byte(documentSignature))
	if sigErr != nil {
	return documents, sigErr
	}
	}
	*/
}
// ExtractMany reads every document from the source by calling Extract until it
// fails, then closes the converter.
//
// When resourceSource is nil and h.Endpoint is set, the resource is created
// from the endpoint once, before the loop, so that every Extract call receives
// the same resource. Extract only resolves a nil source on its first call and
// dereferences it on later ones.
//
// It returns the documents decoded so far together with the first error that
// stopped the loop. io.EOF marks the normal end of the stream and is reported
// as a nil error.
func (h *HTTP) ExtractMany(resourceSource data.Resource, filter data.ElementSelector) (documents []data.Document, err error) {
	documents = make([]data.Document, 0, 100)
	defer h.Close()

	// Restart extraction from the first document. The decoder is dropped with
	// the index so that Extract builds a new one over the reader it opens,
	// instead of reusing a decoder bound to a previous, already closed reader.
	h.index = 0
	h.decoder = nil

	if resourceSource == nil && len(h.Endpoint) > 0 {
		if resourceSource, err = h.Endpoint.NewResource(nil); err != nil {
			slog.Info("HTTP.ExtractMany()", "file", h, "error", err)
			return documents, err
		}
	}

	for {
		var doc data.Document
		if doc, err = h.Extract(resourceSource, filter); err != nil {
			break
		}
		documents = append(documents, doc)
	}
	if errors.Is(err, io.EOF) {
		// End of stream is the expected way for the loop to finish.
		err = nil
	}
	slog.Info("HTTP.ExtractMany()", "file", h, "error", err)
	return documents, err
}
// Emit is a stub: it ignores both arguments and always returns a nil resource
// and a nil error.
func (h *HTTP) Emit(document data.Document, filter data.ElementSelector) (resource data.Resource, err error) {
	// Both named results are still at their zero value here.
	return
}
// Close releases the encoder, reader and writer held by the converter.
//
// Errors from closing the reader and the writer are combined with errors.Join
// and returned; nil is returned when both close cleanly or are unset. The
// reader and writer fields are cleared once closed, so calling Close again
// does not close the same stream twice.
func (h *HTTP) Close() (err error) {
	/*
	if h.decoder != nil {
	h.decoder.Close()
	}
	*/
	if h.encoder != nil {
		// Called exactly as before; codec.Encoder is declared outside this
		// file, so any result of its Close is not folded into err here.
		h.encoder.Close()
	}
	if h.reader != nil {
		err = errors.Join(err, h.reader.Close())
		h.reader = nil
	}
	if h.writer != nil {
		err = errors.Join(err, h.writer.Close())
		h.writer = nil
	}
	return err
}