separate file reader/writer; add support for http headers
Some checks are pending
Lint / golangci-lint (push) Waiting to run
Declarative Tests / test (push) Waiting to run
Declarative Tests / build-fedora (push) Waiting to run
Declarative Tests / build-ubuntu-focal (push) Waiting to run

This commit is contained in:
Matthew Rich 2024-09-19 08:02:31 +00:00
parent 4b38af88a4
commit ba9b37f512
5 changed files with 423 additions and 47 deletions

View File

@ -6,11 +6,13 @@ import (
_ "errors" _ "errors"
"path/filepath" "path/filepath"
"io" "io"
"io/fs"
"os" "os"
"net/url" "net/url"
"strings" "strings"
"fmt" "fmt"
"compress/gzip" "compress/gzip"
"log/slog"
) )
type File struct { type File struct {
@ -25,13 +27,93 @@ type File struct {
gzipReader io.ReadCloser gzipReader io.ReadCloser
} }
type FileReader struct {
*File
readHandle *os.File
gzipReader io.ReadCloser
}
type FileWriter struct {
*File
writeHandle *os.File
gzipWriter io.WriteCloser
}
func FilePath(u *url.URL) string { func FilePath(u *url.URL) string {
return filepath.Join(u.Hostname(), u.Path) return filepath.Join(u.Hostname(), u.Path)
} }
func FileExists(u *url.URL) bool { func FileExists(u *url.URL) bool {
_, err := os.Stat(FilePath(u)) _, err := os.Stat(FilePath(u))
return err == nil return !os.IsNotExist(err)
}
func NewFileReader(u *url.URL) (f *FileReader, err error) {
f = &FileReader {
File: &File {
uri: u,
path: FilePath(u),
},
}
f.extension()
f.DetectGzip()
exists := FileExists(u)
slog.Info("transport.NewFileReader()", "uri", u, "path", f.Path(), "file", f, "error", err, "exists", exists)
if f.Path() == "" || f.Path() == "-" {
f.readHandle = os.Stdin
} else {
if f.readHandle, err = os.Open(f.Path()); err == nil {
var fi fs.FileInfo
fi, err = f.readHandle.Stat()
if fi.IsDir() {
f.readHandle.Close()
f.readHandle = nil
err = fmt.Errorf("is a directory")
}
}
if err != nil {
slog.Info("transport.NewFileReader()", "file", f, "path", f.Path(), "error", err)
return
}
}
if f.Gzip() {
if exists {
if f.gzipReader, err = gzip.NewReader(f.readHandle); err != nil {
return
}
}
}
slog.Info("transport.NewFileReader() - created reader transport", "uri", u, "file", f, "error", err)
return
}
func NewFileWriter(u *url.URL) (f *FileWriter, err error) {
f = &FileWriter {
File: &File {
uri: u,
path: FilePath(u),
},
}
f.extension()
f.DetectGzip()
exists := FileExists(u)
slog.Info("transport.NewFileWriter()", "file", f, "error", err, "exists", exists)
if f.Path() == "" || f.Path() == "-" {
f.writeHandle = os.Stdout
} else {
if f.writeHandle, err = os.OpenFile(f.Path(), os.O_RDWR|os.O_CREATE, 0644); err != nil {
slog.Info("transport.NewFileWriter()", "file", f, "path", f.Path(), "error", err)
return
}
}
if f.Gzip() {
f.gzipWriter = gzip.NewWriter(f.writeHandle)
}
slog.Info("transport.NewFileWriter()", "file", f, "error", err)
return
} }
func NewFile(u *url.URL) (f *File, err error) { func NewFile(u *url.URL) (f *File, err error) {
@ -41,12 +123,16 @@ func NewFile(u *url.URL) (f *File, err error) {
} }
f.extension() f.extension()
f.DetectGzip() f.DetectGzip()
exists := FileExists(u)
slog.Info("transport.NewFile()", "file", f, "error", err, "exists", exists)
if f.path == "" || f.path == "-" { if f.path == "" || f.path == "-" {
f.readHandle = os.Stdin f.readHandle = os.Stdin
f.writeHandle = os.Stdout f.writeHandle = os.Stdout
} else { } else {
if f.readHandle, err = os.OpenFile(f.Path(), os.O_RDWR|os.O_CREATE, 0644); err != nil { if f.readHandle, err = os.OpenFile(f.Path(), os.O_RDWR|os.O_CREATE, 0644); err != nil {
slog.Info("transport.NewFile()", "file", f, "path", f.Path(), "error", err)
return return
} }
f.writeHandle = f.readHandle f.writeHandle = f.readHandle
@ -54,10 +140,13 @@ func NewFile(u *url.URL) (f *File, err error) {
if f.Gzip() { if f.Gzip() {
f.gzipWriter = gzip.NewWriter(f.writeHandle) f.gzipWriter = gzip.NewWriter(f.writeHandle)
if exists {
if f.gzipReader, err = gzip.NewReader(f.readHandle); err != nil { if f.gzipReader, err = gzip.NewReader(f.readHandle); err != nil {
return return
} }
} }
}
slog.Info("transport.NewFile()", "file", f, "error", err)
return return
} }
@ -68,10 +157,11 @@ func (f *File) extension() {
if numberOfElements > 2 { if numberOfElements > 2 {
f.exttype = elements[numberOfElements - 2] f.exttype = elements[numberOfElements - 2]
f.fileext = elements[numberOfElements - 1] f.fileext = elements[numberOfElements - 1]
} } else {
f.exttype = elements[numberOfElements - 1] f.exttype = elements[numberOfElements - 1]
} }
} }
}
func (f *File) DetectGzip() { func (f *File) DetectGzip() {
f.gzip = (f.uri.Query().Get("gzip") == "true" || f.fileext == "gz") f.gzip = (f.uri.Query().Get("gzip") == "true" || f.fileext == "gz")
@ -101,11 +191,26 @@ func (f *File) Signature() (documentSignature string) {
} }
func (f *File) ContentType() string { func (f *File) ContentType() string {
var ext strings.Builder
if f.uri.Scheme != "file" { if f.uri.Scheme != "file" {
return f.uri.Scheme return f.uri.Scheme
} }
if f.fileext == "" {
return f.exttype return f.exttype
} }
ext.WriteString(f.exttype)
ext.WriteRune('.')
ext.WriteString(f.fileext)
return ext.String()
}
func (f *File) Stat() (fs.FileInfo, error) {
return f.FileInfo()
}
func (f *File) FileInfo() (info fs.FileInfo, err error) {
return os.Lstat(f.Path())
}
func (f *File) SetGzip(gzip bool) { func (f *File) SetGzip(gzip bool) {
f.gzip = gzip f.gzip = gzip
@ -115,16 +220,25 @@ func (f *File) Gzip() bool {
return f.gzip return f.gzip
} }
func (f *File) Reader() io.ReadCloser { func (f *FileReader) Reader() io.ReadCloser {
if f.Gzip() { if f.Gzip() {
var err error
if f.gzipReader, err = gzip.NewReader(f.readHandle); err != nil {
panic(err)
}
return f.gzipReader return f.gzipReader
} }
return f.readHandle return f.readHandle
} }
func (f *File) Writer() io.WriteCloser { func (f *FileWriter) Writer() io.WriteCloser {
if f.Gzip() { if f.Gzip() {
f.gzipWriter = gzip.NewWriter(f.writeHandle)
return f.gzipWriter return f.gzipWriter
} }
return f.writeHandle return f.writeHandle
} }
func (f *File) ReadWriter() io.ReadWriteCloser {
return f.writeHandle
}

View File

@ -10,26 +10,34 @@ _ "os"
"net/http" "net/http"
"strings" "strings"
"fmt" "fmt"
"bytes"
"context" "context"
"path/filepath" "path/filepath"
"log/slog"
"io/fs"
) )
type BufferCloser struct { type Pipe struct {
stream io.Closer Reader io.ReadCloser
*bytes.Buffer Writer io.WriteCloser
}
type HTTPConnection struct {
stream *Pipe
request *http.Request
response *http.Response
Client *http.Client
} }
type HTTP struct { type HTTP struct {
uri *url.URL uri *url.URL
path string path string
gzip bool
exttype string exttype string
fileext string fileext string
buffer BufferCloser
getRequest *http.Request ctx context.Context
getResponse *http.Response get *HTTPConnection
postRequest *http.Request post *HTTPConnection
postResponse *http.Response
Client *http.Client Client *http.Client
} }
@ -37,26 +45,99 @@ func HTTPExists(u *url.URL) bool {
return false return false
} }
func (b BufferCloser) Close() error { func NewPipe() *Pipe {
if b.stream != nil { r,w := io.Pipe()
return b.stream.Close() return &Pipe{ Reader: r, Writer: w }
} }
return nil
func NewHTTPConnection(client *http.Client) *HTTPConnection {
return &HTTPConnection {
Client: client,
}
}
func (h *HTTPConnection) NewPostRequest(ctx context.Context, uri string) (err error) {
h.stream = NewPipe()
h.request, err = http.NewRequestWithContext(ctx, "POST", uri, h.Reader())
return
}
func (h *HTTPConnection) NewGetRequest(ctx context.Context, uri string) (err error) {
h.request, err = http.NewRequestWithContext(ctx, "GET", uri, nil)
return
}
func (h *HTTPConnection) Request() *http.Request {
return h.request
}
func (h *HTTPConnection) Response() *http.Response {
return h.response
}
func (h *HTTPConnection) Writer() io.WriteCloser {
return h.stream.Writer
}
func (h *HTTPConnection) Reader() io.ReadCloser {
return h.stream.Reader
}
func (h *HTTPConnection) Do() (err error) {
slog.Info("transport.HTTPConnection.Do()", "connection", h)
h.response, err = h.Client.Do(h.request)
return
}
func (h *HTTPConnection) Read(p []byte) (n int, err error) {
if h.response == nil {
if err = h.Do(); err != nil {
return
}
}
return h.response.Body.Read(p)
}
func (h *HTTPConnection) Write(p []byte) (n int, err error) {
if h.response == nil {
if err = h.Do(); err != nil {
return
}
}
slog.Info("transport.HTTPConnection.Write()", "data", p, "connection", h)
return h.Writer().Write(p)
}
func (h *HTTPConnection) ReadFrom(r io.Reader) (n int64, err error) {
h.request.Body = r.(io.ReadCloser)
if h.response == nil {
if err = h.Do(); err != nil {
return
}
}
return h.request.ContentLength, nil
}
func (h *HTTPConnection) Close() (err error) {
if h.response != nil {
defer h.response.Body.Close()
}
if h.stream != nil {
err = h.Writer().Close()
}
return
} }
func NewHTTP(u *url.URL, ctx context.Context) (h *HTTP, err error) { func NewHTTP(u *url.URL, ctx context.Context) (h *HTTP, err error) {
h = &HTTP { h = &HTTP {
ctx: ctx,
uri: u, uri: u,
path: filepath.Join(u.Hostname(), u.RequestURI()), path: filepath.Join(u.Hostname(), u.RequestURI()),
Client: http.DefaultClient, Client: http.DefaultClient,
} }
h.extension()
h.postRequest, err = http.NewRequestWithContext(ctx, "POST", u.String(), h.buffer) h.extension()
if err != nil { h.DetectGzip()
return
}
h.getRequest, err = http.NewRequestWithContext(ctx, "GET", u.String(), nil)
return return
} }
@ -70,6 +151,10 @@ func (h *HTTP) extension() {
h.exttype = elements[numberOfElements - 1] h.exttype = elements[numberOfElements - 1]
} }
func (h *HTTP) DetectGzip() {
h.gzip = (h.uri.Query().Get("gzip") == "true" || h.fileext == "gz")
}
func (h *HTTP) URI() *url.URL { func (h *HTTP) URI() *url.URL {
return h.uri return h.uri
} }
@ -79,8 +164,8 @@ func (h *HTTP) Path() string {
} }
func (h *HTTP) Signature() (documentSignature string) { func (h *HTTP) Signature() (documentSignature string) {
if h.getResponse != nil { if h.get.Response() != nil {
documentSignature = h.getResponse.Header.Get("Signature") documentSignature = h.get.Response().Header.Get("Signature")
if documentSignature == "" { if documentSignature == "" {
signatureResp, signatureErr := h.Client.Get(fmt.Sprintf("%s.sig", h.uri.String())) signatureResp, signatureErr := h.Client.Get(fmt.Sprintf("%s.sig", h.uri.String()))
if signatureErr == nil { if signatureErr == nil {
@ -95,8 +180,12 @@ func (h *HTTP) Signature() (documentSignature string) {
return documentSignature return documentSignature
} }
func (h *HTTP) Stat() (info fs.FileInfo, err error) {
return
}
func (h *HTTP) ContentType() (contenttype string) { func (h *HTTP) ContentType() (contenttype string) {
contenttype = h.getResponse.Header.Get("Content-Type") contenttype = h.get.Response().Header.Get("Content-Type")
switch contenttype { switch contenttype {
case "application/octet-stream": case "application/octet-stream":
return h.exttype return h.exttype
@ -105,22 +194,50 @@ func (h *HTTP) ContentType() (contenttype string) {
return return
} }
func (h *HTTP) SetGzip(gzip bool) {
h.gzip = gzip
}
func (h *HTTP) Gzip() bool { func (h *HTTP) Gzip() bool {
return h.fileext == "gz" return h.gzip
} }
func (h *HTTP) Reader() io.ReadCloser { func (h *HTTP) Reader() io.ReadCloser {
var err error if h.get == nil {
if h.getResponse, err = h.Client.Do(h.getRequest); err != nil { h.get = NewHTTPConnection(h.Client)
if err := h.get.NewGetRequest(h.ctx, h.uri.String()); err != nil {
panic(err) panic(err)
} }
return h.getResponse.Body }
return h.get
} }
func (h *HTTP) Writer() io.WriteCloser { func (h *HTTP) Writer() io.WriteCloser {
var err error if h.post == nil {
if h.postResponse, err = h.Client.Do(h.postRequest); err != nil { h.post = NewHTTPConnection(h.Client)
if err := h.post.NewPostRequest(h.ctx, h.uri.String()); err != nil {
panic(err) panic(err)
} }
return h.buffer }
return h.post
}
func (h *HTTP) ReadWriter() io.ReadWriteCloser {
return nil
}
func (h *HTTP) GetRequest() *http.Request {
return h.get.Request()
}
func (h *HTTP) GetResponse() *http.Response {
return h.get.Response()
}
func (h *HTTP) PostRequest() *http.Request {
return h.post.Request()
}
func (h *HTTP) PostResponse() *http.Response {
return h.post.Response()
} }

View File

@ -5,17 +5,102 @@ package transport
import ( import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"testing" "testing"
_ "fmt" "fmt"
_ "os" _ "os"
"io"
"net/url" "net/url"
_ "path/filepath" _ "path/filepath"
"context" "context"
"net/http"
"net/http/httptest"
) )
func TestNewTransportHTTPReader(t *testing.T) { func TestNewTransportHTTPReader(t *testing.T) {
u, urlErr := url.Parse("https://localhost/resource") //ctx := context.Background()
body := []byte(`
type: "user"
attributes:
name: "foo"
gecos: "foo user"
`)
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, req.URL.String(), "/resource/user")
n,e := rw.Write(body)
assert.Nil(t, e)
assert.Greater(t, n, 0)
assert.Equal(t, "bar", req.Header.Get("foo"))
}))
defer server.Close()
u, urlErr := url.Parse(fmt.Sprintf("%s/resource/user", server.URL))
assert.Nil(t, urlErr) assert.Nil(t, urlErr)
h, err := NewHTTP(u, context.Background()) h, err := NewHTTP(u, context.Background())
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, h) assert.NotNil(t, h)
h.Reader()
h.GetRequest().Header.Add("foo", "bar")
resData, readErr := io.ReadAll(h.Reader())
assert.Nil(t, readErr)
assert.Greater(t, len(resData), 0)
assert.Equal(t, body, resData)
}
func TestNewTransportHTTPWriter(t *testing.T) {
body := []byte(`
type: "user"
attributes:
name: "foo"
gecos: "foo user"
`)
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, req.URL.String(), "/resource/user")
n, postBody := io.ReadAll(req.Body)
assert.Greater(t, n, 0)
assert.Equal(t, "bar", req.Header.Get("foo"))
assert.Equal(t, body, postBody)
}))
defer server.Close()
u, urlErr := url.Parse(fmt.Sprintf("%s/resource/user", server.URL))
assert.Nil(t, urlErr)
h, err := NewHTTP(u, context.Background())
assert.Nil(t, err)
assert.NotNil(t, h)
h.Writer()
h.PostRequest().Header.Add("foo", "bar")
// _, writeErr := h.Writer().Write(body)
// assert.Nil(t, writeErr)
}
func TestNewHTTPConnection(t *testing.T) {
ctx := context.Background()
h := NewHTTPConnection(http.DefaultClient)
assert.NotNil(t, h)
body := []byte(`
type: "user"
attributes:
name: "foo"
gecos: "foo user"
`)
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, req.URL.String(), "/resource/user")
n,e := rw.Write(body)
assert.Nil(t, e)
assert.Greater(t, n, 0)
assert.Equal(t, "bar", req.Header.Get("foo"))
}))
defer server.Close()
uri := fmt.Sprintf("%s/resource/user", server.URL)
assert.Nil(t, h.NewGetRequest(ctx, uri))
h.Request().Header.Add("foo", "bar")
responseData, responseErr := io.ReadAll(h)
assert.Nil(t, responseErr)
assert.Equal(t, body, responseData)
} }

View File

@ -10,6 +10,7 @@ _ "net/http"
_ "strings" _ "strings"
_ "path/filepath" _ "path/filepath"
"io" "io"
"io/fs"
_ "os" _ "os"
"context" "context"
) )
@ -17,9 +18,17 @@ _ "os"
type Handler interface { type Handler interface {
URI() *url.URL URI() *url.URL
ContentType() string ContentType() string
SetGzip(bool)
Gzip() bool Gzip() bool
Signature() string Signature() string
Stat() (fs.FileInfo, error)
}
type HandlerReader interface {
Reader() io.ReadCloser Reader() io.ReadCloser
}
type HandlerWriter interface {
Writer() io.WriteCloser Writer() io.WriteCloser
} }
@ -31,7 +40,10 @@ type Reader struct {
} }
func NewReader(u *url.URL) (reader *Reader, e error) { func NewReader(u *url.URL) (reader *Reader, e error) {
ctx := context.Background() return NewReaderWithContext(u, context.Background())
}
func NewReaderWithContext(u *url.URL, ctx context.Context) (reader *Reader, e error) {
reader = &Reader{ uri: u } reader = &Reader{ uri: u }
switch u.Scheme { switch u.Scheme {
case "http", "https": case "http", "https":
@ -39,10 +51,10 @@ func NewReader(u *url.URL) (reader *Reader, e error) {
case "file": case "file":
fallthrough fallthrough
default: default:
reader.handle, e = NewFile(u) reader.handle, e = NewFileReader(u)
reader.exists = func() bool { return FileExists(u) } reader.exists = func() bool { return FileExists(u) }
} }
reader.SetStream(reader.handle.Reader()) reader.SetStream(reader.handle.(HandlerReader).Reader())
return return
} }
@ -62,18 +74,23 @@ type Writer struct {
} }
func NewWriter(u *url.URL) (writer *Writer, e error) { func NewWriter(u *url.URL) (writer *Writer, e error) {
ctx := context.Background() return NewWriterWithContext(u, context.Background())
}
func NewWriterWithContext(u *url.URL, ctx context.Context) (writer *Writer, e error) {
writer = &Writer{ uri: u } writer = &Writer{ uri: u }
switch u.Scheme { switch u.Scheme {
case "http", "https": case "http", "https":
writer.handle, e = NewHTTP(u, ctx) writer.handle, e = NewHTTP(u, ctx)
case "file": case "file":
fallthrough fallthrough
default: default:
writer.handle, e = NewFile(u) writer.handle, e = NewFileWriter(u)
writer.exists = func() bool { return FileExists(u) } writer.exists = func() bool { return FileExists(u) }
} }
writer.SetStream(writer.handle.Writer())
writer.SetStream(writer.handle.(HandlerWriter).Writer())
return writer, e return writer, e
} }
@ -117,10 +134,18 @@ func (r *Reader) ContentType() string {
return r.handle.ContentType() return r.handle.ContentType()
} }
func (r *Reader) SetGzip(value bool) {
r.handle.SetGzip(value)
}
func (r *Reader) Gzip() bool { func (r *Reader) Gzip() bool {
return r.handle.Gzip() return r.handle.Gzip()
} }
func (r *Reader) Stat() (info fs.FileInfo, err error) {
return r.handle.Stat()
}
func (r *Reader) Signature() string { func (r *Reader) Signature() string {
return r.handle.Signature() return r.handle.Signature()
} }
@ -129,6 +154,17 @@ func (r *Reader) SetStream(s io.ReadCloser) {
r.stream = s r.stream = s
} }
func (r *Reader) AddHeader(name string, value string) {
r.handle.(*HTTP).GetRequest().Header.Add(name, value)
}
func (r *Reader) Status() string {
return r.handle.(*HTTP).GetResponse().Status
}
func (r *Reader) StatusCode() int {
return r.handle.(*HTTP).GetResponse().StatusCode
}
func (w *Writer) Exists() bool { return w.exists() } func (w *Writer) Exists() bool { return w.exists() }
@ -136,6 +172,14 @@ func (w *Writer) Write(b []byte) (int, error) {
return w.stream.Write(b) return w.stream.Write(b)
} }
func (w *Writer) ReadFrom(r io.Reader) (n int64, e error) {
if v, ok := w.stream.(io.ReaderFrom); ok {
return v.ReadFrom(r)
} else {
panic("io.ReaderFrom interface not supported by writer")
}
}
func (w *Writer) Close() error { func (w *Writer) Close() error {
return w.stream.Close() return w.stream.Close()
} }
@ -144,6 +188,10 @@ func (w *Writer) ContentType() string {
return w.handle.ContentType() return w.handle.ContentType()
} }
func (w *Writer) SetGzip(value bool) {
w.handle.SetGzip(value)
}
func (w *Writer) Gzip() bool { func (w *Writer) Gzip() bool {
return w.handle.Gzip() return w.handle.Gzip()
} }
@ -155,3 +203,15 @@ func (w *Writer) Signature() string {
func (w *Writer) SetStream(s io.WriteCloser) { func (w *Writer) SetStream(s io.WriteCloser) {
w.stream = s w.stream = s
} }
func (w *Writer) AddHeader(name string, value string) {
w.handle.(*HTTP).PostRequest().Header.Add(name, value)
}
func (w *Writer) Status() string {
return w.handle.(*HTTP).PostResponse().Status
}
func (w *Writer) StatusCode() int {
return w.handle.(*HTTP).PostResponse().StatusCode
}

View File

@ -62,7 +62,7 @@ func TestTransportReaderContentType(t *testing.T) {
assert.True(t, reader.Exists()) assert.True(t, reader.Exists())
assert.NotNil(t, reader) assert.NotNil(t, reader)
assert.Equal(t, reader.ContentType(), "yaml") assert.Equal(t, "jx.yaml", reader.ContentType())
} }
func TestTransportReaderDir(t *testing.T) { func TestTransportReaderDir(t *testing.T) {