Compare commits

...

2 Commits

Author SHA1 Message Date
94f3998fcd add support for remote command execution
Some checks failed
Declarative Tests / test (push) Waiting to run
Lint / golangci-lint (push) Has been cancelled
2024-11-10 10:16:44 -08:00
1c6b113e15 add containerlog stream handling 2024-11-10 10:15:27 -08:00
15 changed files with 733 additions and 51 deletions

View File

@ -3,15 +3,12 @@
package command
import (
_ "context"
"encoding/json"
_ "context"
"fmt"
"errors"
"gopkg.in/yaml.v3"
"io"
"log/slog"
_ "net/url"
"os"
_ "net/url"
"os/exec"
"strings"
"text/template"
@ -19,31 +16,43 @@ import (
"syscall"
)
// A resource that implements the ExecProvider interface can be used as an exec target.
type CommandProvider interface {
Start() error
Wait() error
SetCmdEnv([]string)
SetStdin(io.Reader)
StdinPipe() (io.WriteCloser, error)
StdoutPipe() (io.ReadCloser, error)
StderrPipe() (io.ReadCloser, error)
}
var ErrUnknownCommand error = errors.New("Unable to find command in path")
type CommandExecutor func(value any) ([]byte, error)
type CommandExtractAttributes func(output []byte, target any) error
type CommandExists func() error
type CommandArg string
type CommandInput string
type Command struct {
Path string `json:"path" yaml:"path"`
Args []CommandArg `json:"args" yaml:"args"`
Env []string `json:"env" yaml:"env"`
Split bool `json:"split" yaml:"split"`
FailOnError bool `json:"failonerror" yaml:"failonerror"`
Path string `json:"path" yaml:"path"`
Args []CommandArg `json:"args" yaml:"args"`
Env []string `json:"env" yaml:"env"`
Split bool `json:"split" yaml:"split"`
FailOnError bool `json:"failonerror" yaml:"failonerror"`
StdinAvailable bool `json:"stdinavailable,omitempty" yaml:"stdinavailable,omitempty"`
ExitCode int `json:"exitcode,omitempty" yaml:"exitcode,omitempty"`
Stdout string `json:"stdout,omitempty" yaml:"stdout,omitempty"`
Stderr string `json:"stderr,omitempty" yaml:"stderr,omitempty"`
Executor CommandExecutor `json:"-" yaml:"-"`
Extractor CommandExtractAttributes `json:"-" yaml:"-"`
ExitCode int `json:"exitcode,omitempty" yaml:"exitcode,omitempty"`
Stdout string `json:"stdout,omitempty" yaml:"stdout,omitempty"`
Stderr string `json:"stderr,omitempty" yaml:"stderr,omitempty"`
Executor CommandExecutor `json:"-" yaml:"-"`
Extractor CommandExtractAttributes `json:"-" yaml:"-"`
CommandExists CommandExists `json:"-" yaml:"-"`
Input CommandInput `json:"-" yaml:"-"`
stdin io.Reader `json:"-" yaml:"-"`
Input CommandInput `json:"-" yaml:"-"`
stdin io.Reader `json:"-" yaml:"-"`
TargetRef CommandTargetRef `json:"targetref,omitempty" yaml:"targetref,omitempty"`
execHandle CommandProvider `json:"-" yaml:"-"`
}
func NewCommand() *Command {
@ -70,21 +79,20 @@ func (c *Command) Defaults() {
}
c.Executor = func(value any) ([]byte, error) {
c.ClearOutput()
args, err := c.Template(value)
if err != nil {
return nil, err
}
c.execHandle = c.TargetRef.Provider(c, value)
if inputErr := c.SetInput(value); inputErr != nil {
return nil, inputErr
}
cmd := exec.Command(c.Path, args...)
c.SetCmdEnv(cmd)
c.SetCmdEnv()
cmd := c.execHandle
if c.stdin != nil {
cmd.Stdin = c.stdin
cmd.SetStdin(c.stdin)
}
slog.Info("execute() - cmd", "path", c.Path, "args", args)
output, stdoutPipeErr := cmd.StdoutPipe()
if stdoutPipeErr != nil {
return nil, stdoutPipeErr
@ -113,11 +121,13 @@ func (c *Command) Defaults() {
c.Stderr = string(stdErrOutput)
c.ExitCode = c.GetExitCodeFromError(waitErr)
/*
if len(stdOutOutput) > 100 {
slog.Info("execute()", "path", c.Path, "args", args, "output", string(stdOutOutput[:100]), "error", string(stdErrOutput))
} else {
slog.Info("execute()", "path", c.Path, "args", args, "output", string(stdOutOutput), "error", string(stdErrOutput))
}
*/
if len(stdErrOutput) > 0 && c.FailOnError {
return stdOutOutput, fmt.Errorf("%w %s", waitErr, string(stdErrOutput))
@ -134,8 +144,8 @@ func (c *Command) LoadDecl(yamlResourceDeclaration string) error {
return codec.NewYAMLStringDecoder(yamlResourceDeclaration).Decode(c)
}
func (c *Command) SetCmdEnv(cmd *exec.Cmd) {
cmd.Env = append(os.Environ(), c.Env...)
func (c *Command) SetCmdEnv() {
c.execHandle.SetCmdEnv(c.Env)
}
func (c *Command) SetStdinReader(r io.Reader) {
@ -188,8 +198,10 @@ func (c *Command) Execute(value any) ([]byte, error) {
func (c *Command) SetInput(value any) error {
if len(c.Input) > 0 {
if r, err := c.Input.Template(value); err != nil {
slog.Info("Command.SetInput", "input", r.String(), "error", err)
return err
} else {
slog.Info("Command.SetInput", "input", r.String())
c.SetStdinReader(strings.NewReader(r.String()))
}
}
@ -200,24 +212,3 @@ func (c *CommandInput) Template(value any) (result strings.Builder, err error) {
err = template.Must(template.New("commandInput").Parse(string(*c))).Execute(&result, value)
return
}
func (c *CommandArg) UnmarshalValue(value string) error {
*c = CommandArg(value)
return nil
}
func (c *CommandArg) UnmarshalJSON(data []byte) error {
var s string
if unmarshalRouteTypeErr := json.Unmarshal(data, &s); unmarshalRouteTypeErr != nil {
return unmarshalRouteTypeErr
}
return c.UnmarshalValue(s)
}
func (c *CommandArg) UnmarshalYAML(value *yaml.Node) error {
var s string
if err := value.Decode(&s); err != nil {
return err
}
return c.UnmarshalValue(s)
}

View File

@ -0,0 +1,31 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package command
import (
"encoding/json"
"gopkg.in/yaml.v3"
)
type CommandArg string
func (c *CommandArg) UnmarshalValue(value string) error {
*c = CommandArg(value)
return nil
}
func (c *CommandArg) UnmarshalJSON(data []byte) error {
var s string
if unmarshalRouteTypeErr := json.Unmarshal(data, &s); unmarshalRouteTypeErr != nil {
return unmarshalRouteTypeErr
}
return c.UnmarshalValue(s)
}
func (c *CommandArg) UnmarshalYAML(value *yaml.Node) error {
var s string
if err := value.Decode(&s); err != nil {
return err
}
return c.UnmarshalValue(s)
}

View File

@ -0,0 +1,30 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package command
import (
"path/filepath"
"decl/internal/identifier"
)
type CommandTargetRef identifier.ID
func (r CommandTargetRef) GetType() (CommandType) {
uri := identifier.ID(r).Parse()
if uri != nil {
var ct CommandType = CommandType(uri.Scheme)
if ct.Validate() == nil {
return ct
}
}
return ""
}
func (r CommandTargetRef) Provider(cmd *Command, value any) CommandProvider {
return r.GetType().Provider(cmd, value)
}
func (r CommandTargetRef) Name() string {
u := identifier.ID(r).Parse()
return filepath.Join(u.Hostname(), u.Path)
}

View File

@ -0,0 +1,44 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package command
import (
"errors"
"fmt"
)
var (
ErrInvalidCommandType error = errors.New("Invalid command type")
)
type CommandType string
const (
ExecCommand CommandType = "exec"
ContainerCommand CommandType = "container"
SSHCommand CommandType = "ssh"
)
func (t CommandType) Validate() error {
switch t {
case ExecCommand, ContainerCommand, SSHCommand:
return nil
}
return fmt.Errorf("%w: %s", ErrInvalidCommandType, t)
}
func (t CommandType) Provider(cmd *Command, value any) CommandProvider {
switch t {
case ContainerCommand:
return NewContainerProvider(cmd, value)
case SSHCommand:
return NewSSHProvider(cmd, value)
default:
fallthrough
case ExecCommand:
return NewExecProvider(cmd, value)
}
return nil
}

View File

@ -0,0 +1,18 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package command
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestCommandType(t *testing.T) {
var cmdType CommandType = "container"
assert.Nil(t, cmdType.Validate())
}

View File

@ -0,0 +1,104 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package command
import (
_ "fmt"
"github.com/stretchr/testify/assert"
_ "os"
_ "strings"
"testing"
"bytes"
)
/*
func TestNewContainerProvider(t *testing.T) {
c := NewContainerProvider(nil, nil)
assert.NotNil(t, c)
}
*/
func TestContainerCommandLoad(t *testing.T) {
c := NewCommand()
assert.NotNil(t, c)
decl := `
path: find
args:
- "{{ .Path }}"
`
assert.Nil(t, c.LoadDecl(decl))
assert.Equal(t, "find", c.Path)
}
func TestContainerCommandTemplate(t *testing.T) {
c := NewCommand()
assert.NotNil(t, c)
decl := `
path: find
args:
- "{{ .Path }}"
`
assert.Nil(t, c.LoadDecl(decl))
assert.Equal(t, "find", c.Path)
assert.Equal(t, 1, len(c.Args))
f := struct { Path string } {
Path: "./",
}
args, templateErr := c.Template(f)
assert.Nil(t, templateErr)
assert.Equal(t, 1, len(args))
assert.Equal(t, "./", string(args[0]))
out, err := c.Execute(f)
assert.Nil(t, err)
assert.Greater(t, len(out), 0)
}
func TestContainerCommandStdin(t *testing.T) {
var expected string = "stdin test data"
var stdinBuffer bytes.Buffer
stdinBuffer.WriteString(expected)
c := NewCommand()
assert.NotNil(t, c)
decl := `
path: cat
stdinavailable: true
`
assert.Nil(t, c.LoadDecl(decl))
assert.Equal(t, "cat", c.Path)
c.SetStdinReader(&stdinBuffer)
out, err := c.Execute(nil)
assert.Nil(t, err)
assert.Equal(t, expected, string(out))
}
func TestContainerCommandExitCode(t *testing.T) {
c := NewCommand()
assert.NotNil(t, c)
decl := `
path: ls
args:
- "amissingfile"
`
assert.Nil(t, c.LoadDecl(decl))
assert.Equal(t, "ls", c.Path)
out, err := c.Execute(nil)
assert.NotNil(t, err)
assert.Greater(t, c.ExitCode, 0)
assert.Equal(t, string(out), c.Stdout)
assert.Equal(t, string("ls: amissingfile: No such file or directory\n"), c.Stderr)
}

View File

@ -0,0 +1,142 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package command
import (
"context"
"fmt"
"io"
"os"
"decl/internal/containerlog"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/strslice"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"time"
"log/slog"
)
type ContainerExecClient interface {
ContainerExecAttach(ctx context.Context, execID string, config container.ExecAttachOptions) (types.HijackedResponse, error)
ContainerExecCreate(ctx context.Context, containerID string, options container.ExecOptions) (types.IDResponse, error)
ContainerExecInspect(ctx context.Context, execID string) (container.ExecInspect, error)
ContainerExecStart(ctx context.Context, execID string, config container.ExecStartOptions) (error)
ContainerList(context.Context, container.ListOptions) ([]types.Container, error)
Close() error
}
type ContainerCommandProvider struct {
containerID string
execID string
ExitCode int
container.ExecOptions
response types.HijackedResponse
pipes *containerlog.StreamReader
Stdin io.Reader
apiClient ContainerExecClient `json:"-" yaml:"-"`
}
// ref could be a resource, but it just needs to implement the ExecResource interface
func NewContainerProvider(cmd *Command, value any) (p *ContainerCommandProvider) {
if args, err := cmd.Template(value); err == nil {
p = &ContainerCommandProvider {
ExecOptions: container.ExecOptions {
Cmd: strslice.StrSlice(append([]string{cmd.Path}, args...)),
AttachStdin: true,
AttachStdout: true,
AttachStderr: true,
},
}
p.containerID = p.ResolveId(context.Background(), cmd.TargetRef)
slog.Info("command.NewContainerProvider", "command", cmd.Path, "args", args, "target", cmd.TargetRef, "container", p.containerID)
}
return
}
func (c *ContainerCommandProvider) ResolveId(ctx context.Context, ref CommandTargetRef) (containerID string) {
name := ref.Name()
filterArgs := filters.NewArgs()
filterArgs.Add("name", "/"+name)
containers, listErr := c.apiClient.ContainerList(ctx, container.ListOptions{
All: true,
Filters: filterArgs,
})
if listErr != nil {
panic(listErr)
}
for _, container := range containers {
for _, containerName := range container.Names {
if containerName == "/"+name {
containerID = container.ID
}
}
}
return
}
func (c *ContainerCommandProvider) Start() (err error) {
var execIDResponse types.IDResponse
ctx := context.Background()
if execIDResponse, err = c.apiClient.ContainerExecCreate(ctx, c.containerID, c.ExecOptions); err == nil {
c.execID = execIDResponse.ID
if c.execID == "" {
return fmt.Errorf("Failed creating a container exec ID")
}
execStartCheck := types.ExecStartCheck{
Tty: false,
}
if c.response, err = c.apiClient.ContainerExecAttach(ctx, c.execID, execStartCheck); err == nil {
c.pipes = containerlog.NewStreamReader(c.response.Conn)
}
}
return
}
func (c *ContainerCommandProvider) Wait() (err error) {
var containerDetails container.ExecInspect
ctx := context.Background()
for {
// copy Stdin to the connection
if c.Stdin != nil {
io.Copy(c.response.Conn, c.Stdin)
}
if containerDetails, err = c.apiClient.ContainerExecInspect(ctx, c.execID); err != nil || ! containerDetails.Running {
c.ExitCode = containerDetails.ExitCode
break
} else {
time.Sleep(500 * time.Millisecond)
}
}
return
}
func (c *ContainerCommandProvider) SetCmdEnv(env []string) {
c.Env = append(os.Environ(), env...)
}
func (c *ContainerCommandProvider) SetStdin(r io.Reader) {
c.Stdin = r
}
func (c *ContainerCommandProvider) StdinPipe() (io.WriteCloser, error) {
return c.response.Conn, nil
}
func (c *ContainerCommandProvider) StdoutPipe() (io.ReadCloser, error) {
return c.pipes.StdoutPipe(), nil
}
func (c *ContainerCommandProvider) StderrPipe() (io.ReadCloser, error) {
return c.pipes.StderrPipe(), nil
}
func (c *ContainerCommandProvider) Close() (err error) {
err = c.response.CloseWrite()
c.response.Close()
return
}

View File

@ -0,0 +1,34 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package command
import (
"os"
"os/exec"
"io"
"log/slog"
)
var (
)
type ExecCommandProvider struct {
*exec.Cmd
}
// Consturct a new exec
func NewExecProvider(c *Command, value any) *ExecCommandProvider {
if args, err := c.Template(value); err == nil {
slog.Info("command.NewExecProvider", "command", c.Path, "args", args, "target", c.TargetRef)
return &ExecCommandProvider{exec.Command(c.Path, args...)}
}
return nil
}
func (e *ExecCommandProvider) SetCmdEnv(env []string) {
e.Cmd.Env = append(os.Environ(), env...)
}
func (e *ExecCommandProvider) SetStdin(r io.Reader) {
e.Cmd.Stdin = r
}

View File

@ -0,0 +1,54 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package command
import (
"io"
"os"
"log/slog"
)
var (
)
type SSHCommandProvider struct {
Env []string
Stdin io.Reader
}
// Consturct a new ssh exec
func NewSSHProvider(c *Command, value any) (p *SSHCommandProvider) {
if args, err := c.Template(value); err == nil {
slog.Info("command.NewSSHProvider", "command", c.Path, "args", args, "target", c.TargetRef)
}
return nil
}
func (s *SSHCommandProvider) Start() error {
return nil
}
func (s *SSHCommandProvider) Wait() error {
return nil
}
func (s *SSHCommandProvider) StdinPipe() (w io.WriteCloser, err error) {
return
}
func (s *SSHCommandProvider) StdoutPipe() (r io.ReadCloser, err error) {
return
}
func (s *SSHCommandProvider) StderrPipe() (r io.ReadCloser, err error) {
return
}
func (s *SSHCommandProvider) SetCmdEnv(env []string) {
s.Env = append(os.Environ(), env...)
}
func (s *SSHCommandProvider) SetStdin(r io.Reader) {
s.Stdin = r
}

View File

@ -0,0 +1,39 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package containerlog
import (
"io"
"encoding/binary"
)
func Header(r io.Reader) (s StreamType, size uint64, err error) {
var header []byte = make([]byte, 8)
if _, err = io.ReadFull(r, header); err == nil {
s = StreamType(header[0])
if err = s.Validate(); err == nil {
header[0] = 0x0
size = binary.BigEndian.Uint64(header)
}
}
return
}
func ReadMessage(r io.Reader, size uint64) (message string, err error) {
var messageData []byte = make([]byte, size)
var bytesRead int
if bytesRead, err = r.Read(messageData); err == nil && uint64(bytesRead) == size {
message = string(messageData)
}
return
}
func Read(r io.Reader) (s StreamType, message string, err error) {
var messageSize uint64
if s, messageSize, err = Header(r); err == nil {
if message, err = ReadMessage(r, messageSize); err == nil {
return
}
}
return
}

View File

@ -0,0 +1,28 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package containerlog
import (
"github.com/stretchr/testify/assert"
"testing"
"bytes"
)
func TestLogHeader(t *testing.T) {
for _, v := range []struct{ expected error; value []byte } {
{ expected: nil, value: StreamStdout.Log("test message") },
{ expected: nil, value: StreamStderr.Log("test error") },
{ expected: ErrInvalidStreamType, value: StreamType(0x3).Log("fail") },
{ expected: ErrInvalidStreamType, value: StreamType(0x4).Log("fail") },
} {
var buf bytes.Buffer
_, e := buf.Write(v.value)
assert.Nil(t, e)
logType, logSize, err := Header(&buf)
assert.ErrorIs(t, err, v.expected)
assert.ErrorIs(t, logType.Validate(), v.expected)
if err == nil {
assert.Equal(t, uint64(len(v.value) - 8), logSize)
}
}
}

View File

@ -0,0 +1,74 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package containerlog
import (
"bytes"
"io"
)
type StreamReader struct {
source io.Reader
streams []*ReadBuffer
}
type ReadBuffer struct {
streamtype StreamType
reader *StreamReader
bytes.Buffer
}
func NewStreamReader(source io.Reader) (s *StreamReader) {
s = &StreamReader{
source: source,
streams: make([]*ReadBuffer, 3),
}
s.streams[StreamStdout] = &ReadBuffer{
streamtype: StreamStdout,
reader: s,
}
s.streams[StreamStderr] = &ReadBuffer{
streamtype: StreamStderr,
reader: s,
}
return
}
func (s *StreamReader) StdoutPipe() io.ReadCloser {
return s.streams[StreamStdout]
}
func (s *StreamReader) StderrPipe() io.ReadCloser {
return s.streams[StreamStderr]
}
func (b *ReadBuffer) Read(p []byte) (n int, e error) {
for {
if b.reader.streams[b.streamtype].Len() >= len(p) {
break
}
streamtype, message, err := Read(b.reader.source)
if err != nil {
break
}
if b.reader.streams[streamtype] == nil {
b.reader.streams[streamtype] = &ReadBuffer{
streamtype: streamtype,
reader: b.reader,
}
}
if bytesRead, bufferErr := b.reader.streams[streamtype].WriteString(message); bytesRead != len(message) || bufferErr != nil {
break
}
}
return b.Buffer.Read(p)
}
func (b *ReadBuffer) Close() error {
return nil
}

View File

@ -0,0 +1,35 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package containerlog
import (
"github.com/stretchr/testify/assert"
"testing"
"bytes"
"io"
)
func TestStreamReader(t *testing.T) {
var logs bytes.Buffer
logs.Write(StreamStdout.Log("stdout log message"))
logs.Write(StreamStderr.Log("stderr log message"))
logs.Write(StreamStdout.Log("stdout log message - line 2"))
logs.Write(StreamStderr.Log("stderr log message - line 2"))
logs.Write(StreamStderr.Log("stderr log message - line 3"))
sr := NewStreamReader(&logs)
outpipe := sr.StdoutPipe()
errpipe := sr.StderrPipe()
var message []byte = make([]byte, 20)
n, ee := errpipe.Read(message)
assert.Nil(t, ee)
assert.Equal(t, 20, n)
assert.Equal(t, "stderr log messagest", string(message))
ov, oe := io.ReadAll(outpipe)
assert.Nil(t, oe)
assert.Equal(t, "stdout log messagestdout log message - line 2", string(ov))
}

View File

@ -0,0 +1,38 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package containerlog
import (
"fmt"
"errors"
"encoding/binary"
)
type StreamType byte
const (
StreamStdin StreamType = 0x0
StreamStdout StreamType = 0x1
StreamStderr StreamType = 0x2
)
var (
ErrInvalidStreamType error = errors.New("Invalid container log stream type")
)
func (s StreamType) Validate() error {
switch s {
case StreamStdin, StreamStdout, StreamStderr:
return nil
}
return fmt.Errorf("%w: %d", ErrInvalidStreamType, s)
}
func (s StreamType) Log(msg string) (log []byte) {
msgLen := len(msg)
log = make([]byte, 8 + msgLen)
binary.BigEndian.PutUint64(log, uint64(msgLen))
log[0] = byte(s)
copy(log[8:], []byte(msg))
return
}

View File

@ -0,0 +1,20 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package containerlog
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestStreamType(t *testing.T) {
for _, v := range []struct{ expected error; value StreamType } {
{ expected: nil, value: 0x0 },
{ expected: nil, value: 0x1 },
{ expected: nil, value: 0x2 },
{ expected: ErrInvalidStreamType, value: 0x3 },
{ expected: ErrInvalidStreamType, value: 0x4 },
} {
assert.ErrorIs(t, v.value.Validate(), v.expected)
}
}