From 1c6b113e152f3c67911f6f5859ee47d44dee82a8 Mon Sep 17 00:00:00 2001 From: Matthew Rich Date: Sun, 10 Nov 2024 10:15:27 -0800 Subject: [PATCH] add containerlog stream handling --- internal/containerlog/header.go | 39 ++++++++++++ internal/containerlog/header_test.go | 28 ++++++++ internal/containerlog/streamreader.go | 74 ++++++++++++++++++++++ internal/containerlog/streamreader_test.go | 35 ++++++++++ internal/containerlog/streamtype.go | 38 +++++++++++ internal/containerlog/streamtype_test.go | 20 ++++++ 6 files changed, 234 insertions(+) create mode 100644 internal/containerlog/header.go create mode 100644 internal/containerlog/header_test.go create mode 100644 internal/containerlog/streamreader.go create mode 100644 internal/containerlog/streamreader_test.go create mode 100644 internal/containerlog/streamtype.go create mode 100644 internal/containerlog/streamtype_test.go diff --git a/internal/containerlog/header.go b/internal/containerlog/header.go new file mode 100644 index 0000000..27df1ee --- /dev/null +++ b/internal/containerlog/header.go @@ -0,0 +1,39 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package containerlog + +import ( + "io" + "encoding/binary" +) + +func Header(r io.Reader) (s StreamType, size uint64, err error) { + var header []byte = make([]byte, 8) + if _, err = io.ReadFull(r, header); err == nil { + s = StreamType(header[0]) + if err = s.Validate(); err == nil { + header[0] = 0x0 + size = binary.BigEndian.Uint64(header) + } + } + return +} + +func ReadMessage(r io.Reader, size uint64) (message string, err error) { + var messageData []byte = make([]byte, size) + var bytesRead int + if bytesRead, err = r.Read(messageData); err == nil && uint64(bytesRead) == size { + message = string(messageData) + } + return +} + +func Read(r io.Reader) (s StreamType, message string, err error) { + var messageSize uint64 + if s, messageSize, err = Header(r); err == nil { + if message, err = ReadMessage(r, messageSize); err == nil { + return + } + } + return +} diff --git a/internal/containerlog/header_test.go b/internal/containerlog/header_test.go new file mode 100644 index 0000000..a2cc257 --- /dev/null +++ b/internal/containerlog/header_test.go @@ -0,0 +1,28 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package containerlog + +import ( + "github.com/stretchr/testify/assert" + "testing" + "bytes" +) + +func TestLogHeader(t *testing.T) { + for _, v := range []struct{ expected error; value []byte } { + { expected: nil, value: StreamStdout.Log("test message") }, + { expected: nil, value: StreamStderr.Log("test error") }, + { expected: ErrInvalidStreamType, value: StreamType(0x3).Log("fail") }, + { expected: ErrInvalidStreamType, value: StreamType(0x4).Log("fail") }, + } { + var buf bytes.Buffer + _, e := buf.Write(v.value) + assert.Nil(t, e) + logType, logSize, err := Header(&buf) + assert.ErrorIs(t, err, v.expected) + assert.ErrorIs(t, logType.Validate(), v.expected) + if err == nil { + assert.Equal(t, uint64(len(v.value) - 8), logSize) + } + } +} diff --git a/internal/containerlog/streamreader.go b/internal/containerlog/streamreader.go new file mode 100644 index 0000000..abbaf4a --- /dev/null +++ b/internal/containerlog/streamreader.go @@ -0,0 +1,74 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package containerlog + +import ( + "bytes" + "io" +) + +type StreamReader struct { + source io.Reader + streams []*ReadBuffer +} + +type ReadBuffer struct { + streamtype StreamType + reader *StreamReader + bytes.Buffer +} + +func NewStreamReader(source io.Reader) (s *StreamReader) { + s = &StreamReader{ + source: source, + streams: make([]*ReadBuffer, 3), + } + s.streams[StreamStdout] = &ReadBuffer{ + streamtype: StreamStdout, + reader: s, + } + s.streams[StreamStderr] = &ReadBuffer{ + streamtype: StreamStderr, + reader: s, + } + return +} + +func (s *StreamReader) StdoutPipe() io.ReadCloser { + return s.streams[StreamStdout] +} + +func (s *StreamReader) StderrPipe() io.ReadCloser { + return s.streams[StreamStderr] +} + +func (b *ReadBuffer) Read(p []byte) (n int, e error) { + for { + if b.reader.streams[b.streamtype].Len() >= len(p) { + break + } + + streamtype, message, err := Read(b.reader.source) + + if err != nil { + break + } + + if b.reader.streams[streamtype] == nil { + b.reader.streams[streamtype] = &ReadBuffer{ + streamtype: streamtype, + reader: b.reader, + } + } + + if bytesRead, bufferErr := b.reader.streams[streamtype].WriteString(message); bytesRead != len(message) || bufferErr != nil { + break + } + } + + return b.Buffer.Read(p) +} + +func (b *ReadBuffer) Close() error { + return nil +} diff --git a/internal/containerlog/streamreader_test.go b/internal/containerlog/streamreader_test.go new file mode 100644 index 0000000..588d0df --- /dev/null +++ b/internal/containerlog/streamreader_test.go @@ -0,0 +1,35 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package containerlog + +import ( + "github.com/stretchr/testify/assert" + "testing" + "bytes" + "io" +) + +func TestStreamReader(t *testing.T) { + + var logs bytes.Buffer + logs.Write(StreamStdout.Log("stdout log message")) + logs.Write(StreamStderr.Log("stderr log message")) + logs.Write(StreamStdout.Log("stdout log message - line 2")) + logs.Write(StreamStderr.Log("stderr log message - line 2")) + logs.Write(StreamStderr.Log("stderr log message - line 3")) + + sr := NewStreamReader(&logs) + outpipe := sr.StdoutPipe() + errpipe := sr.StderrPipe() + + var message []byte = make([]byte, 20) + n, ee := errpipe.Read(message) + assert.Nil(t, ee) + assert.Equal(t, 20, n) + + assert.Equal(t, "stderr log messagest", string(message)) + + ov, oe := io.ReadAll(outpipe) + assert.Nil(t, oe) + assert.Equal(t, "stdout log messagestdout log message - line 2", string(ov)) +} diff --git a/internal/containerlog/streamtype.go b/internal/containerlog/streamtype.go new file mode 100644 index 0000000..a3d548a --- /dev/null +++ b/internal/containerlog/streamtype.go @@ -0,0 +1,38 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package containerlog + +import ( + "fmt" + "errors" + "encoding/binary" +) + +type StreamType byte + +const ( + StreamStdin StreamType = 0x0 + StreamStdout StreamType = 0x1 + StreamStderr StreamType = 0x2 +) + +var ( + ErrInvalidStreamType error = errors.New("Invalid container log stream type") +) + +func (s StreamType) Validate() error { + switch s { + case StreamStdin, StreamStdout, StreamStderr: + return nil + } + return fmt.Errorf("%w: %d", ErrInvalidStreamType, s) +} + +func (s StreamType) Log(msg string) (log []byte) { + msgLen := len(msg) + log = make([]byte, 8 + msgLen) + binary.BigEndian.PutUint64(log, uint64(msgLen)) + log[0] = byte(s) + copy(log[8:], []byte(msg)) + return +} diff --git a/internal/containerlog/streamtype_test.go b/internal/containerlog/streamtype_test.go new file mode 100644 index 0000000..7526ae2 --- /dev/null +++ b/internal/containerlog/streamtype_test.go @@ -0,0 +1,20 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package containerlog + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestStreamType(t *testing.T) { + for _, v := range []struct{ expected error; value StreamType } { + { expected: nil, value: 0x0 }, + { expected: nil, value: 0x1 }, + { expected: nil, value: 0x2 }, + { expected: ErrInvalidStreamType, value: 0x3 }, + { expected: ErrInvalidStreamType, value: 0x4 }, + } { + assert.ErrorIs(t, v.value.Validate(), v.expected) + } +}