// Copyright 2024 Matthew Rich . All rights reserved. package context import ( "fmt" "strings" "gitea.rosskeen.house/rosskeen.house/machine" "feudal/feudal" "feudal/identity" "feudal/message" "feudal/subjects" ) // a context has the creator interface, identifier? // has a factor.producer // dispatcher calls receive on self type Abstract struct { name string state machine.Stater id identity.Identifier self feudal.Worker ruler feudal.WorkerRouter subjects feudal.Subjects messages *Queue sovereign feudal.Sovereign subscribers map[string]feudal.Subjects } func New(name string, ruler feudal.WorkerRouter, worker feudal.Worker) feudal.WorkerRouter { var i identity.Identifier var sovereign feudal.Sovereign = nil if ruler == nil { i = identity.NewRoot(name) } else { i = ruler.Id().CreateChild(name) sovereign = ruler.(feudal.Audience).Sovereign() } a := &Abstract{ name: name, id: i, ruler: ruler, self: worker, messages: NewBufferedQueue(1024), state: NewWorkerStateMachine(), subjects: subjects.New(), sovereign: sovereign, subscribers: make(map[string]feudal.Subjects) } a.subscribers["lifecycle"] = subjects.New() a.subscribers["subjects"] = subjects.New() a.state.AddSubscription("start", a) a.state.AddSubscription("stop", a) worker.SetContext(a) a.messages.Start(DefaultDispatcher(a), a) return a } func NewWorkerStateMachine() machine.Stater { m := machine.New("initialized") m.AddStates("initialized", "started", "stopped") m.AddTransition("start", "initialized", "started") m.AddTransition("stop", "started", "stopped") return m } func (a *Abstract) Receive(m message.Envelope) { a.self.Receive(m) } func (a *Abstract) Send(m message.Envelope) { if a.State() == "stopped" { if a.sovereign != nil { dn := a.sovereign.Subjects().Get("devnull") if dn != nil { dn.Send(m) } } } else { a.messages.Add(m) } } func (a *Abstract) Address() string { return a.id.Address() } func (a *Abstract) Id() identity.Identifier { return a.id } func (a *Abstract) Type() string { typeName := fmt.Sprintf("%T", a.self) n := strings.Split(typeName, ".") return n[len(n) - 1] } func (a *Abstract) Disolve() { if a.subjects != nil { a.subjects.Disolve() } } func (a *Abstract) Self() feudal.WorkerRouter { return a } func (a *Abstract) Stop() { if a.State() != "stopped" { a.messages.Stop() } } func (a *Abstract) WorkerWhence(f feudal.Factory, name string) feudal.WorkerRouter { ruler := a if ruler.isStarted() { w := f(name, ruler) wr := New(name, ruler, w) a.subscribers["subjects"].Add(name, wr) return wr } return nil } func (a *Abstract) State() string { return string(a.state.CurrentState()) } func (a *Abstract) isStarted() bool { if a.State() == "started" { return true } return false } func (a *Abstract) isStopped() bool { if a.State() == "stopped" { return true } return false } func (a *Abstract) Trigger(transition string) { a.state.Trigger(transition) } func (a *Abstract) Subjects() feudal.Subjects { return a.subscribers["subjects"] } func (a *Abstract) Subscribers(topic string) feudal.Subjects { return a.subscribers[topic] } func (a *Abstract) Sovereign() feudal.Sovereign { return a.sovereign } func (a *Abstract) SwearFealty(s feudal.Sovereign) { a.sovereign = s } func (a *Abstract) Notify(m *machine.EventMessage) { a.Send(message.New(m, a)) }