add containerlog stream handling

This commit is contained in:
Matthew Rich 2024-11-10 10:15:27 -08:00
parent 08b2f5301f
commit 1c6b113e15
6 changed files with 234 additions and 0 deletions

View File

@ -0,0 +1,39 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package containerlog
import (
"io"
"encoding/binary"
)
func Header(r io.Reader) (s StreamType, size uint64, err error) {
var header []byte = make([]byte, 8)
if _, err = io.ReadFull(r, header); err == nil {
s = StreamType(header[0])
if err = s.Validate(); err == nil {
header[0] = 0x0
size = binary.BigEndian.Uint64(header)
}
}
return
}
func ReadMessage(r io.Reader, size uint64) (message string, err error) {
var messageData []byte = make([]byte, size)
var bytesRead int
if bytesRead, err = r.Read(messageData); err == nil && uint64(bytesRead) == size {
message = string(messageData)
}
return
}
func Read(r io.Reader) (s StreamType, message string, err error) {
var messageSize uint64
if s, messageSize, err = Header(r); err == nil {
if message, err = ReadMessage(r, messageSize); err == nil {
return
}
}
return
}

View File

@ -0,0 +1,28 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package containerlog
import (
"github.com/stretchr/testify/assert"
"testing"
"bytes"
)
func TestLogHeader(t *testing.T) {
for _, v := range []struct{ expected error; value []byte } {
{ expected: nil, value: StreamStdout.Log("test message") },
{ expected: nil, value: StreamStderr.Log("test error") },
{ expected: ErrInvalidStreamType, value: StreamType(0x3).Log("fail") },
{ expected: ErrInvalidStreamType, value: StreamType(0x4).Log("fail") },
} {
var buf bytes.Buffer
_, e := buf.Write(v.value)
assert.Nil(t, e)
logType, logSize, err := Header(&buf)
assert.ErrorIs(t, err, v.expected)
assert.ErrorIs(t, logType.Validate(), v.expected)
if err == nil {
assert.Equal(t, uint64(len(v.value) - 8), logSize)
}
}
}

View File

@ -0,0 +1,74 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package containerlog
import (
"bytes"
"io"
)
type StreamReader struct {
source io.Reader
streams []*ReadBuffer
}
type ReadBuffer struct {
streamtype StreamType
reader *StreamReader
bytes.Buffer
}
func NewStreamReader(source io.Reader) (s *StreamReader) {
s = &StreamReader{
source: source,
streams: make([]*ReadBuffer, 3),
}
s.streams[StreamStdout] = &ReadBuffer{
streamtype: StreamStdout,
reader: s,
}
s.streams[StreamStderr] = &ReadBuffer{
streamtype: StreamStderr,
reader: s,
}
return
}
func (s *StreamReader) StdoutPipe() io.ReadCloser {
return s.streams[StreamStdout]
}
func (s *StreamReader) StderrPipe() io.ReadCloser {
return s.streams[StreamStderr]
}
func (b *ReadBuffer) Read(p []byte) (n int, e error) {
for {
if b.reader.streams[b.streamtype].Len() >= len(p) {
break
}
streamtype, message, err := Read(b.reader.source)
if err != nil {
break
}
if b.reader.streams[streamtype] == nil {
b.reader.streams[streamtype] = &ReadBuffer{
streamtype: streamtype,
reader: b.reader,
}
}
if bytesRead, bufferErr := b.reader.streams[streamtype].WriteString(message); bytesRead != len(message) || bufferErr != nil {
break
}
}
return b.Buffer.Read(p)
}
func (b *ReadBuffer) Close() error {
return nil
}

View File

@ -0,0 +1,35 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package containerlog
import (
"github.com/stretchr/testify/assert"
"testing"
"bytes"
"io"
)
func TestStreamReader(t *testing.T) {
var logs bytes.Buffer
logs.Write(StreamStdout.Log("stdout log message"))
logs.Write(StreamStderr.Log("stderr log message"))
logs.Write(StreamStdout.Log("stdout log message - line 2"))
logs.Write(StreamStderr.Log("stderr log message - line 2"))
logs.Write(StreamStderr.Log("stderr log message - line 3"))
sr := NewStreamReader(&logs)
outpipe := sr.StdoutPipe()
errpipe := sr.StderrPipe()
var message []byte = make([]byte, 20)
n, ee := errpipe.Read(message)
assert.Nil(t, ee)
assert.Equal(t, 20, n)
assert.Equal(t, "stderr log messagest", string(message))
ov, oe := io.ReadAll(outpipe)
assert.Nil(t, oe)
assert.Equal(t, "stdout log messagestdout log message - line 2", string(ov))
}

View File

@ -0,0 +1,38 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package containerlog
import (
"fmt"
"errors"
"encoding/binary"
)
type StreamType byte
const (
StreamStdin StreamType = 0x0
StreamStdout StreamType = 0x1
StreamStderr StreamType = 0x2
)
var (
ErrInvalidStreamType error = errors.New("Invalid container log stream type")
)
func (s StreamType) Validate() error {
switch s {
case StreamStdin, StreamStdout, StreamStderr:
return nil
}
return fmt.Errorf("%w: %d", ErrInvalidStreamType, s)
}
func (s StreamType) Log(msg string) (log []byte) {
msgLen := len(msg)
log = make([]byte, 8 + msgLen)
binary.BigEndian.PutUint64(log, uint64(msgLen))
log[0] = byte(s)
copy(log[8:], []byte(msg))
return
}

View File

@ -0,0 +1,20 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package containerlog
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestStreamType(t *testing.T) {
for _, v := range []struct{ expected error; value StreamType } {
{ expected: nil, value: 0x0 },
{ expected: nil, value: 0x1 },
{ expected: nil, value: 0x2 },
{ expected: ErrInvalidStreamType, value: 0x3 },
{ expected: ErrInvalidStreamType, value: 0x4 },
} {
assert.ErrorIs(t, v.value.Validate(), v.expected)
}
}