From 1b9d1b1fb197cbb36b8022fdc6ec83bd318691b4 Mon Sep 17 00:00:00 2001 From: Matthew Rich Date: Sun, 5 May 2024 00:11:52 -0700 Subject: [PATCH] add context --- Makefile | 7 +- context/context.go | 156 ++++++++++++++++++++++++++++++ context/context_test.go | 110 +++++++++++++++++++++ context/dispatcher.go | 41 ++++++++ context/dispatcher_test.go | 81 ++++++++++++++++ context/handler.go | 15 +++ context/queue.go | 72 ++++++++++++++ context/queue_test.go | 61 ++++++++++++ feudal/interfaces.go | 83 ++++++++++++++++ feudal/messenger.go | 11 +++ interrogator/interrogator.go | 35 +++++++ interrogator/interrogator_test.go | 28 ++++++ worker/clock.go | 45 +++++++++ worker/clock_test.go | 55 +++++++++++ 14 files changed, 797 insertions(+), 3 deletions(-) create mode 100644 context/context.go create mode 100644 context/context_test.go create mode 100644 context/dispatcher.go create mode 100644 context/dispatcher_test.go create mode 100644 context/handler.go create mode 100644 context/queue.go create mode 100644 context/queue_test.go create mode 100644 feudal/interfaces.go create mode 100644 feudal/messenger.go create mode 100644 interrogator/interrogator.go create mode 100644 interrogator/interrogator_test.go create mode 100644 worker/clock.go create mode 100644 worker/clock_test.go diff --git a/Makefile b/Makefile index f2fa516..82b1e83 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,8 @@ -LDFLAGS?=--ldflags '-extldflags "-static"' +LDFLAGS?=--ldflags '-extldflags "-static"' --ldflags="-X 'main.commit=$(shell git rev-parse HEAD)' -X 'main.version=$(shell git describe --tags)' -X 'main.date=$(shell date '+%Y-%m-%d %T.%s%z')'" export CGO_ENABLED=0 build: - + test: - go test ./... + go test -coverprofile=artifacts/coverage.profile ./... + go tool cover -html=artifacts/coverage.profile -o artifacts/code-coverage.html diff --git a/context/context.go b/context/context.go new file mode 100644 index 0000000..7872b3b --- /dev/null +++ b/context/context.go @@ -0,0 +1,156 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package context + +import ( + "fmt" + "strings" + "gitea.rosskeen.house/rosskeen.house/machine" + "feudal/feudal" + "feudal/identity" + "feudal/message" + "feudal/subjects" +) + +// a context has the creator interface, identifier? +// has a factor.producer +// dispatcher calls receive on self + +type Abstract struct { + name string + state machine.Stater + id identity.Identifier + self feudal.Worker + ruler feudal.WorkerRouter + subjects feudal.Subjects + messages *Queue + sovereign feudal.Sovereign + subscribers map[string]feudal.Subjects +} + +func New(name string, ruler feudal.WorkerRouter, worker feudal.Worker) feudal.WorkerRouter { + var i identity.Identifier + var sovereign feudal.Sovereign = nil + if ruler == nil { + i = identity.NewRoot(name) + } else { + i = ruler.Id().CreateChild(name) + sovereign = ruler.(feudal.Audience).Sovereign() + } + a := &Abstract{ name: name, id: i, ruler: ruler, self: worker, messages: NewBufferedQueue(1024), state: NewWorkerStateMachine(), subjects: subjects.New(), sovereign: sovereign, subscribers: make(map[string]feudal.Subjects) } + a.subscribers["lifecycle"] = subjects.New() + a.subscribers["subjects"] = subjects.New() + a.state.AddSubscription("start", a) + a.state.AddSubscription("stop", a) + worker.SetContext(a) + a.messages.Start(DefaultDispatcher(a), a) + return a +} + +func NewWorkerStateMachine() machine.Stater { + m := machine.New("initialized") + m.AddStates("initialized", "started", "stopped") + m.AddTransition("start", "initialized", "started") + m.AddTransition("stop", "started", "stopped") + return m +} + +func (a *Abstract) Receive(m message.Envelope) { + a.self.Receive(m) +} + +func (a *Abstract) Send(m message.Envelope) { + if a.State() == "stopped" { + if a.sovereign != nil { + dn := a.sovereign.Subjects().Get("devnull") + if dn != nil { + dn.Send(m) + } + } + } else { + a.messages.Add(m) + } +} + +func (a *Abstract) Address() string { + return a.id.Address() +} + +func (a *Abstract) Id() identity.Identifier { + return a.id +} + +func (a *Abstract) Type() string { + typeName := fmt.Sprintf("%T", a.self) + n := strings.Split(typeName, ".") + return n[len(n) - 1] +} + +func (a *Abstract) Disolve() { + if a.subjects != nil { + a.subjects.Disolve() + } +} + +func (a *Abstract) Self() feudal.WorkerRouter { + return a +} + +func (a *Abstract) Stop() { + if a.State() != "stopped" { + a.messages.Stop() + } +} + +func (a *Abstract) WorkerWhence(f feudal.Factory, name string) feudal.WorkerRouter { + ruler := a + if ruler.isStarted() { + w := f(name, ruler) + wr := New(name, ruler, w) + a.subscribers["subjects"].Add(name, wr) + return wr + } + return nil +} + +func (a *Abstract) State() string { + return string(a.state.CurrentState()) +} + +func (a *Abstract) isStarted() bool { + if a.State() == "started" { + return true + } + return false +} + +func (a *Abstract) isStopped() bool { + if a.State() == "stopped" { + return true + } + return false +} + +func (a *Abstract) Trigger(transition string) { + a.state.Trigger(transition) +} + +func (a *Abstract) Subjects() feudal.Subjects { + return a.subscribers["subjects"] +} + +func (a *Abstract) Subscribers(topic string) feudal.Subjects { + return a.subscribers[topic] +} + +func (a *Abstract) Sovereign() feudal.Sovereign { + return a.sovereign +} + +func (a *Abstract) SwearFealty(s feudal.Sovereign) { + a.sovereign = s +} + +func (a *Abstract) Notify(m *machine.EventMessage) { + a.Send(message.New(m, a)) +} diff --git a/context/context_test.go b/context/context_test.go new file mode 100644 index 0000000..7edc988 --- /dev/null +++ b/context/context_test.go @@ -0,0 +1,110 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package context; + +import ( + "testing" + "log" + "gitea.rosskeen.house/rosskeen.house/machine" + "feudal/message" + "feudal/interrogator" + "feudal/tests" + "feudal/tests/ponger" + "feudal/feudal" +) + +type testWorker struct { + context interface{} +} + +func (w *testWorker) Receive(m message.Envelope) { + switch m.Body().(type) { + case string: + m.Sender().Send(m) + case *machine.EventMessage: + log.Printf("event: %s on %s", m.Body(), w.context.(feudal.Context).Address()) + default: + log.Fatal("unexpected message") + } +} + +func (w *testWorker) Address() string { return w.context.(*Abstract).id.Address() } + +func (w *testWorker) SetContext(c interface{}) { w.context = c } + +func setupTestWorker() feudal.WorkerRouter { + return New("test", nil, &testWorker{}) +} + +func TestNew(t *testing.T) { + w := New("test", nil, &testWorker{}) + defer w.Stop() + r := tests.SendTestMessage(w, "test message") + tests.AssertMessageValue(r, "test message") +} + +func TestWorkerType(t *testing.T) { + nw := setupTestWorker() + defer nw.Stop() + ty := nw.Type() + if ty != "testWorker" { + t.Errorf("Type() returned incorrect type %s", ty) + } +} + +func TestWorkerStop(t *testing.T) { + nw := setupTestWorker() + defer nw.Stop() +} + +func TestWorkerStateInitial(t *testing.T) { + nw := setupTestWorker() + defer nw.Stop() + if nw.State() != "initialized" { + t.Errorf("Invalid state") + } +} + +func TestWorkerStateStarted(t *testing.T) { + i := interrogator.NewBuffered(10) + nw := setupTestWorker() + nw.(feudal.Context).Subscribers("lifecycle").Add("test", i) + defer nw.Stop() + _ = <- i + _ = <- i + if ! nw.(*Abstract).isStarted() { + t.Errorf("worker failed to transition to started state") + } +} + +func TestWorkerWhenceParentState(t *testing.T) { + nw := setupTestWorker() + nw.Stop() + p1 := nw.(feudal.Context).WorkerWhence(ponger.PongerFactory(), "p1") + if p1 != nil { + t.Errorf("Created worker from invalid state") + } +} + +func TestWorkerWhence(t *testing.T) { + i := interrogator.NewBuffered(10) + nw := setupTestWorker() + nw.(feudal.Context).Subscribers("lifecycle").Add("test", i) + enter_start_nw := <- i + log.Printf("%s", enter_start_nw.(*message.AbstractEnvelope)) + exit_start_nw := <- i + log.Printf("%s", exit_start_nw.(*message.AbstractEnvelope)) + + ip := interrogator.NewBuffered(10) + p1 := nw.(feudal.Context).WorkerWhence(ponger.PongerFactory(), "p1") + p1.(feudal.Context).Subscribers("lifecycle").Add("testip", ip) + + sp1 := <- ip + log.Printf("%s %s", sp1.Sender().Address(), sp1.(*message.AbstractEnvelope)) + + p2 := p1.(feudal.Context).WorkerWhence(ponger.PongerFactory(), "p2") + if nw.State() == "started" && p1.State() == "started" && p2.State() == "started" { + + } + +} diff --git a/context/dispatcher.go b/context/dispatcher.go new file mode 100644 index 0000000..22f3c88 --- /dev/null +++ b/context/dispatcher.go @@ -0,0 +1,41 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package context; + +import ( + "feudal/feudal" + "gitea.rosskeen.house/rosskeen.house/machine" + "feudal/message" +) + +type DispatchReceiveFunc func(m message.Envelope, r message.Receiver) + +type Dispatcher interface { + Dispatch(m message.Envelope, r message.Receiver) +} + +type DispatchReceive struct { + worker feudal.Context +} + +func (d *DispatchReceive) DispatchToStateSubscribers(m message.Envelope, r message.Receiver) { + switch b := m.Body().(type) { + case *machine.EventMessage: + recipients := d.worker.Subscribers("lifecycle") + var event machine.EventMessage = *b + recipients.Send(message.New(event, m.Sender())) + } +} + +func (d *DispatchReceive) Dispatch(m message.Envelope, r message.Receiver) { + d.DispatchToStateSubscribers(m, r) + r.Receive(m) +} + +func DefaultDispatcher(worker feudal.Context) Dispatcher { + return &DispatchReceive{ worker: worker } +} + +func (f DispatchReceiveFunc) Dispatch(m message.Envelope, r message.Receiver) { + f(m, r) +} diff --git a/context/dispatcher_test.go b/context/dispatcher_test.go new file mode 100644 index 0000000..a50a158 --- /dev/null +++ b/context/dispatcher_test.go @@ -0,0 +1,81 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package context; + +import ( + "testing" + "feudal/message" + "feudal/interrogator" + "feudal/tests/receiver" +) + +func TestRunner(t *testing.T) { + q := NewQueue() + q.Start(DefaultDispatcher(nil), &receiver.Ctx{}) + + i := interrogator.New() + q.receive <- message.New("test message", i) + r := <- i + if r.Body() != "test message" { + t.Errorf("invalid message") + } +} + +func TestRunnerStartStop(t *testing.T) { + q := NewQueue() + worker := &receiver.Ctx{} + q.Start(DefaultDispatcher(worker), worker) + + select { + case _, ok := <- q.receive: + if ! ok { + t.Errorf("receive channel closed") + } + default: + } + + select { + case _, ok := <- q.quit: + if ! ok { + t.Errorf("quit channel closed") + } + default: + } + + select { + case _, ok := <- q.stopped: + if ! ok { + t.Errorf("stopped channel closed") + } + default: + } + + q.Stop() + + select { + case _, ok := <- q.receive: + if ok { + t.Errorf("receive channel not closed") + } + default: + t.Errorf("receive channel not closed") + } + + select { + case _, ok := <- q.quit: + if ok { + t.Errorf("quit channel not closed") + } + default: + t.Errorf("quit channel not closed") + } + + select { + case _, ok := <- q.stopped: + if ok { + t.Errorf("stopped channel not closed") + } + default: + t.Errorf("stopped channel not closed") + } +} diff --git a/context/handler.go b/context/handler.go new file mode 100644 index 0000000..ae05e3f --- /dev/null +++ b/context/handler.go @@ -0,0 +1,15 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package context + +import ( + "feudal/message" +) + +// Register a handler for a message event type + +type MessageHandler func(m message.Envelope) + +func (h MessageHandler) Handle(m message.Envelope) { + h(m) +} diff --git a/context/queue.go b/context/queue.go new file mode 100644 index 0000000..0af4e31 --- /dev/null +++ b/context/queue.go @@ -0,0 +1,72 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package context; + +import ( + "log" + "feudal/message" + "feudal/feudal" +) + +type Runner interface { + Start(d Dispatcher, c feudal.Context) + Stop() +} + +type Queue struct { + receive chan message.Envelope + quit chan bool + stopped chan bool +} + +func NewQueue() *Queue { + return &Queue{ receive: make(chan message.Envelope), quit: make(chan bool), stopped: make(chan bool) } +} + +func NewBufferedQueue(size int) *Queue { + return &Queue{ receive: make(chan message.Envelope, size), quit: make(chan bool), stopped: make(chan bool) } +} + +func (q *Queue) Add(m message.Envelope) { + q.receive <- m +} + +func (q *Queue) Remove() message.Envelope { + return <- q.receive +} + +func (q *Queue) contextTrigger(c feudal.Context, trigger string) { + if c != nil { + log.Printf("triggering %s on %s", trigger, c.Address()) + c.Trigger(trigger) + } +} + +func (q *Queue) Start(d Dispatcher, c feudal.Context) { + go func() { + q.contextTrigger(c, "start") + for { + select { + case m := <- q.receive: + d.Dispatch(m, c) + case <- q.quit: + close(q.receive) + c.Disolve() // change to trigger + close(q.stopped) + q.contextTrigger(c, "stop") + return + } + } + }() +} + +func (q *Queue) Stop() { + close(q.quit) + select { + case <- q.stopped: + } +} + +func (q *Queue) Len() int { + return len(q.receive) +} diff --git a/context/queue_test.go b/context/queue_test.go new file mode 100644 index 0000000..f8faae9 --- /dev/null +++ b/context/queue_test.go @@ -0,0 +1,61 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + + +package context + +import ( + "testing" + "feudal/message" +) + +func setupQueue() *Queue { + q := NewQueue() + return q +} + +func setupBufferedQueue(size int) *Queue { + return NewBufferedQueue(size) +} + +func TestNewQueue(t *testing.T) { + q := setupQueue() + if q == nil { + t.Errorf("Failed creating new queue") + } +} + +func TestNewBufferedQueue(t *testing.T) { + q := setupBufferedQueue(10) + if q == nil { + t.Errorf("Failed creating new buffered queue") + } +} + +func TestQueueAdd(t *testing.T) { + q := setupQueue() + var result message.Envelope + var testDispatcher DispatchReceiveFunc = func(m message.Envelope, r message.Receiver) { result = m } + + q.Start(testDispatcher, nil) + q.Add(message.NewAnonymous("")) + if result.Body().(string) != "" { + t.Errorf("Failed receiving queued message") + } +} + +func TestQueueRemove(t *testing.T) { + q := setupBufferedQueue(1) + q.Add(message.NewAnonymous("")) + result := q.Remove() + if result.Body().(string) != "" { + t.Errorf("Failed removing queued message") + } +} + +func TestQueueMetrics(t *testing.T) { + q := setupBufferedQueue(1) + q.Add(message.NewAnonymous("")) + if q.Len() != 1 { + t.Errorf("Failed checking queue length") + } +} diff --git a/feudal/interfaces.go b/feudal/interfaces.go new file mode 100644 index 0000000..c55648f --- /dev/null +++ b/feudal/interfaces.go @@ -0,0 +1,83 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package feudal + +import( + "feudal/message" + "feudal/identity" +) + +type Transporter interface { + Transport(m message.Envelope) +} + +type WorkerRouter interface { + message.Sender + Id() identity.Identifier + Type() string + State() string + Stop() +} + +type Factory func(name string, ruler WorkerRouter) Worker + +type Creator interface { + WorkerWhence(Factory, string) WorkerRouter +} + +type Division interface { + Join(w Worker) + Disolve() +} + +type Worker interface { + message.Receiver + SetContext(c interface{}) +} + +type Subjects interface { + Add(name string, worker WorkerRouter) + Remove(name string) + Get(name string) WorkerRouter + message.Sender + Disolve() +} + +type Context interface { + Creator + message.Receiver + Disolve() + Self() WorkerRouter + Subjects() Subjects + Trigger(transition string) + Subscribers(topic string) Subjects +} + +type Audience interface { + SwearFealty(Sovereign) + Sovereign() Sovereign +} + +type Sovereign interface { + Creator + Subjects() Subjects +} + +/* + +// provides interfaces for managing the worker tree (organization) +type Context interface { + Creator + System() System + Ruler() feudal.Worker + Subjects() Subjects +} + +type System interface { + identity.Addresser + Creator + Terminator + WorkersPath(path string) subjects.Selector +} + +*/ diff --git a/feudal/messenger.go b/feudal/messenger.go new file mode 100644 index 0000000..90e2539 --- /dev/null +++ b/feudal/messenger.go @@ -0,0 +1,11 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package feudal + +import ( + "feudal/message" +) + +type Messenger interface { + Deliver(message message.Envelope) +} diff --git a/interrogator/interrogator.go b/interrogator/interrogator.go new file mode 100644 index 0000000..f53131c --- /dev/null +++ b/interrogator/interrogator.go @@ -0,0 +1,35 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package interrogator; + +import ( + "feudal/message" + "feudal/identity" +) + +type Interrogator chan message.Envelope + +// workerrouter interface: this is degenerate +func (i Interrogator) Type() string { return "" } +func (i Interrogator) Stop() {} +func (i Interrogator) State() string { return "started" } + +func (i Interrogator) Id() identity.Identifier { + return nil +} + +func (i Interrogator) Address() string { + return "" +} + +func (i Interrogator) Send(m message.Envelope) { + i <- m +} + +func New() Interrogator { + return make(chan message.Envelope) +} + +func NewBuffered(size int) Interrogator { + return make(chan message.Envelope, size) +} diff --git a/interrogator/interrogator_test.go b/interrogator/interrogator_test.go new file mode 100644 index 0000000..3e05d78 --- /dev/null +++ b/interrogator/interrogator_test.go @@ -0,0 +1,28 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package interrogator; + +import ( + "testing" + "feudal/message" +) + +func TestInterrogator(t *testing.T) { + i := New() + m := message.New("foo", i) + go i.Send(m) + r := <- i + if r.Body() != "foo" { + t.Errorf("Invalid message") + } +} + +func TestBufferedInterrogator(t *testing.T) { + i := NewBuffered(1) + m := message.New("foo", i) + i.Send(m) + r := <- i + if r.Body() != "foo" { + t.Errorf("Invalid message") + } +} diff --git a/worker/clock.go b/worker/clock.go new file mode 100644 index 0000000..648e7fa --- /dev/null +++ b/worker/clock.go @@ -0,0 +1,45 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package worker; + +import ( + "feudal/message" + "time" +) + +// A ticker thread that implements the runner interface and generates tick events for the given context + +type Clock struct { + ticker *time.Ticker + quit chan bool + stopped chan bool +} + +func NewClock(interval int) *Clock { + return &Clock{ ticker: time.NewTicker(time.Duration(interval) * time.Millisecond), quit: make(chan bool), stopped: make(chan bool) } +} + +func (l *Clock) Start(d Dispatcher, c Context) { + go func() { + for { + select { + case tick := <- l.ticker.C: + d.Dispatch(message.New(&message.ClockTick{ T:tick }, c), c) + case <- l.quit: + l.ticker.Stop() + if v := c.Division(); v != nil { + v.Disolve() + } + close(l.stopped) + return + } + } + }() +} + +func (l *Clock) Stop() { + close(l.quit) + select { + case <- l.stopped: + } +} diff --git a/worker/clock_test.go b/worker/clock_test.go new file mode 100644 index 0000000..59dee9e --- /dev/null +++ b/worker/clock_test.go @@ -0,0 +1,55 @@ +// Copyright 2024 Matthew Rich . All rights reserved. + +package worker; + +import ( + "testing" + "log" + "feudal/message" +) + +type testClockWorker struct { + AbstractContext + ticker Runner + recipients []message.Sender +} + +func (w *testClockWorker) Receive(m message.Envelope) { + switch m.Body().(type) { + case *message.SubscribeToClockTick: + w.Subscribe(m.Sender()) + case *message.ClockTick: + for _,r := range(w.recipients) { + r.Send(m) + } + default: + log.Fatalln("unexpected message", m.Body()) + } +} + +func (w *testClockWorker) Subscribe(recipient message.Sender) { + w.recipients = append(w.recipients, recipient) +} + +func (w *testClockWorker) Stop() { + w.ticker.Stop() + w.AbstractContext.Queue.Stop() +} + +func TestNewClock(t *testing.T) { + nw := &testClockWorker{ ticker: NewClock(500) } + + subscriber := NewInterrogator() + + nw.Init(nw, "clocktest") + nw.Start(DefaultDispatcher(), nw) + nw.ticker.Start(DefaultDispatcher(), nw) + + nw.Send(message.New(&message.SubscribeToClockTick{}, subscriber)) + tick := <- subscriber + switch tick.Body().(type) { + case *message.ClockTick: + default: + t.Errorf("invalid message") + } +}