157 lines
3.4 KiB
Go
157 lines
3.4 KiB
Go
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
|
|
|
package context
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
"gitea.rosskeen.house/rosskeen.house/machine"
|
|
"feudal/feudal"
|
|
"feudal/identity"
|
|
"feudal/message"
|
|
"feudal/subjects"
|
|
)
|
|
|
|
// A context has the creator interface (and an identifier?).
// It has a factor.producer.
// The dispatcher calls Receive on self.
|
|
|
|
type Abstract struct {
|
|
name string
|
|
state machine.Stater
|
|
id identity.Identifier
|
|
self feudal.Worker
|
|
ruler feudal.WorkerRouter
|
|
subjects feudal.Subjects
|
|
messages *Queue
|
|
sovereign feudal.Sovereign
|
|
subscribers map[string]feudal.Subjects
|
|
}
|
|
|
|
func New(name string, ruler feudal.WorkerRouter, worker feudal.Worker) feudal.WorkerRouter {
|
|
var i identity.Identifier
|
|
var sovereign feudal.Sovereign = nil
|
|
if ruler == nil {
|
|
i = identity.NewRoot(name)
|
|
} else {
|
|
i = ruler.Id().CreateChild(name)
|
|
sovereign = ruler.(feudal.Audience).Sovereign()
|
|
}
|
|
a := &Abstract{ name: name, id: i, ruler: ruler, self: worker, messages: NewBufferedQueue(1024), state: NewWorkerStateMachine(), subjects: subjects.New(), sovereign: sovereign, subscribers: make(map[string]feudal.Subjects) }
|
|
a.subscribers["lifecycle"] = subjects.New()
|
|
a.subscribers["subjects"] = subjects.New()
|
|
a.state.AddSubscription("start", a)
|
|
a.state.AddSubscription("stop", a)
|
|
worker.SetContext(a)
|
|
a.messages.Start(DefaultDispatcher(a), a)
|
|
return a
|
|
}
|
|
|
|
func NewWorkerStateMachine() machine.Stater {
|
|
m := machine.New("initialized")
|
|
m.AddStates("initialized", "started", "stopped")
|
|
m.AddTransition("start", "initialized", "started")
|
|
m.AddTransition("stop", "started", "stopped")
|
|
return m
|
|
}
|
|
|
|
func (a *Abstract) Receive(m message.Envelope) {
|
|
a.self.Receive(m)
|
|
}
|
|
|
|
func (a *Abstract) Send(m message.Envelope) {
|
|
if a.State() == "stopped" {
|
|
if a.sovereign != nil {
|
|
dn := a.sovereign.Subjects().Get("devnull")
|
|
if dn != nil {
|
|
dn.Send(m)
|
|
}
|
|
}
|
|
} else {
|
|
a.messages.Add(m)
|
|
}
|
|
}
|
|
|
|
func (a *Abstract) Address() string {
|
|
return a.id.Address()
|
|
}
|
|
|
|
func (a *Abstract) Id() identity.Identifier {
|
|
return a.id
|
|
}
|
|
|
|
func (a *Abstract) Type() string {
|
|
typeName := fmt.Sprintf("%T", a.self)
|
|
n := strings.Split(typeName, ".")
|
|
return n[len(n) - 1]
|
|
}
|
|
|
|
func (a *Abstract) Disolve() {
|
|
if a.subjects != nil {
|
|
a.subjects.Disolve()
|
|
}
|
|
}
|
|
|
|
func (a *Abstract) Self() feudal.WorkerRouter {
|
|
return a
|
|
}
|
|
|
|
func (a *Abstract) Stop() {
|
|
if a.State() != "stopped" {
|
|
a.messages.Stop()
|
|
}
|
|
}
|
|
|
|
func (a *Abstract) WorkerWhence(f feudal.Factory, name string) feudal.WorkerRouter {
|
|
ruler := a
|
|
if ruler.isStarted() {
|
|
w := f(name, ruler)
|
|
wr := New(name, ruler, w)
|
|
a.subscribers["subjects"].Add(name, wr)
|
|
return wr
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *Abstract) State() string {
|
|
return string(a.state.CurrentState())
|
|
}
|
|
|
|
func (a *Abstract) isStarted() bool {
|
|
if a.State() == "started" {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (a *Abstract) isStopped() bool {
|
|
if a.State() == "stopped" {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (a *Abstract) Trigger(transition string) {
|
|
a.state.Trigger(transition)
|
|
}
|
|
|
|
func (a *Abstract) Subjects() feudal.Subjects {
|
|
return a.subscribers["subjects"]
|
|
}
|
|
|
|
func (a *Abstract) Subscribers(topic string) feudal.Subjects {
|
|
return a.subscribers[topic]
|
|
}
|
|
|
|
func (a *Abstract) Sovereign() feudal.Sovereign {
|
|
return a.sovereign
|
|
}
|
|
|
|
func (a *Abstract) SwearFealty(s feudal.Sovereign) {
|
|
a.sovereign = s
|
|
}
|
|
|
|
func (a *Abstract) Notify(m *machine.EventMessage) {
|
|
a.Send(message.New(m, a))
|
|
}
|