jx/internal/resource/container.go
Matthew Rich 07808c62fd
Some checks failed
Declarative Tests / test (push) Waiting to run
Lint / golangci-lint (push) Has been cancelled
add support for remote command execution
2024-11-10 10:24:06 -08:00

586 lines
17 KiB
Go

// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
// Container resource
package resource
import (
"context"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/strslice"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"gopkg.in/yaml.v3"
_ "gopkg.in/yaml.v3"
"log/slog"
"net/url"
_ "os"
_ "os/exec"
"path/filepath"
"strings"
"encoding/json"
"io"
"gitea.rosskeen.house/rosskeen.house/machine"
"decl/internal/codec"
"decl/internal/data"
"decl/internal/folio"
"decl/internal/containerlog"
"bytes"
_ "encoding/base64"
)
// ContainerTypeName is the resource type name under which containers are
// identified; NewContainer passes it to NewCommon.
const ContainerTypeName TypeName = "container"
// ContainerClient is the subset of the Docker client API that the Container
// resource calls. NewContainer falls back to a real Docker client when no
// implementation is supplied, so any substitute must satisfy these methods.
type ContainerClient interface {
	// ContainerCreate is called by Create with the assembled configuration.
	ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, platform *ocispec.Platform, containerName string) (container.CreateResponse, error)
	// ContainerStart is called by Create right after a successful ContainerCreate.
	ContainerStart(ctx context.Context, containerID string, options container.StartOptions) error
	// ContainerList is used for name-filtered lookups (ReadStat, Read, ResolveId).
	ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error)
	// ContainerInspect is called by Inspect to load the container details.
	ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error)
	// ContainerRemove is called by Delete after the container has been stopped.
	ContainerRemove(ctx context.Context, containerID string, options container.RemoveOptions) error
	// ContainerStop is called by Delete before removal.
	ContainerStop(ctx context.Context, containerID string, options container.StopOptions) error
	// ContainerWait is used by Create (when Wait is set) and by Delete.
	ContainerWait(ctx context.Context, containerID string, condition container.WaitCondition) (<-chan container.WaitResponse, <-chan error)
	// ContainerLogs is called by ReadFromContainer to fetch stdout/stderr.
	ContainerLogs(ctx context.Context, containerID string, options container.LogsOptions) (io.ReadCloser, error)
	Close() error
}
// Container is the declarative description of a Docker container together
// with the details read back from the daemon.
//
// Fields are serialised to JSON and YAML using the tags below; the field
// order is significant for encoders that emit fields in declaration order.
type Container struct {
	*Common `yaml:",inline" json:",inline"`
	// stater is created lazily by StateMachine.
	stater machine.Stater `yaml:"-" json:"-"`

	// Id is set from the create response in Create and from Inspect.
	Id string `json:"ID,omitempty" yaml:"ID,omitempty"`
	// Name is derived from the resource URI (host joined with path).
	Name       string            `json:"name" yaml:"name"`
	Cmd        []string          `json:"cmd,omitempty" yaml:"cmd,omitempty"`
	WorkingDir string            `json:"workingdir,omitempty" yaml:"workingdir,omitempty"`
	Entrypoint strslice.StrSlice `json:"entrypoint,omitempty" yaml:"entrypoint,omitempty"`
	// Args is declared but not consumed by Create in this file.
	Args []string `json:"args,omitempty" yaml:"args,omitempty"`
	// Ports are converted to exposed ports by Create.
	Ports []string `json:"ports,omitempty" yaml:"ports,omitempty"`
	// Environment is rendered as KEY=VALUE entries by Create.
	Environment    map[string]string `json:"environment" yaml:"environment"`
	Image          string            `json:"image" yaml:"image"`
	ResolvConfPath string            `json:"resolvconfpath" yaml:"resolvconfpath"`
	HostnamePath   string            `json:"hostnamepath" yaml:"hostnamepath"`
	// NOTE(review): the JSON key is "hostpath" while the YAML key is
	// "hostspath" - confirm whether the difference is intentional.
	HostsPath       string                `json:"hostpath" yaml:"hostspath"`
	LogPath         string                `json:"logpath" yaml:"logpath"`
	Created         string                `json:"created" yaml:"created"`
	ContainerState  types.ContainerState  `json:"containerstate" yaml:"containerstate"`
	RestartCount    int                   `json:"restartcount" yaml:"restartcount"`
	Driver          string                `json:"driver" yaml:"driver"`
	Platform        string                `json:"platform" yaml:"platform"`
	MountLabel      string                `json:"mountlabel" yaml:"mountlabel"`
	ProcessLabel    string                `json:"processlabel" yaml:"processlabel"`
	AppArmorProfile string                `json:"apparmorprofile" yaml:"apparmorprofile"`
	ExecIDs         []string              `json:"execids" yaml:"execids"`
	HostConfig      container.HostConfig  `json:"hostconfig" yaml:"hostconfig"`
	GraphDriver     types.GraphDriverData `json:"graphdriver" yaml:"graphdriver"`
	SizeRw          *int64                `json:",omitempty" yaml:",omitempty"`
	SizeRootFs      *int64                `json:",omitempty" yaml:",omitempty"`
	// Networks lists the network names attached at creation time.
	Networks []string `json:"networks,omitempty" yaml:"networks,omitempty"`
	/*
		Mounts []MountPoint
		Config *container.Config
		NetworkSettings *NetworkSettings
	*/

	// Wait makes Create block until the container is no longer running.
	Wait bool `json:"wait,omitempty" yaml:"wait,omitempty"`
	// Stdout and Stderr hold the log output gathered by ReadFromContainer.
	Stdout string `json:"stdout,omitempty" yaml:"stdout,omitempty"`
	Stderr string `json:"stderr,omitempty" yaml:"stderr,omitempty"`

	apiClient ContainerClient
	Resources data.ResourceMapper `json:"-" yaml:"-"`
}
func init() {
ResourceTypes.Register([]string{"container"}, func(u *url.URL) (c data.Resource) {
c = NewContainer(nil)
if u != nil {
if err := folio.CastParsedURI(u).ConstructResource(c); err != nil {
panic(err)
}
}
return
})
}
// NewContainer returns a Container that talks to the daemon through
// containerClientApi. When containerClientApi is nil a Docker client is
// created from the environment with API version negotiation enabled; failure
// to create that client panics.
func NewContainer(containerClientApi ContainerClient) *Container {
	apiClient := containerClientApi
	if apiClient == nil {
		dockerClient, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
		if err != nil {
			panic(err)
		}
		apiClient = dockerClient
	}

	c := &Container{
		Common:    NewCommon(ContainerTypeName, false),
		apiClient: apiClient,
	}
	return c
}
// Init sets the container name from the given URI (hostname joined with
// path) and then records the parsed URI via SetParsedURI. When u is nil the
// URI is re-derived from c.URI().
func (c *Container) Init(u data.URIParser) error {
	parser := u
	if parser == nil {
		parser = folio.URI(c.URI()).Parse()
	}

	location := parser.URL()
	c.Name = filepath.Join(location.Hostname(), location.Path)

	return c.SetParsedURI(parser)
}
// SetParsedURI stores the parsed URI on the embedded Common and, on success,
// refreshes Name from the stored URI's hostname and path. The name is left
// untouched when Common rejects the URI.
func (c *Container) SetParsedURI(u data.URIParser) (err error) {
	err = c.Common.SetParsedURI(u)
	if err != nil {
		return err
	}

	stored := c.Common.parsedURI
	c.Name = filepath.Join(stored.Hostname(), stored.Path)
	return nil
}
// SetResourceMapper stores the given resource mapper on the container's
// Resources field.
func (c *Container) SetResourceMapper(resources data.ResourceMapper) {
c.Resources = resources
}
// Clone returns a copy of the container as a data.Resource.
//
// The embedded Common is cloned through its own Clone method; every other
// field is copied by assignment, so slices, maps and the API client are
// shared with the receiver. The state machine is not copied: the clone
// builds its own on the first call to StateMachine.
//
// WorkingDir, Ports, Wait, Stdout, Stderr and Resources are included so that
// a clone carries the full declared configuration and collected output
// instead of silently dropping them.
func (c *Container) Clone() data.Resource {
	return &Container{
		Id:              c.Id,
		Name:            c.Name,
		Common:          c.Common.Clone(),
		Cmd:             c.Cmd,
		WorkingDir:      c.WorkingDir,
		Entrypoint:      c.Entrypoint,
		Args:            c.Args,
		Ports:           c.Ports,
		Environment:     c.Environment,
		Image:           c.Image,
		ResolvConfPath:  c.ResolvConfPath,
		HostnamePath:    c.HostnamePath,
		HostsPath:       c.HostsPath,
		LogPath:         c.LogPath,
		Created:         c.Created,
		ContainerState:  c.ContainerState,
		RestartCount:    c.RestartCount,
		Driver:          c.Driver,
		Platform:        c.Platform,
		MountLabel:      c.MountLabel,
		ProcessLabel:    c.ProcessLabel,
		AppArmorProfile: c.AppArmorProfile,
		ExecIDs:         c.ExecIDs,
		HostConfig:      c.HostConfig,
		GraphDriver:     c.GraphDriver,
		SizeRw:          c.SizeRw,
		SizeRootFs:      c.SizeRootFs,
		Networks:        c.Networks,
		Wait:            c.Wait,
		Stdout:          c.Stdout,
		Stderr:          c.Stderr,
		apiClient:       c.apiClient,
		Resources:       c.Resources,
	}
}
// StateMachine returns the container's state machine, creating it with
// ProcessMachine on first use and caching it for later calls.
func (c *Container) StateMachine() machine.Stater {
	if c.stater != nil {
		return c.stater
	}
	c.stater = ProcessMachine(c)
	return c.stater
}
// JSON returns the container encoded with encoding/json, using the struct's
// json tags.
func (c *Container) JSON() ([]byte, error) {
	encoded, err := json.Marshal(c)
	return encoded, err
}
// Validate currently always reports failure: it returns the error "failed"
// without inspecting the container.
func (c *Container) Validate() error {
	err := fmt.Errorf("failed")
	return err
}
// Notify reacts to state-machine events for this container.
//
// Only enter-state events are acted on; every other event is just logged.
// The destination state selects the action:
//
//   - "start_stat": probe for the container with ReadStat and trigger
//     "exists" or "notexists". A failing trigger is ignored.
//   - "start_read": Read the container, then trigger "state_read".
//   - "start_create": Create the container, then trigger "created".
//   - "start_delete": Delete the container, then trigger "deleted".
//   - "present", "created", "read", "running", "absent": record the
//     resulting value in Common.State.
//
// For read/create/delete a failure of either the operation or the trigger
// resets Common.State ("absent" for read and create, "present" for delete)
// and panics with the error.
//
// All triggers go through StateMachine(), so the machine is lazily created
// and never dereferenced while nil.
func (c *Container) Notify(m *machine.EventMessage) {
	slog.Info("Container.Notify()", "destination_event", m.Dest, "uri", c.URI())
	if m.On != machine.ENTERSTATEEVENT {
		return
	}

	ctx := context.Background()
	switch m.Dest {
	case "start_stat":
		statErr := c.ReadStat()
		slog.Info("Container.Notify() - ReadStat", "event", "start_stat", "error", statErr)
		if statErr == nil {
			if triggerErr := c.StateMachine().Trigger("exists"); triggerErr == nil {
				slog.Info("Container.Notify()", "event", "start_stat", "trigger", "exists")
			}
			return
		}
		if triggerErr := c.StateMachine().Trigger("notexists"); triggerErr == nil {
			slog.Info("Container.Notify()", "event", "start_stat", "trigger", "notexists")
		}
	case "start_read":
		if _, readErr := c.Read(ctx); readErr != nil {
			c.Common.State = "absent"
			panic(readErr)
		}
		if triggerErr := c.StateMachine().Trigger("state_read"); triggerErr != nil {
			c.Common.State = "absent"
			panic(triggerErr)
		}
	case "start_create":
		if createErr := c.Create(ctx); createErr != nil {
			c.Common.State = "absent"
			panic(createErr)
		}
		if triggerErr := c.StateMachine().Trigger("created"); triggerErr != nil {
			c.Common.State = "absent"
			panic(triggerErr)
		}
	case "start_delete":
		slog.Info("Container.Notify()", "event", "start_delete")
		if deleteErr := c.Delete(ctx); deleteErr != nil {
			c.Common.State = "present"
			panic(deleteErr)
		}
		if triggerErr := c.StateMachine().Trigger("deleted"); triggerErr != nil {
			c.Common.State = "present"
			panic(triggerErr)
		}
	case "present", "created", "read":
		c.Common.State = "present"
	case "running":
		c.Common.State = "running"
	case "absent":
		c.Common.State = "absent"
	}
}
// ReadStat reports whether a container with this name is known to the
// daemon, including stopped ones.
//
// It returns nil when a listed container carries the exact name "/<Name>".
// Otherwise it returns an error wrapping ErrResourceStateAbsent; when the
// list call itself fails, that error is wrapped as well.
func (c *Container) ReadStat() (err error) {
	wantName := "/" + c.Name
	absentErr := fmt.Errorf("%w: %s", ErrResourceStateAbsent, c.Name)

	nameFilter := filters.NewArgs()
	nameFilter.Add("name", wantName)
	listed, listErr := c.apiClient.ContainerList(context.Background(), container.ListOptions{
		All:     true,
		Filters: nameFilter,
	})
	if listErr != nil {
		return fmt.Errorf("%w: %w", absentErr, listErr)
	}

	// The name filter is not an exact match on its own, so compare names explicitly.
	for _, candidate := range listed {
		for _, candidateName := range candidate.Names {
			if candidateName == wantName {
				slog.Info("Container.ReadStat() exists", "container", c.Name)
				return nil
			}
		}
	}
	return absentErr
}
// Apply enforces the declared state using a background context: "absent"
// deletes the container, "present" creates it, and any other state is a
// no-op that returns nil.
func (c *Container) Apply() error {
	ctx := context.Background()
	if c.Common.State == "absent" {
		return c.Delete(ctx)
	}
	if c.Common.State == "present" {
		return c.Create(ctx)
	}
	return nil
}
// Load decodes docData into the container using format f's string decoder.
func (c *Container) Load(docData []byte, f codec.Format) (err error) {
	decoder := f.StringDecoder(string(docData))
	return decoder.Decode(c)
}
// LoadReader decodes the contents of r into the container using format f.
// The reader is not closed here.
func (c *Container) LoadReader(r io.ReadCloser, f codec.Format) (err error) {
	decoder := f.Decoder(r)
	return decoder.Decode(c)
}
// LoadString decodes docData into the container using format f's string
// decoder.
func (c *Container) LoadString(docData string, f codec.Format) (err error) {
	decoder := f.StringDecoder(docData)
	return decoder.Decode(c)
}
// LoadDecl decodes a YAML resource declaration into the container; it is
// LoadString with the YAML format.
func (c *Container) LoadDecl(yamlResourceDeclaration string) error {
	err := c.LoadString(yamlResourceDeclaration, codec.FormatYaml)
	return err
}
// ReadFromContainer fetches the container's stdout and stderr logs and stores
// them in c.Stdout and c.Stderr.
//
// The whole log stream is read into memory and then split into messages with
// containerlog.Read, which reports the stream each message belongs to.
// Messages of each stream are concatenated in the order received.
//
// It returns the error from the logs call or from copying the stream. If a
// message cannot be extracted, parsing stops, the output gathered so far is
// still stored, and the extraction error is returned. io.EOF from the
// extractor marks the normal end of the log data.
func (c *Container) ReadFromContainer(ctx context.Context) (err error) {
	logReader, logsErr := c.apiClient.ContainerLogs(ctx, c.Id, container.LogsOptions{ShowStdout: true, ShowStderr: true})
	if logsErr != nil {
		return logsErr
	}
	defer logReader.Close()

	var buf bytes.Buffer
	if _, copyErr := io.Copy(&buf, logReader); copyErr != nil {
		return copyErr
	}
	slog.Info("Container.ReadFromContainer() - ContainerLogs", "read", buf.String())

	var stdout, stderr []string
	for {
		streamType, message, extractErr := containerlog.Read(&buf)
		if extractErr != nil {
			// Stop on any error: retrying the same buffer cannot make
			// progress. Only a non-EOF error is reported to the caller.
			if extractErr != io.EOF {
				err = extractErr
			}
			break
		}
		switch streamType {
		case containerlog.StreamStdout:
			stdout = append(stdout, message)
		case containerlog.StreamStderr:
			stderr = append(stderr, message)
		}
	}

	c.Stdout = strings.Join(stdout, "")
	c.Stderr = strings.Join(stderr, "")
	slog.Info("Container.ReadFromContainer()", "stdout", c.Stdout, "stderr", c.Stderr, "error", err)
	return err
}
// Create creates and starts the container described by c, then collects its
// output.
//
// The container configuration is built from Image, Cmd, Entrypoint,
// WorkingDir, Ports (as exposed ports) and Environment (as KEY=VALUE
// entries). Bind-mount sources in HostConfig are rewritten in place to
// absolute paths when they can be resolved, and each name in Networks gets
// its own empty endpoint configuration.
//
// After a successful start the logs are read once. When Wait is set, Create
// then blocks until the container is no longer running. If no output has
// been captured by that point the logs are read a second time.
//
// Errors from create, start, log reading and waiting are returned to the
// caller; a wait failure is reported as a wrapped error rather than a panic.
// On success c.Id holds the new container's ID.
func (c *Container) Create(ctx context.Context) error {
	exposedPorts := nat.PortSet{}
	for _, port := range c.Ports {
		exposedPorts[nat.Port(port)] = struct{}{}
	}

	env := make([]string, 0, len(c.Environment))
	for key, value := range c.Environment {
		env = append(env, key+"="+value)
	}

	config := &container.Config{
		Image:        c.Image,
		Cmd:          c.Cmd,
		Entrypoint:   c.Entrypoint,
		Tty:          false,
		ExposedPorts: exposedPorts,
		WorkingDir:   c.WorkingDir,
		AttachStdout: true,
		AttachStderr: true,
		Env:          env,
	}

	// Bind mounts need absolute host paths; leave the source untouched when
	// it cannot be resolved.
	for i := range c.HostConfig.Mounts {
		if c.HostConfig.Mounts[i].Type != mount.TypeBind {
			continue
		}
		if absSource, absErr := filepath.Abs(c.HostConfig.Mounts[i].Source); absErr == nil {
			c.HostConfig.Mounts[i].Source = absSource
		}
	}

	networkConfig := &network.NetworkingConfig{
		EndpointsConfig: make(map[string]*network.EndpointSettings, len(c.Networks)),
	}
	for _, networkName := range c.Networks {
		networkConfig.EndpointsConfig[networkName] = &network.EndpointSettings{}
	}

	resp, err := c.apiClient.ContainerCreate(ctx, config, &c.HostConfig, networkConfig, nil, c.Name)
	if err != nil {
		return err
	}
	c.Id = resp.ID

	if startErr := c.apiClient.ContainerStart(ctx, c.Id, container.StartOptions{}); startErr != nil {
		return startErr
	}

	if readErr := c.ReadFromContainer(ctx); readErr != nil {
		return readErr
	}

	if c.Wait {
		slog.Info("Container.Create() - waiting for container to stop", "id", c.Id, "name", c.Name)
		statusCh, errCh := c.apiClient.ContainerWait(ctx, c.Id, container.WaitConditionNotRunning)
		select {
		case waitErr := <-errCh:
			if waitErr != nil {
				return fmt.Errorf("waiting for container %s: %w", c.Id, waitErr)
			}
		case <-statusCh:
		}
	}

	// Nothing captured yet (for example the container had not produced output
	// at the first read): try once more.
	if len(c.Stdout) == 0 && len(c.Stderr) == 0 {
		return c.ReadFromContainer(ctx)
	}
	return nil
}
// Update is implemented by delegating to Create.
func (c *Container) Update(ctx context.Context) error {
	err := c.Create(ctx)
	return err
}
// Read refreshes the container from the daemon and returns its YAML
// representation.
//
// The container is located by listing all containers filtered by name and
// picking the entry whose name is exactly "/<Name>"; its ID (empty when no
// entry matched) is then passed to Inspect. List and inspect failures are
// returned wrapped with identifying context.
func (c *Container) Read(ctx context.Context) ([]byte, error) {
	wantName := "/" + c.Name

	nameFilter := filters.NewArgs()
	nameFilter.Add("name", wantName)
	listed, listErr := c.apiClient.ContainerList(ctx, container.ListOptions{
		All:     true,
		Filters: nameFilter,
	})
	if listErr != nil {
		return nil, fmt.Errorf("%w: %s %s", listErr, c.Type(), c.Name)
	}

	var matchedID string
	for _, candidate := range listed {
		for _, candidateName := range candidate.Names {
			if candidateName == wantName {
				matchedID = candidate.ID
			}
		}
	}

	if inspectErr := c.Inspect(ctx, matchedID); inspectErr != nil {
		return nil, fmt.Errorf("%w: container %s", inspectErr, matchedID)
	}

	slog.Info("Read() ", "type", c.Type(), "name", c.Name, "Id", c.Id)
	return yaml.Marshal(c)
}
// Inspect loads the details of containerID from the daemon into c.
//
// A not-found response marks the resource "absent" and returns nil. Any
// other inspect error is returned unchanged and c is left untouched, so a
// failed call is never mistaken for a present container and the empty
// response is never dereferenced.
//
// On success the state becomes "present" and Id, Path, Image, Created, the
// resolv.conf/hostname/hosts/log paths, RestartCount, Driver and (when
// reported) ContainerState are copied from the response. Name is only filled
// in when it is empty, with any leading "/" removed.
//
// A container whose recorded exit code is non-zero yields an error carrying
// the container state's error text (which may be empty).
func (c *Container) Inspect(ctx context.Context, containerID string) error {
	containerJSON, err := c.apiClient.ContainerInspect(ctx, containerID)
	if err != nil {
		if client.IsErrNotFound(err) {
			c.Common.State = "absent"
			return nil
		}
		return err
	}

	c.Common.State = "present"
	c.Id = containerJSON.ID
	if c.Name == "" {
		c.Name = strings.TrimPrefix(containerJSON.Name, "/")
	}
	c.Common.Path = containerJSON.Path
	c.Image = containerJSON.Image
	c.Created = containerJSON.Created
	c.ResolvConfPath = containerJSON.ResolvConfPath
	c.HostnamePath = containerJSON.HostnamePath
	c.HostsPath = containerJSON.HostsPath
	c.LogPath = containerJSON.LogPath
	c.RestartCount = containerJSON.RestartCount
	c.Driver = containerJSON.Driver

	if containerJSON.State != nil {
		c.ContainerState = *containerJSON.State
		if c.ContainerState.ExitCode != 0 {
			return fmt.Errorf("%s", c.ContainerState.Error)
		}
	}
	return nil
}
// Delete stops and removes the container identified by c.Id, including its
// volumes, and then waits on it.
//
// Stop and remove failures are logged and returned. The final wait is
// expected to report that the container no longer exists; that outcome is
// treated as success, whether it is signalled by a not-found error from the
// client or by the "No such container:" message text. Any other wait error
// is returned.
func (c *Container) Delete(ctx context.Context) error {
	slog.Info("Container.Delete()", "id", c.Id, "name", c.Name)

	if stopErr := c.apiClient.ContainerStop(ctx, c.Id, container.StopOptions{}); stopErr != nil {
		slog.Error("Container.Delete() - failed to stop: ", "Id", c.Id, "error", stopErr)
		return stopErr
	}

	removeErr := c.apiClient.ContainerRemove(ctx, c.Id, container.RemoveOptions{
		RemoveVolumes: true,
		Force:         false,
	})
	if removeErr != nil {
		slog.Error("Container.Delete() - failed to remove: ", "Id", c.Id, "error", removeErr)
		return removeErr
	}

	statusCh, errCh := c.apiClient.ContainerWait(ctx, c.Id, container.WaitConditionNotRunning)
	select {
	case waitErr := <-errCh:
		if waitErr == nil {
			break
		}
		// The container was just removed, so "not found" is the expected result.
		if client.IsErrNotFound(waitErr) || strings.Contains(waitErr.Error(), "No such container:") {
			return nil
		}
		return waitErr
	case <-statusCh:
	}
	return nil
}
// URI returns the resource URI in the form "<type>://<name>".
func (c *Container) URI() string {
	return c.Type() + "://" + c.Name
}
// Type returns the resource type name, "container".
func (c *Container) Type() string {
	return "container"
}
// ResolveId looks the container up by name and returns its ID, driving the
// state machine with the outcome.
//
// The URI is first re-parsed into Common. All containers matching the name
// filter are then listed and the one named exactly "/<Name>" is selected:
// c.Id is filled in when empty, "exists" is triggered and the ID returned.
// When nothing matches, "notexists" is triggered and "" is returned.
//
// A URI or list failure triggers "notexists" and then panics with the error;
// a failing trigger on the match/no-match paths panics as well.
func (c *Container) ResolveId(ctx context.Context) string {
	if parseErr := c.Common.SetParsedURI(folio.URI(c.URI()).Parse()); parseErr != nil {
		triggerErr := c.StateMachine().Trigger("notexists")
		panic(fmt.Errorf("%w: %s %s, %w", parseErr, c.Type(), c.Name, triggerErr))
	}

	wantName := "/" + c.Name
	nameFilter := filters.NewArgs()
	nameFilter.Add("name", wantName)
	listed, listErr := c.apiClient.ContainerList(ctx, container.ListOptions{
		All:     true,
		Filters: nameFilter,
	})
	if listErr != nil {
		triggerErr := c.StateMachine().Trigger("notexists")
		panic(fmt.Errorf("%w: %s %s, %w", listErr, c.Type(), c.Name, triggerErr))
	}
	slog.Info("Container.ResolveId()", "containers", listed)

	for _, candidate := range listed {
		for _, candidateName := range candidate.Names {
			if candidateName != wantName {
				continue
			}
			slog.Info("Container.ResolveId()", "state", c.StateMachine())
			// Keep an ID that was already set; only fill in a missing one.
			if c.Id == "" {
				c.Id = candidate.ID
			}
			if triggerErr := c.StateMachine().Trigger("exists"); triggerErr != nil {
				panic(fmt.Errorf("%w: %s %s", triggerErr, c.Type(), c.Name))
			}
			slog.Info("Container.ResolveId() trigger created", "machine", c.StateMachine(), "state", c.StateMachine().CurrentState())
			return candidate.ID
		}
	}

	if triggerErr := c.StateMachine().Trigger("notexists"); triggerErr != nil {
		panic(fmt.Errorf("%w: %s %s", triggerErr, c.Type(), c.Name))
	}
	return ""
}