feudal/context/context.go

157 lines
3.4 KiB
Go
Raw Permalink Normal View History

2024-05-05 07:11:52 +00:00
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package context
import (
"fmt"
"strings"
"gitea.rosskeen.house/rosskeen.house/machine"
"feudal/feudal"
"feudal/identity"
"feudal/message"
"feudal/subjects"
)
// a context has the creator interface, identifier?
// has a factor.producer
// dispatcher calls receive on self
type Abstract struct {
name string
state machine.Stater
id identity.Identifier
self feudal.Worker
ruler feudal.WorkerRouter
subjects feudal.Subjects
messages *Queue
sovereign feudal.Sovereign
subscribers map[string]feudal.Subjects
}
func New(name string, ruler feudal.WorkerRouter, worker feudal.Worker) feudal.WorkerRouter {
var i identity.Identifier
var sovereign feudal.Sovereign = nil
if ruler == nil {
i = identity.NewRoot(name)
} else {
i = ruler.Id().CreateChild(name)
sovereign = ruler.(feudal.Audience).Sovereign()
}
a := &Abstract{ name: name, id: i, ruler: ruler, self: worker, messages: NewBufferedQueue(1024), state: NewWorkerStateMachine(), subjects: subjects.New(), sovereign: sovereign, subscribers: make(map[string]feudal.Subjects) }
a.subscribers["lifecycle"] = subjects.New()
a.subscribers["subjects"] = subjects.New()
a.state.AddSubscription("start", a)
a.state.AddSubscription("stop", a)
worker.SetContext(a)
a.messages.Start(DefaultDispatcher(a), a)
return a
}
func NewWorkerStateMachine() machine.Stater {
m := machine.New("initialized")
m.AddStates("initialized", "started", "stopped")
m.AddTransition("start", "initialized", "started")
m.AddTransition("stop", "started", "stopped")
return m
}
func (a *Abstract) Receive(m message.Envelope) {
a.self.Receive(m)
}
func (a *Abstract) Send(m message.Envelope) {
if a.State() == "stopped" {
if a.sovereign != nil {
dn := a.sovereign.Subjects().Get("devnull")
if dn != nil {
dn.Send(m)
}
}
} else {
a.messages.Add(m)
}
}
func (a *Abstract) Address() string {
return a.id.Address()
}
func (a *Abstract) Id() identity.Identifier {
return a.id
}
func (a *Abstract) Type() string {
typeName := fmt.Sprintf("%T", a.self)
n := strings.Split(typeName, ".")
return n[len(n) - 1]
}
func (a *Abstract) Disolve() {
if a.subjects != nil {
a.subjects.Disolve()
}
}
func (a *Abstract) Self() feudal.WorkerRouter {
return a
}
func (a *Abstract) Stop() {
if a.State() != "stopped" {
a.messages.Stop()
}
}
func (a *Abstract) WorkerWhence(f feudal.Factory, name string) feudal.WorkerRouter {
ruler := a
if ruler.isStarted() {
w := f(name, ruler)
wr := New(name, ruler, w)
a.subscribers["subjects"].Add(name, wr)
return wr
}
return nil
}
func (a *Abstract) State() string {
return string(a.state.CurrentState())
}
func (a *Abstract) isStarted() bool {
if a.State() == "started" {
return true
}
return false
}
func (a *Abstract) isStopped() bool {
if a.State() == "stopped" {
return true
}
return false
}
func (a *Abstract) Trigger(transition string) {
a.state.Trigger(transition)
}
func (a *Abstract) Subjects() feudal.Subjects {
return a.subscribers["subjects"]
}
func (a *Abstract) Subscribers(topic string) feudal.Subjects {
return a.subscribers[topic]
}
func (a *Abstract) Sovereign() feudal.Sovereign {
return a.sovereign
}
func (a *Abstract) SwearFealty(s feudal.Sovereign) {
a.sovereign = s
}
func (a *Abstract) Notify(m *machine.EventMessage) {
a.Send(message.New(m, a))
}