// Copyright 2024 Matthew Rich . All rights reserved. // Container resource package resource import ( "context" "fmt" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/mount" "github.com/docker/docker/api/types/network" "github.com/docker/docker/api/types/strslice" "github.com/docker/docker/client" "github.com/docker/go-connections/nat" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "gopkg.in/yaml.v3" _ "gopkg.in/yaml.v3" "log/slog" "net/url" _ "os" _ "os/exec" "path/filepath" "strings" "encoding/json" "io" "gitea.rosskeen.house/rosskeen.house/machine" "decl/internal/codec" "decl/internal/data" "decl/internal/folio" "decl/internal/containerlog" "bytes" _ "encoding/base64" ) const ( ContainerTypeName TypeName = "container" ) type ContainerClient interface { ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, platform *ocispec.Platform, containerName string) (container.CreateResponse, error) ContainerStart(ctx context.Context, containerID string, options container.StartOptions) error ContainerList(context.Context, container.ListOptions) ([]types.Container, error) ContainerInspect(context.Context, string) (types.ContainerJSON, error) ContainerRemove(context.Context, string, container.RemoveOptions) error ContainerStop(context.Context, string, container.StopOptions) error ContainerWait(ctx context.Context, containerID string, condition container.WaitCondition) (<-chan container.WaitResponse, <-chan error) ContainerLogs(ctx context.Context, containerID string, options container.LogsOptions) (io.ReadCloser, error) Close() error } type Container struct { *Common `yaml:",inline" json:",inline"` stater machine.Stater `yaml:"-" json:"-"` Id string `json:"ID,omitempty" yaml:"ID,omitempty"` Name string `json:"name" yaml:"name"` Cmd []string `json:"cmd,omitempty" yaml:"cmd,omitempty"` WorkingDir string `json:"workingdir,omitempty" yaml:"workingdir,omitempty"` Entrypoint strslice.StrSlice `json:"entrypoint,omitempty" yaml:"entrypoint,omitempty"` Args []string `json:"args,omitempty" yaml:"args,omitempty"` Ports []string `json:"ports,omitempty" yaml:"ports,omitempty"` Environment map[string]string `json:"environment" yaml:"environment"` Image string `json:"image" yaml:"image"` ResolvConfPath string `json:"resolvconfpath" yaml:"resolvconfpath"` HostnamePath string `json:"hostnamepath" yaml:"hostnamepath"` HostsPath string `json:"hostpath" yaml:"hostspath"` LogPath string `json:"logpath" yaml:"logpath"` Created string `json:"created" yaml:"created"` ContainerState types.ContainerState `json:"containerstate" yaml:"containerstate"` RestartCount int `json:"restartcount" yaml:"restartcount"` Driver string `json:"driver" yaml:"driver"` Platform string `json:"platform" yaml:"platform"` MountLabel string `json:"mountlabel" yaml:"mountlabel"` ProcessLabel string `json:"processlabel" yaml:"processlabel"` AppArmorProfile string `json:"apparmorprofile" yaml:"apparmorprofile"` ExecIDs []string `json:"execids" yaml:"execids"` HostConfig container.HostConfig `json:"hostconfig" yaml:"hostconfig"` GraphDriver types.GraphDriverData `json:"graphdriver" yaml:"graphdriver"` SizeRw *int64 `json:",omitempty" yaml:",omitempty"` SizeRootFs *int64 `json:",omitempty" yaml:",omitempty"` Networks []string `json:"networks,omitempty" yaml:"networks,omitempty"` /* Mounts []MountPoint Config *container.Config NetworkSettings *NetworkSettings */ Wait bool `json:"wait,omitempty" yaml:"wait,omitempty"` Stdout string `json:"stdout,omitempty" yaml:"stdout,omitempty"` Stderr string `json:"stderr,omitempty" yaml:"stderr,omitempty"` apiClient ContainerClient Resources data.ResourceMapper `json:"-" yaml:"-"` } func init() { ResourceTypes.Register([]string{"container"}, func(u *url.URL) (c data.Resource) { c = NewContainer(nil) if u != nil { if err := folio.CastParsedURI(u).ConstructResource(c); err != nil { panic(err) } } return }) } func NewContainer(containerClientApi ContainerClient) *Container { var apiClient ContainerClient = containerClientApi if apiClient == nil { var err error apiClient, err = client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) if err != nil { panic(err) } } return &Container{ Common: NewCommon(ContainerTypeName, false), apiClient: apiClient, } } func (c *Container) Init(u data.URIParser) error { if u == nil { u = folio.URI(c.URI()).Parse() } uri := u.URL() c.Name = filepath.Join(uri.Hostname(), uri.Path) return c.SetParsedURI(u) } func (c *Container) SetParsedURI(u data.URIParser) (err error) { if err = c.Common.SetParsedURI(u); err == nil { c.Name = filepath.Join(c.Common.parsedURI.Hostname(), c.Common.parsedURI.Path) } return } func (c *Container) SetResourceMapper(resources data.ResourceMapper) { c.Resources = resources } func (c *Container) Clone() data.Resource { return &Container { Id: c.Id, Name: c.Name, Common: c.Common.Clone(), Cmd: c.Cmd, Entrypoint: c.Entrypoint, Args: c.Args, Environment: c.Environment, Image: c.Image, ResolvConfPath: c.ResolvConfPath, HostnamePath: c.HostnamePath, HostsPath: c.HostsPath, LogPath: c.LogPath, Created: c.Created, ContainerState: c.ContainerState, RestartCount: c.RestartCount, Driver: c.Driver, Platform: c.Platform, MountLabel: c.MountLabel, ProcessLabel: c.ProcessLabel, AppArmorProfile: c.AppArmorProfile, ExecIDs: c.ExecIDs, HostConfig: c.HostConfig, GraphDriver: c.GraphDriver, SizeRw: c.SizeRw, SizeRootFs: c.SizeRootFs, Networks: c.Networks, apiClient: c.apiClient, } } func (c *Container) StateMachine() machine.Stater { if c.stater == nil { c.stater = ProcessMachine(c) } return c.stater } func (c *Container) JSON() ([]byte, error) { return json.Marshal(c) } func (c *Container) Validate() error { return fmt.Errorf("failed") } func (c *Container) Notify(m *machine.EventMessage) { slog.Info("Container.Notify()", "destination_event", m.Dest, "uri", c.URI()) ctx := context.Background() switch m.On { case machine.ENTERSTATEEVENT: switch m.Dest { case "start_stat": if statErr := c.ReadStat(); statErr == nil { slog.Info("Container.Notify() - ReadStat", "event", "start_stat", "error", statErr) if triggerErr := c.StateMachine().Trigger("exists"); triggerErr == nil { slog.Info("Container.Notify()", "event", "start_stat", "trigger", "exists") return } } else { slog.Info("Container.Notify() - ReadStat", "event", "start_stat", "error", statErr) if triggerErr := c.StateMachine().Trigger("notexists"); triggerErr == nil { slog.Info("Container.Notify()", "event", "start_stat", "trigger", "notexists") return } } case "start_read": if _,readErr := c.Read(ctx); readErr == nil { if triggerErr := c.stater.Trigger("state_read"); triggerErr == nil { return } else { c.Common.State = "absent" panic(triggerErr) } } else { c.Common.State = "absent" panic(readErr) } case "start_create": if createErr := c.Create(ctx); createErr == nil { if triggerErr := c.StateMachine().Trigger("created"); triggerErr == nil { return } else { c.Common.State = "absent" panic(triggerErr) } } else { c.Common.State = "absent" panic(createErr) } case "start_delete": slog.Info("Container.Notify()", "event", "start_delete") if deleteErr := c.Delete(ctx); deleteErr == nil { if triggerErr := c.StateMachine().Trigger("deleted"); triggerErr == nil { return } else { c.Common.State = "present" panic(triggerErr) } } else { c.Common.State = "present" panic(deleteErr) } case "present", "created", "read": c.Common.State = "present" case "running": c.Common.State = "running" case "absent": c.Common.State = "absent" } case machine.EXITSTATEEVENT: } } func (c *Container) ReadStat() (err error) { err = fmt.Errorf("%w: %s", ErrResourceStateAbsent, c.Name) filterArgs := filters.NewArgs() filterArgs.Add("name", "/"+c.Name) if containers, listErr := c.apiClient.ContainerList(context.Background(), container.ListOptions{ All: true, Filters: filterArgs, }); listErr == nil { for _, container := range containers { for _, containerName := range container.Names { if containerName == "/"+c.Name { slog.Info("Container.ReadStat() exists", "container", c.Name) return nil } } } } else { err = fmt.Errorf("%w: %w", err, listErr) } return } func (c *Container) Apply() error { ctx := context.Background() switch c.Common.State { case "absent": return c.Delete(ctx) case "present": return c.Create(ctx) } return nil } func (c *Container) Load(docData []byte, f codec.Format) (err error) { err = f.StringDecoder(string(docData)).Decode(c) return } func (c *Container) LoadReader(r io.ReadCloser, f codec.Format) (err error) { err = f.Decoder(r).Decode(c) return } func (c *Container) LoadString(docData string, f codec.Format) (err error) { err = f.StringDecoder(docData).Decode(c) return } func (c *Container) LoadDecl(yamlResourceDeclaration string) error { return c.LoadString(yamlResourceDeclaration, codec.FormatYaml) } func (c *Container) ReadFromContainer(ctx context.Context) (err error) { var buf bytes.Buffer var stdout, stderr []string if stdoutReader, err := c.apiClient.ContainerLogs(ctx, c.Id, container.LogsOptions{ShowStdout: true, ShowStderr: true}); err != nil { return err } else { defer stdoutReader.Close() if _, copyErr := io.Copy(&buf, stdoutReader); copyErr != nil { return copyErr } slog.Info("Container.ReadFromContainer() - ContainerLogs", "read", buf.String()) for { if streamType, message, extractErr := containerlog.Read(&buf); extractErr == nil { switch streamType { case containerlog.StreamStdout: stdout = append(stdout, message) case containerlog.StreamStderr: stderr = append(stderr, message) } } else { if extractErr == io.EOF { break } err = extractErr } /* if streamType, size, extractErr := c.ExtractLogHeader(&buf); extractErr == nil { slog.Info("Container.Create() - ContainerLogs", "streamtype", streamType, "size", size) var logMessage string if logMessage, err = c.ReadLogMessage(&buf, size); err == nil { switch streamType { case ContainerLogStreamStdout: stdout = append(stdout, logMessage) case ContainerLogStreamStderr: stderr = append(stderr, logMessage) } } } else { if extractErr == io.EOF { break } err = extractErr } */ } } c.Stdout = strings.Join(stdout, "") c.Stderr = strings.Join(stderr, "") slog.Info("Container.ReadFromContainer()", "stdout", c.Stdout, "stderr", c.Stderr, "error", err) return } func (c *Container) Create(ctx context.Context) error { numberOfEnvironmentVariables := len(c.Environment) portset := nat.PortSet {} for _, port := range c.Ports { portset[nat.Port(port)] = struct{}{} } config := &container.Config{ Image: c.Image, Cmd: c.Cmd, Entrypoint: c.Entrypoint, Tty: false, ExposedPorts: portset, WorkingDir: c.WorkingDir, AttachStdout: true, AttachStderr: true, } config.Env = make([]string, numberOfEnvironmentVariables) index := 0 for k, v := range c.Environment { config.Env[index] = k + "=" + v index++ } for i := range c.HostConfig.Mounts { if c.HostConfig.Mounts[i].Type == mount.TypeBind { if mountSourceAbsolutePath, e := filepath.Abs(c.HostConfig.Mounts[i].Source); e == nil { c.HostConfig.Mounts[i].Source = mountSourceAbsolutePath } } } networkConfig := &network.NetworkingConfig{ EndpointsConfig: map[string]*network.EndpointSettings{}, } settings := &network.EndpointSettings{} for _, network := range c.Networks { networkConfig.EndpointsConfig[network] = settings } resp, err := c.apiClient.ContainerCreate(ctx, config, &c.HostConfig, networkConfig, nil, c.Name) if err != nil { return err } c.Id = resp.ID if startErr := c.apiClient.ContainerStart(ctx, c.Id, container.StartOptions{}); startErr != nil { return startErr } if err = c.ReadFromContainer(ctx); err != nil { return err } if c.Wait { slog.Info("Container.Create() - waiting for container to stop", "id", c.Id, "name", c.Name) statusCh, errCh := c.apiClient.ContainerWait(ctx, c.Id, container.WaitConditionNotRunning) select { case err := <-errCh: if err != nil { panic(err) } case <-statusCh: } } if len(c.Stdout) == 0 && len(c.Stderr) == 0 { if err = c.ReadFromContainer(ctx); err != nil { return err } } return err } func (c *Container) Update(ctx context.Context) error { return c.Create(ctx) } // produce yaml representation of any resource func (c *Container) Read(ctx context.Context) ([]byte, error) { var containerID string filterArgs := filters.NewArgs() filterArgs.Add("name", "/"+c.Name) containers, err := c.apiClient.ContainerList(ctx, container.ListOptions{ All: true, Filters: filterArgs, }) if err != nil { return nil, fmt.Errorf("%w: %s %s", err, c.Type(), c.Name) } for _, container := range containers { for _, containerName := range container.Names { if containerName == "/"+c.Name { containerID = container.ID } } } if inspectErr := c.Inspect(ctx, containerID); inspectErr != nil { return nil, fmt.Errorf("%w: container %s", inspectErr, containerID) } slog.Info("Read() ", "type", c.Type(), "name", c.Name, "Id", c.Id) return yaml.Marshal(c) } func (c *Container) Inspect(ctx context.Context, containerID string) error { containerJSON, err := c.apiClient.ContainerInspect(ctx, containerID) if client.IsErrNotFound(err) { c.Common.State = "absent" } else { c.Common.State = "present" c.Id = containerJSON.ID if c.Name == "" { if containerJSON.Name[0] == '/' { c.Name = containerJSON.Name[1:] } else { c.Name = containerJSON.Name } } c.Common.Path = containerJSON.Path c.Image = containerJSON.Image c.Created = containerJSON.Created c.ResolvConfPath = containerJSON.ResolvConfPath c.HostnamePath = containerJSON.HostnamePath c.HostsPath = containerJSON.HostsPath c.LogPath = containerJSON.LogPath c.RestartCount = containerJSON.RestartCount c.Driver = containerJSON.Driver if containerJSON.State != nil { c.ContainerState = *containerJSON.State if c.ContainerState.ExitCode != 0 { return fmt.Errorf("%s", c.ContainerState.Error) } } } return nil } func (c *Container) Delete(ctx context.Context) error { slog.Info("Container.Delete()", "id", c.Id, "name", c.Name) if stopErr := c.apiClient.ContainerStop(ctx, c.Id, container.StopOptions{}); stopErr != nil { slog.Error("Container.Delete() - failed to stop: ", "Id", c.Id, "error", stopErr) return stopErr } err := c.apiClient.ContainerRemove(ctx, c.Id, container.RemoveOptions{ RemoveVolumes: true, Force: false, }) if err != nil { slog.Error("Container.Delete() - failed to remove: ", "Id", c.Id, "error", err) return err } statusCh, errCh := c.apiClient.ContainerWait(ctx, c.Id, container.WaitConditionNotRunning) select { case waitErr := <-errCh: if waitErr != nil { if strings.Contains(waitErr.Error(), "No such container:") { return nil } return waitErr } case <-statusCh: } return err } func (c *Container) URI() string { return fmt.Sprintf("%s://%s", c.Type(), c.Name) } func (c *Container) Type() string { return "container" } func (c *Container) ResolveId(ctx context.Context) string { var err error if err = c.Common.SetParsedURI(folio.URI(c.URI()).Parse()); err != nil { triggerErr := c.StateMachine().Trigger("notexists") panic(fmt.Errorf("%w: %s %s, %w", err, c.Type(), c.Name, triggerErr)) } filterArgs := filters.NewArgs() filterArgs.Add("name", "/"+c.Name) containers, listErr := c.apiClient.ContainerList(ctx, container.ListOptions{ All: true, Filters: filterArgs, }) if listErr != nil { triggerErr := c.StateMachine().Trigger("notexists") panic(fmt.Errorf("%w: %s %s, %w", listErr, c.Type(), c.Name, triggerErr)) } slog.Info("Container.ResolveId()", "containers", containers) for _, container := range containers { for _, containerName := range container.Names { if containerName == "/"+c.Name { slog.Info("Container.ResolveId()", "state", c.StateMachine()) if c.Id == "" { c.Id = container.ID } if triggerErr := c.StateMachine().Trigger("exists"); triggerErr != nil { panic(fmt.Errorf("%w: %s %s", triggerErr, c.Type(), c.Name)) } slog.Info("Container.ResolveId() trigger created", "machine", c.StateMachine(), "state", c.StateMachine().CurrentState()) return container.ID } } } if triggerErr := c.StateMachine().Trigger("notexists"); triggerErr != nil { panic(fmt.Errorf("%w: %s %s", triggerErr, c.Type(), c.Name)) } return "" }