Compare commits
No commits in common. "94f3998fcda6fcc48962604506bb1a9001516fe2" and "08b2f5301f037ba7a45291f19b06ee911293d83e" have entirely different histories.
94f3998fcd
...
08b2f5301f
@ -4,11 +4,14 @@ package command
|
||||
|
||||
import (
|
||||
_ "context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"errors"
|
||||
"gopkg.in/yaml.v3"
|
||||
"io"
|
||||
"log/slog"
|
||||
_ "net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"text/template"
|
||||
@ -16,23 +19,14 @@ _ "net/url"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// A resource that implements the ExecProvider interface can be used as an exec target.
|
||||
type CommandProvider interface {
|
||||
Start() error
|
||||
Wait() error
|
||||
SetCmdEnv([]string)
|
||||
SetStdin(io.Reader)
|
||||
StdinPipe() (io.WriteCloser, error)
|
||||
StdoutPipe() (io.ReadCloser, error)
|
||||
StderrPipe() (io.ReadCloser, error)
|
||||
}
|
||||
|
||||
var ErrUnknownCommand error = errors.New("Unable to find command in path")
|
||||
|
||||
type CommandExecutor func(value any) ([]byte, error)
|
||||
type CommandExtractAttributes func(output []byte, target any) error
|
||||
type CommandExists func() error
|
||||
|
||||
type CommandArg string
|
||||
|
||||
type CommandInput string
|
||||
|
||||
type Command struct {
|
||||
@ -50,9 +44,6 @@ type Command struct {
|
||||
CommandExists CommandExists `json:"-" yaml:"-"`
|
||||
Input CommandInput `json:"-" yaml:"-"`
|
||||
stdin io.Reader `json:"-" yaml:"-"`
|
||||
|
||||
TargetRef CommandTargetRef `json:"targetref,omitempty" yaml:"targetref,omitempty"`
|
||||
execHandle CommandProvider `json:"-" yaml:"-"`
|
||||
}
|
||||
|
||||
func NewCommand() *Command {
|
||||
@ -79,20 +70,21 @@ func (c *Command) Defaults() {
|
||||
}
|
||||
c.Executor = func(value any) ([]byte, error) {
|
||||
c.ClearOutput()
|
||||
|
||||
c.execHandle = c.TargetRef.Provider(c, value)
|
||||
|
||||
args, err := c.Template(value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if inputErr := c.SetInput(value); inputErr != nil {
|
||||
return nil, inputErr
|
||||
}
|
||||
cmd := exec.Command(c.Path, args...)
|
||||
c.SetCmdEnv(cmd)
|
||||
|
||||
c.SetCmdEnv()
|
||||
|
||||
cmd := c.execHandle
|
||||
if c.stdin != nil {
|
||||
cmd.SetStdin(c.stdin)
|
||||
cmd.Stdin = c.stdin
|
||||
}
|
||||
|
||||
slog.Info("execute() - cmd", "path", c.Path, "args", args)
|
||||
output, stdoutPipeErr := cmd.StdoutPipe()
|
||||
if stdoutPipeErr != nil {
|
||||
return nil, stdoutPipeErr
|
||||
@ -121,13 +113,11 @@ func (c *Command) Defaults() {
|
||||
c.Stderr = string(stdErrOutput)
|
||||
c.ExitCode = c.GetExitCodeFromError(waitErr)
|
||||
|
||||
/*
|
||||
if len(stdOutOutput) > 100 {
|
||||
slog.Info("execute()", "path", c.Path, "args", args, "output", string(stdOutOutput[:100]), "error", string(stdErrOutput))
|
||||
} else {
|
||||
slog.Info("execute()", "path", c.Path, "args", args, "output", string(stdOutOutput), "error", string(stdErrOutput))
|
||||
}
|
||||
*/
|
||||
|
||||
if len(stdErrOutput) > 0 && c.FailOnError {
|
||||
return stdOutOutput, fmt.Errorf("%w %s", waitErr, string(stdErrOutput))
|
||||
@ -144,8 +134,8 @@ func (c *Command) LoadDecl(yamlResourceDeclaration string) error {
|
||||
return codec.NewYAMLStringDecoder(yamlResourceDeclaration).Decode(c)
|
||||
}
|
||||
|
||||
func (c *Command) SetCmdEnv() {
|
||||
c.execHandle.SetCmdEnv(c.Env)
|
||||
func (c *Command) SetCmdEnv(cmd *exec.Cmd) {
|
||||
cmd.Env = append(os.Environ(), c.Env...)
|
||||
}
|
||||
|
||||
func (c *Command) SetStdinReader(r io.Reader) {
|
||||
@ -198,10 +188,8 @@ func (c *Command) Execute(value any) ([]byte, error) {
|
||||
func (c *Command) SetInput(value any) error {
|
||||
if len(c.Input) > 0 {
|
||||
if r, err := c.Input.Template(value); err != nil {
|
||||
slog.Info("Command.SetInput", "input", r.String(), "error", err)
|
||||
return err
|
||||
} else {
|
||||
slog.Info("Command.SetInput", "input", r.String())
|
||||
c.SetStdinReader(strings.NewReader(r.String()))
|
||||
}
|
||||
}
|
||||
@ -212,3 +200,24 @@ func (c *CommandInput) Template(value any) (result strings.Builder, err error) {
|
||||
err = template.Must(template.New("commandInput").Parse(string(*c))).Execute(&result, value)
|
||||
return
|
||||
}
|
||||
|
||||
func (c *CommandArg) UnmarshalValue(value string) error {
|
||||
*c = CommandArg(value)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *CommandArg) UnmarshalJSON(data []byte) error {
|
||||
var s string
|
||||
if unmarshalRouteTypeErr := json.Unmarshal(data, &s); unmarshalRouteTypeErr != nil {
|
||||
return unmarshalRouteTypeErr
|
||||
}
|
||||
return c.UnmarshalValue(s)
|
||||
}
|
||||
|
||||
func (c *CommandArg) UnmarshalYAML(value *yaml.Node) error {
|
||||
var s string
|
||||
if err := value.Decode(&s); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.UnmarshalValue(s)
|
||||
}
|
||||
|
@ -1,31 +0,0 @@
|
||||
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||
|
||||
package command
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
type CommandArg string
|
||||
|
||||
func (c *CommandArg) UnmarshalValue(value string) error {
|
||||
*c = CommandArg(value)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *CommandArg) UnmarshalJSON(data []byte) error {
|
||||
var s string
|
||||
if unmarshalRouteTypeErr := json.Unmarshal(data, &s); unmarshalRouteTypeErr != nil {
|
||||
return unmarshalRouteTypeErr
|
||||
}
|
||||
return c.UnmarshalValue(s)
|
||||
}
|
||||
|
||||
func (c *CommandArg) UnmarshalYAML(value *yaml.Node) error {
|
||||
var s string
|
||||
if err := value.Decode(&s); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.UnmarshalValue(s)
|
||||
}
|
@ -1,30 +0,0 @@
|
||||
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||
|
||||
package command
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"decl/internal/identifier"
|
||||
)
|
||||
|
||||
type CommandTargetRef identifier.ID
|
||||
|
||||
func (r CommandTargetRef) GetType() (CommandType) {
|
||||
uri := identifier.ID(r).Parse()
|
||||
if uri != nil {
|
||||
var ct CommandType = CommandType(uri.Scheme)
|
||||
if ct.Validate() == nil {
|
||||
return ct
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (r CommandTargetRef) Provider(cmd *Command, value any) CommandProvider {
|
||||
return r.GetType().Provider(cmd, value)
|
||||
}
|
||||
|
||||
func (r CommandTargetRef) Name() string {
|
||||
u := identifier.ID(r).Parse()
|
||||
return filepath.Join(u.Hostname(), u.Path)
|
||||
}
|
@ -1,44 +0,0 @@
|
||||
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||
|
||||
package command
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrInvalidCommandType error = errors.New("Invalid command type")
|
||||
)
|
||||
|
||||
type CommandType string
|
||||
|
||||
const (
|
||||
ExecCommand CommandType = "exec"
|
||||
ContainerCommand CommandType = "container"
|
||||
SSHCommand CommandType = "ssh"
|
||||
)
|
||||
|
||||
|
||||
func (t CommandType) Validate() error {
|
||||
switch t {
|
||||
case ExecCommand, ContainerCommand, SSHCommand:
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("%w: %s", ErrInvalidCommandType, t)
|
||||
}
|
||||
|
||||
func (t CommandType) Provider(cmd *Command, value any) CommandProvider {
|
||||
switch t {
|
||||
case ContainerCommand:
|
||||
return NewContainerProvider(cmd, value)
|
||||
case SSHCommand:
|
||||
return NewSSHProvider(cmd, value)
|
||||
default:
|
||||
fallthrough
|
||||
case ExecCommand:
|
||||
return NewExecProvider(cmd, value)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -1,18 +0,0 @@
|
||||
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||
|
||||
|
||||
package command
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCommandType(t *testing.T) {
|
||||
|
||||
var cmdType CommandType = "container"
|
||||
|
||||
assert.Nil(t, cmdType.Validate())
|
||||
}
|
||||
|
||||
|
@ -1,104 +0,0 @@
|
||||
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||
|
||||
|
||||
package command
|
||||
|
||||
import (
|
||||
_ "fmt"
|
||||
"github.com/stretchr/testify/assert"
|
||||
_ "os"
|
||||
_ "strings"
|
||||
"testing"
|
||||
"bytes"
|
||||
)
|
||||
|
||||
/*
|
||||
func TestNewContainerProvider(t *testing.T) {
|
||||
c := NewContainerProvider(nil, nil)
|
||||
assert.NotNil(t, c)
|
||||
}
|
||||
*/
|
||||
|
||||
func TestContainerCommandLoad(t *testing.T) {
|
||||
c := NewCommand()
|
||||
assert.NotNil(t, c)
|
||||
|
||||
decl := `
|
||||
path: find
|
||||
args:
|
||||
- "{{ .Path }}"
|
||||
`
|
||||
|
||||
assert.Nil(t, c.LoadDecl(decl))
|
||||
assert.Equal(t, "find", c.Path)
|
||||
}
|
||||
|
||||
func TestContainerCommandTemplate(t *testing.T) {
|
||||
c := NewCommand()
|
||||
assert.NotNil(t, c)
|
||||
|
||||
decl := `
|
||||
path: find
|
||||
args:
|
||||
- "{{ .Path }}"
|
||||
`
|
||||
|
||||
assert.Nil(t, c.LoadDecl(decl))
|
||||
assert.Equal(t, "find", c.Path)
|
||||
assert.Equal(t, 1, len(c.Args))
|
||||
|
||||
f := struct { Path string } {
|
||||
Path: "./",
|
||||
}
|
||||
|
||||
args, templateErr := c.Template(f)
|
||||
assert.Nil(t, templateErr)
|
||||
assert.Equal(t, 1, len(args))
|
||||
|
||||
assert.Equal(t, "./", string(args[0]))
|
||||
|
||||
out, err := c.Execute(f)
|
||||
assert.Nil(t, err)
|
||||
assert.Greater(t, len(out), 0)
|
||||
}
|
||||
|
||||
func TestContainerCommandStdin(t *testing.T) {
|
||||
var expected string = "stdin test data"
|
||||
var stdinBuffer bytes.Buffer
|
||||
stdinBuffer.WriteString(expected)
|
||||
|
||||
c := NewCommand()
|
||||
assert.NotNil(t, c)
|
||||
|
||||
decl := `
|
||||
path: cat
|
||||
stdinavailable: true
|
||||
`
|
||||
|
||||
assert.Nil(t, c.LoadDecl(decl))
|
||||
assert.Equal(t, "cat", c.Path)
|
||||
|
||||
c.SetStdinReader(&stdinBuffer)
|
||||
out, err := c.Execute(nil)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, expected, string(out))
|
||||
}
|
||||
|
||||
func TestContainerCommandExitCode(t *testing.T) {
|
||||
c := NewCommand()
|
||||
assert.NotNil(t, c)
|
||||
decl := `
|
||||
path: ls
|
||||
args:
|
||||
- "amissingfile"
|
||||
`
|
||||
|
||||
assert.Nil(t, c.LoadDecl(decl))
|
||||
assert.Equal(t, "ls", c.Path)
|
||||
|
||||
out, err := c.Execute(nil)
|
||||
assert.NotNil(t, err)
|
||||
assert.Greater(t, c.ExitCode, 0)
|
||||
assert.Equal(t, string(out), c.Stdout)
|
||||
assert.Equal(t, string("ls: amissingfile: No such file or directory\n"), c.Stderr)
|
||||
}
|
@ -1,142 +0,0 @@
|
||||
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||
|
||||
package command
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"decl/internal/containerlog"
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/strslice"
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"time"
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
type ContainerExecClient interface {
|
||||
ContainerExecAttach(ctx context.Context, execID string, config container.ExecAttachOptions) (types.HijackedResponse, error)
|
||||
ContainerExecCreate(ctx context.Context, containerID string, options container.ExecOptions) (types.IDResponse, error)
|
||||
ContainerExecInspect(ctx context.Context, execID string) (container.ExecInspect, error)
|
||||
ContainerExecStart(ctx context.Context, execID string, config container.ExecStartOptions) (error)
|
||||
ContainerList(context.Context, container.ListOptions) ([]types.Container, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
type ContainerCommandProvider struct {
|
||||
containerID string
|
||||
execID string
|
||||
ExitCode int
|
||||
container.ExecOptions
|
||||
response types.HijackedResponse
|
||||
pipes *containerlog.StreamReader
|
||||
Stdin io.Reader
|
||||
apiClient ContainerExecClient `json:"-" yaml:"-"`
|
||||
}
|
||||
|
||||
// ref could be a resource, but it just needs to implement the ExecResource interface
|
||||
func NewContainerProvider(cmd *Command, value any) (p *ContainerCommandProvider) {
|
||||
if args, err := cmd.Template(value); err == nil {
|
||||
p = &ContainerCommandProvider {
|
||||
ExecOptions: container.ExecOptions {
|
||||
Cmd: strslice.StrSlice(append([]string{cmd.Path}, args...)),
|
||||
AttachStdin: true,
|
||||
AttachStdout: true,
|
||||
AttachStderr: true,
|
||||
},
|
||||
}
|
||||
p.containerID = p.ResolveId(context.Background(), cmd.TargetRef)
|
||||
slog.Info("command.NewContainerProvider", "command", cmd.Path, "args", args, "target", cmd.TargetRef, "container", p.containerID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *ContainerCommandProvider) ResolveId(ctx context.Context, ref CommandTargetRef) (containerID string) {
|
||||
name := ref.Name()
|
||||
filterArgs := filters.NewArgs()
|
||||
filterArgs.Add("name", "/"+name)
|
||||
containers, listErr := c.apiClient.ContainerList(ctx, container.ListOptions{
|
||||
All: true,
|
||||
Filters: filterArgs,
|
||||
})
|
||||
|
||||
if listErr != nil {
|
||||
panic(listErr)
|
||||
}
|
||||
|
||||
for _, container := range containers {
|
||||
for _, containerName := range container.Names {
|
||||
if containerName == "/"+name {
|
||||
containerID = container.ID
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *ContainerCommandProvider) Start() (err error) {
|
||||
var execIDResponse types.IDResponse
|
||||
ctx := context.Background()
|
||||
if execIDResponse, err = c.apiClient.ContainerExecCreate(ctx, c.containerID, c.ExecOptions); err == nil {
|
||||
c.execID = execIDResponse.ID
|
||||
if c.execID == "" {
|
||||
return fmt.Errorf("Failed creating a container exec ID")
|
||||
}
|
||||
|
||||
execStartCheck := types.ExecStartCheck{
|
||||
Tty: false,
|
||||
}
|
||||
|
||||
if c.response, err = c.apiClient.ContainerExecAttach(ctx, c.execID, execStartCheck); err == nil {
|
||||
c.pipes = containerlog.NewStreamReader(c.response.Conn)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *ContainerCommandProvider) Wait() (err error) {
|
||||
var containerDetails container.ExecInspect
|
||||
ctx := context.Background()
|
||||
for {
|
||||
// copy Stdin to the connection
|
||||
if c.Stdin != nil {
|
||||
io.Copy(c.response.Conn, c.Stdin)
|
||||
}
|
||||
|
||||
if containerDetails, err = c.apiClient.ContainerExecInspect(ctx, c.execID); err != nil || ! containerDetails.Running {
|
||||
c.ExitCode = containerDetails.ExitCode
|
||||
break
|
||||
} else {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *ContainerCommandProvider) SetCmdEnv(env []string) {
|
||||
c.Env = append(os.Environ(), env...)
|
||||
}
|
||||
|
||||
func (c *ContainerCommandProvider) SetStdin(r io.Reader) {
|
||||
c.Stdin = r
|
||||
}
|
||||
|
||||
func (c *ContainerCommandProvider) StdinPipe() (io.WriteCloser, error) {
|
||||
return c.response.Conn, nil
|
||||
}
|
||||
|
||||
func (c *ContainerCommandProvider) StdoutPipe() (io.ReadCloser, error) {
|
||||
return c.pipes.StdoutPipe(), nil
|
||||
}
|
||||
|
||||
func (c *ContainerCommandProvider) StderrPipe() (io.ReadCloser, error) {
|
||||
return c.pipes.StderrPipe(), nil
|
||||
}
|
||||
|
||||
func (c *ContainerCommandProvider) Close() (err error) {
|
||||
err = c.response.CloseWrite()
|
||||
c.response.Close()
|
||||
return
|
||||
}
|
@ -1,34 +0,0 @@
|
||||
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||
|
||||
package command
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"io"
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
var (
|
||||
)
|
||||
|
||||
type ExecCommandProvider struct {
|
||||
*exec.Cmd
|
||||
}
|
||||
|
||||
// Consturct a new exec
|
||||
func NewExecProvider(c *Command, value any) *ExecCommandProvider {
|
||||
if args, err := c.Template(value); err == nil {
|
||||
slog.Info("command.NewExecProvider", "command", c.Path, "args", args, "target", c.TargetRef)
|
||||
return &ExecCommandProvider{exec.Command(c.Path, args...)}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *ExecCommandProvider) SetCmdEnv(env []string) {
|
||||
e.Cmd.Env = append(os.Environ(), env...)
|
||||
}
|
||||
|
||||
func (e *ExecCommandProvider) SetStdin(r io.Reader) {
|
||||
e.Cmd.Stdin = r
|
||||
}
|
@ -1,54 +0,0 @@
|
||||
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||
|
||||
package command
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
var (
|
||||
)
|
||||
|
||||
type SSHCommandProvider struct {
|
||||
Env []string
|
||||
Stdin io.Reader
|
||||
}
|
||||
|
||||
// Consturct a new ssh exec
|
||||
func NewSSHProvider(c *Command, value any) (p *SSHCommandProvider) {
|
||||
if args, err := c.Template(value); err == nil {
|
||||
slog.Info("command.NewSSHProvider", "command", c.Path, "args", args, "target", c.TargetRef)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SSHCommandProvider) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SSHCommandProvider) Wait() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SSHCommandProvider) StdinPipe() (w io.WriteCloser, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (s *SSHCommandProvider) StdoutPipe() (r io.ReadCloser, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (s *SSHCommandProvider) StderrPipe() (r io.ReadCloser, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (s *SSHCommandProvider) SetCmdEnv(env []string) {
|
||||
s.Env = append(os.Environ(), env...)
|
||||
}
|
||||
|
||||
func (s *SSHCommandProvider) SetStdin(r io.Reader) {
|
||||
s.Stdin = r
|
||||
}
|
||||
|
@ -1,39 +0,0 @@
|
||||
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||
|
||||
package containerlog
|
||||
|
||||
import (
|
||||
"io"
|
||||
"encoding/binary"
|
||||
)
|
||||
|
||||
func Header(r io.Reader) (s StreamType, size uint64, err error) {
|
||||
var header []byte = make([]byte, 8)
|
||||
if _, err = io.ReadFull(r, header); err == nil {
|
||||
s = StreamType(header[0])
|
||||
if err = s.Validate(); err == nil {
|
||||
header[0] = 0x0
|
||||
size = binary.BigEndian.Uint64(header)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func ReadMessage(r io.Reader, size uint64) (message string, err error) {
|
||||
var messageData []byte = make([]byte, size)
|
||||
var bytesRead int
|
||||
if bytesRead, err = r.Read(messageData); err == nil && uint64(bytesRead) == size {
|
||||
message = string(messageData)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func Read(r io.Reader) (s StreamType, message string, err error) {
|
||||
var messageSize uint64
|
||||
if s, messageSize, err = Header(r); err == nil {
|
||||
if message, err = ReadMessage(r, messageSize); err == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
@ -1,28 +0,0 @@
|
||||
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||
|
||||
package containerlog
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"testing"
|
||||
"bytes"
|
||||
)
|
||||
|
||||
func TestLogHeader(t *testing.T) {
|
||||
for _, v := range []struct{ expected error; value []byte } {
|
||||
{ expected: nil, value: StreamStdout.Log("test message") },
|
||||
{ expected: nil, value: StreamStderr.Log("test error") },
|
||||
{ expected: ErrInvalidStreamType, value: StreamType(0x3).Log("fail") },
|
||||
{ expected: ErrInvalidStreamType, value: StreamType(0x4).Log("fail") },
|
||||
} {
|
||||
var buf bytes.Buffer
|
||||
_, e := buf.Write(v.value)
|
||||
assert.Nil(t, e)
|
||||
logType, logSize, err := Header(&buf)
|
||||
assert.ErrorIs(t, err, v.expected)
|
||||
assert.ErrorIs(t, logType.Validate(), v.expected)
|
||||
if err == nil {
|
||||
assert.Equal(t, uint64(len(v.value) - 8), logSize)
|
||||
}
|
||||
}
|
||||
}
|
@ -1,74 +0,0 @@
|
||||
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||
|
||||
package containerlog
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
)
|
||||
|
||||
type StreamReader struct {
|
||||
source io.Reader
|
||||
streams []*ReadBuffer
|
||||
}
|
||||
|
||||
type ReadBuffer struct {
|
||||
streamtype StreamType
|
||||
reader *StreamReader
|
||||
bytes.Buffer
|
||||
}
|
||||
|
||||
func NewStreamReader(source io.Reader) (s *StreamReader) {
|
||||
s = &StreamReader{
|
||||
source: source,
|
||||
streams: make([]*ReadBuffer, 3),
|
||||
}
|
||||
s.streams[StreamStdout] = &ReadBuffer{
|
||||
streamtype: StreamStdout,
|
||||
reader: s,
|
||||
}
|
||||
s.streams[StreamStderr] = &ReadBuffer{
|
||||
streamtype: StreamStderr,
|
||||
reader: s,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *StreamReader) StdoutPipe() io.ReadCloser {
|
||||
return s.streams[StreamStdout]
|
||||
}
|
||||
|
||||
func (s *StreamReader) StderrPipe() io.ReadCloser {
|
||||
return s.streams[StreamStderr]
|
||||
}
|
||||
|
||||
func (b *ReadBuffer) Read(p []byte) (n int, e error) {
|
||||
for {
|
||||
if b.reader.streams[b.streamtype].Len() >= len(p) {
|
||||
break
|
||||
}
|
||||
|
||||
streamtype, message, err := Read(b.reader.source)
|
||||
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if b.reader.streams[streamtype] == nil {
|
||||
b.reader.streams[streamtype] = &ReadBuffer{
|
||||
streamtype: streamtype,
|
||||
reader: b.reader,
|
||||
}
|
||||
}
|
||||
|
||||
if bytesRead, bufferErr := b.reader.streams[streamtype].WriteString(message); bytesRead != len(message) || bufferErr != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return b.Buffer.Read(p)
|
||||
}
|
||||
|
||||
func (b *ReadBuffer) Close() error {
|
||||
return nil
|
||||
}
|
@ -1,35 +0,0 @@
|
||||
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||
|
||||
package containerlog
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"testing"
|
||||
"bytes"
|
||||
"io"
|
||||
)
|
||||
|
||||
func TestStreamReader(t *testing.T) {
|
||||
|
||||
var logs bytes.Buffer
|
||||
logs.Write(StreamStdout.Log("stdout log message"))
|
||||
logs.Write(StreamStderr.Log("stderr log message"))
|
||||
logs.Write(StreamStdout.Log("stdout log message - line 2"))
|
||||
logs.Write(StreamStderr.Log("stderr log message - line 2"))
|
||||
logs.Write(StreamStderr.Log("stderr log message - line 3"))
|
||||
|
||||
sr := NewStreamReader(&logs)
|
||||
outpipe := sr.StdoutPipe()
|
||||
errpipe := sr.StderrPipe()
|
||||
|
||||
var message []byte = make([]byte, 20)
|
||||
n, ee := errpipe.Read(message)
|
||||
assert.Nil(t, ee)
|
||||
assert.Equal(t, 20, n)
|
||||
|
||||
assert.Equal(t, "stderr log messagest", string(message))
|
||||
|
||||
ov, oe := io.ReadAll(outpipe)
|
||||
assert.Nil(t, oe)
|
||||
assert.Equal(t, "stdout log messagestdout log message - line 2", string(ov))
|
||||
}
|
@ -1,38 +0,0 @@
|
||||
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||
|
||||
package containerlog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"errors"
|
||||
"encoding/binary"
|
||||
)
|
||||
|
||||
type StreamType byte
|
||||
|
||||
const (
|
||||
StreamStdin StreamType = 0x0
|
||||
StreamStdout StreamType = 0x1
|
||||
StreamStderr StreamType = 0x2
|
||||
)
|
||||
|
||||
var (
|
||||
ErrInvalidStreamType error = errors.New("Invalid container log stream type")
|
||||
)
|
||||
|
||||
func (s StreamType) Validate() error {
|
||||
switch s {
|
||||
case StreamStdin, StreamStdout, StreamStderr:
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("%w: %d", ErrInvalidStreamType, s)
|
||||
}
|
||||
|
||||
func (s StreamType) Log(msg string) (log []byte) {
|
||||
msgLen := len(msg)
|
||||
log = make([]byte, 8 + msgLen)
|
||||
binary.BigEndian.PutUint64(log, uint64(msgLen))
|
||||
log[0] = byte(s)
|
||||
copy(log[8:], []byte(msg))
|
||||
return
|
||||
}
|
@ -1,20 +0,0 @@
|
||||
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||||
|
||||
package containerlog
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestStreamType(t *testing.T) {
|
||||
for _, v := range []struct{ expected error; value StreamType } {
|
||||
{ expected: nil, value: 0x0 },
|
||||
{ expected: nil, value: 0x1 },
|
||||
{ expected: nil, value: 0x2 },
|
||||
{ expected: ErrInvalidStreamType, value: 0x3 },
|
||||
{ expected: ErrInvalidStreamType, value: 0x4 },
|
||||
} {
|
||||
assert.ErrorIs(t, v.value.Validate(), v.expected)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user