add context
Some checks failed
Declarative Tests / test (push) Waiting to run
Lint / golangci-lint (push) Has been cancelled

This commit is contained in:
Matthew Rich 2024-05-05 00:11:52 -07:00
parent c545200ade
commit 1b9d1b1fb1
14 changed files with 797 additions and 3 deletions

View File

@ -1,7 +1,8 @@
LDFLAGS?=--ldflags '-extldflags "-static"'
LDFLAGS?=--ldflags '-extldflags "-static"' --ldflags="-X 'main.commit=$(shell git rev-parse HEAD)' -X 'main.version=$(shell git describe --tags)' -X 'main.date=$(shell date '+%Y-%m-%d %T.%s%z')'"
export CGO_ENABLED=0
build:
test:
go test ./...
go test -coverprofile=artifacts/coverage.profile ./...
go tool cover -html=artifacts/coverage.profile -o artifacts/code-coverage.html

156
context/context.go Normal file
View File

@ -0,0 +1,156 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package context
import (
"fmt"
"strings"
"gitea.rosskeen.house/rosskeen.house/machine"
"feudal/feudal"
"feudal/identity"
"feudal/message"
"feudal/subjects"
)
// a context has the creator interface, identifier?
// has a factor.producer
// dispatcher calls receive on self
type Abstract struct {
name string
state machine.Stater
id identity.Identifier
self feudal.Worker
ruler feudal.WorkerRouter
subjects feudal.Subjects
messages *Queue
sovereign feudal.Sovereign
subscribers map[string]feudal.Subjects
}
func New(name string, ruler feudal.WorkerRouter, worker feudal.Worker) feudal.WorkerRouter {
var i identity.Identifier
var sovereign feudal.Sovereign = nil
if ruler == nil {
i = identity.NewRoot(name)
} else {
i = ruler.Id().CreateChild(name)
sovereign = ruler.(feudal.Audience).Sovereign()
}
a := &Abstract{ name: name, id: i, ruler: ruler, self: worker, messages: NewBufferedQueue(1024), state: NewWorkerStateMachine(), subjects: subjects.New(), sovereign: sovereign, subscribers: make(map[string]feudal.Subjects) }
a.subscribers["lifecycle"] = subjects.New()
a.subscribers["subjects"] = subjects.New()
a.state.AddSubscription("start", a)
a.state.AddSubscription("stop", a)
worker.SetContext(a)
a.messages.Start(DefaultDispatcher(a), a)
return a
}
func NewWorkerStateMachine() machine.Stater {
m := machine.New("initialized")
m.AddStates("initialized", "started", "stopped")
m.AddTransition("start", "initialized", "started")
m.AddTransition("stop", "started", "stopped")
return m
}
func (a *Abstract) Receive(m message.Envelope) {
a.self.Receive(m)
}
func (a *Abstract) Send(m message.Envelope) {
if a.State() == "stopped" {
if a.sovereign != nil {
dn := a.sovereign.Subjects().Get("devnull")
if dn != nil {
dn.Send(m)
}
}
} else {
a.messages.Add(m)
}
}
func (a *Abstract) Address() string {
return a.id.Address()
}
func (a *Abstract) Id() identity.Identifier {
return a.id
}
func (a *Abstract) Type() string {
typeName := fmt.Sprintf("%T", a.self)
n := strings.Split(typeName, ".")
return n[len(n) - 1]
}
func (a *Abstract) Disolve() {
if a.subjects != nil {
a.subjects.Disolve()
}
}
func (a *Abstract) Self() feudal.WorkerRouter {
return a
}
func (a *Abstract) Stop() {
if a.State() != "stopped" {
a.messages.Stop()
}
}
func (a *Abstract) WorkerWhence(f feudal.Factory, name string) feudal.WorkerRouter {
ruler := a
if ruler.isStarted() {
w := f(name, ruler)
wr := New(name, ruler, w)
a.subscribers["subjects"].Add(name, wr)
return wr
}
return nil
}
func (a *Abstract) State() string {
return string(a.state.CurrentState())
}
func (a *Abstract) isStarted() bool {
if a.State() == "started" {
return true
}
return false
}
func (a *Abstract) isStopped() bool {
if a.State() == "stopped" {
return true
}
return false
}
func (a *Abstract) Trigger(transition string) {
a.state.Trigger(transition)
}
func (a *Abstract) Subjects() feudal.Subjects {
return a.subscribers["subjects"]
}
func (a *Abstract) Subscribers(topic string) feudal.Subjects {
return a.subscribers[topic]
}
func (a *Abstract) Sovereign() feudal.Sovereign {
return a.sovereign
}
func (a *Abstract) SwearFealty(s feudal.Sovereign) {
a.sovereign = s
}
func (a *Abstract) Notify(m *machine.EventMessage) {
a.Send(message.New(m, a))
}

110
context/context_test.go Normal file
View File

@ -0,0 +1,110 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package context;
import (
"testing"
"log"
"gitea.rosskeen.house/rosskeen.house/machine"
"feudal/message"
"feudal/interrogator"
"feudal/tests"
"feudal/tests/ponger"
"feudal/feudal"
)
type testWorker struct {
context interface{}
}
func (w *testWorker) Receive(m message.Envelope) {
switch m.Body().(type) {
case string:
m.Sender().Send(m)
case *machine.EventMessage:
log.Printf("event: %s on %s", m.Body(), w.context.(feudal.Context).Address())
default:
log.Fatal("unexpected message")
}
}
func (w *testWorker) Address() string { return w.context.(*Abstract).id.Address() }
func (w *testWorker) SetContext(c interface{}) { w.context = c }
func setupTestWorker() feudal.WorkerRouter {
return New("test", nil, &testWorker{})
}
func TestNew(t *testing.T) {
w := New("test", nil, &testWorker{})
defer w.Stop()
r := tests.SendTestMessage(w, "test message")
tests.AssertMessageValue(r, "test message")
}
func TestWorkerType(t *testing.T) {
nw := setupTestWorker()
defer nw.Stop()
ty := nw.Type()
if ty != "testWorker" {
t.Errorf("Type() returned incorrect type %s", ty)
}
}
func TestWorkerStop(t *testing.T) {
nw := setupTestWorker()
defer nw.Stop()
}
func TestWorkerStateInitial(t *testing.T) {
nw := setupTestWorker()
defer nw.Stop()
if nw.State() != "initialized" {
t.Errorf("Invalid state")
}
}
func TestWorkerStateStarted(t *testing.T) {
i := interrogator.NewBuffered(10)
nw := setupTestWorker()
nw.(feudal.Context).Subscribers("lifecycle").Add("test", i)
defer nw.Stop()
_ = <- i
_ = <- i
if ! nw.(*Abstract).isStarted() {
t.Errorf("worker failed to transition to started state")
}
}
func TestWorkerWhenceParentState(t *testing.T) {
nw := setupTestWorker()
nw.Stop()
p1 := nw.(feudal.Context).WorkerWhence(ponger.PongerFactory(), "p1")
if p1 != nil {
t.Errorf("Created worker from invalid state")
}
}
func TestWorkerWhence(t *testing.T) {
i := interrogator.NewBuffered(10)
nw := setupTestWorker()
nw.(feudal.Context).Subscribers("lifecycle").Add("test", i)
enter_start_nw := <- i
log.Printf("%s", enter_start_nw.(*message.AbstractEnvelope))
exit_start_nw := <- i
log.Printf("%s", exit_start_nw.(*message.AbstractEnvelope))
ip := interrogator.NewBuffered(10)
p1 := nw.(feudal.Context).WorkerWhence(ponger.PongerFactory(), "p1")
p1.(feudal.Context).Subscribers("lifecycle").Add("testip", ip)
sp1 := <- ip
log.Printf("%s %s", sp1.Sender().Address(), sp1.(*message.AbstractEnvelope))
p2 := p1.(feudal.Context).WorkerWhence(ponger.PongerFactory(), "p2")
if nw.State() == "started" && p1.State() == "started" && p2.State() == "started" {
}
}

41
context/dispatcher.go Normal file
View File

@ -0,0 +1,41 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package context;
import (
"feudal/feudal"
"gitea.rosskeen.house/rosskeen.house/machine"
"feudal/message"
)
type DispatchReceiveFunc func(m message.Envelope, r message.Receiver)
type Dispatcher interface {
Dispatch(m message.Envelope, r message.Receiver)
}
type DispatchReceive struct {
worker feudal.Context
}
func (d *DispatchReceive) DispatchToStateSubscribers(m message.Envelope, r message.Receiver) {
switch b := m.Body().(type) {
case *machine.EventMessage:
recipients := d.worker.Subscribers("lifecycle")
var event machine.EventMessage = *b
recipients.Send(message.New(event, m.Sender()))
}
}
func (d *DispatchReceive) Dispatch(m message.Envelope, r message.Receiver) {
d.DispatchToStateSubscribers(m, r)
r.Receive(m)
}
func DefaultDispatcher(worker feudal.Context) Dispatcher {
return &DispatchReceive{ worker: worker }
}
func (f DispatchReceiveFunc) Dispatch(m message.Envelope, r message.Receiver) {
f(m, r)
}

View File

@ -0,0 +1,81 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package context;
import (
"testing"
"feudal/message"
"feudal/interrogator"
"feudal/tests/receiver"
)
func TestRunner(t *testing.T) {
q := NewQueue()
q.Start(DefaultDispatcher(nil), &receiver.Ctx{})
i := interrogator.New()
q.receive <- message.New("test message", i)
r := <- i
if r.Body() != "test message" {
t.Errorf("invalid message")
}
}
func TestRunnerStartStop(t *testing.T) {
q := NewQueue()
worker := &receiver.Ctx{}
q.Start(DefaultDispatcher(worker), worker)
select {
case _, ok := <- q.receive:
if ! ok {
t.Errorf("receive channel closed")
}
default:
}
select {
case _, ok := <- q.quit:
if ! ok {
t.Errorf("quit channel closed")
}
default:
}
select {
case _, ok := <- q.stopped:
if ! ok {
t.Errorf("stopped channel closed")
}
default:
}
q.Stop()
select {
case _, ok := <- q.receive:
if ok {
t.Errorf("receive channel not closed")
}
default:
t.Errorf("receive channel not closed")
}
select {
case _, ok := <- q.quit:
if ok {
t.Errorf("quit channel not closed")
}
default:
t.Errorf("quit channel not closed")
}
select {
case _, ok := <- q.stopped:
if ok {
t.Errorf("stopped channel not closed")
}
default:
t.Errorf("stopped channel not closed")
}
}

15
context/handler.go Normal file
View File

@ -0,0 +1,15 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package context
import (
"feudal/message"
)
// Register a handler for a message event type
type MessageHandler func(m message.Envelope)
func (h MessageHandler) Handle(m message.Envelope) {
h(m)
}

72
context/queue.go Normal file
View File

@ -0,0 +1,72 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package context;
import (
"log"
"feudal/message"
"feudal/feudal"
)
type Runner interface {
Start(d Dispatcher, c feudal.Context)
Stop()
}
type Queue struct {
receive chan message.Envelope
quit chan bool
stopped chan bool
}
func NewQueue() *Queue {
return &Queue{ receive: make(chan message.Envelope), quit: make(chan bool), stopped: make(chan bool) }
}
func NewBufferedQueue(size int) *Queue {
return &Queue{ receive: make(chan message.Envelope, size), quit: make(chan bool), stopped: make(chan bool) }
}
func (q *Queue) Add(m message.Envelope) {
q.receive <- m
}
func (q *Queue) Remove() message.Envelope {
return <- q.receive
}
func (q *Queue) contextTrigger(c feudal.Context, trigger string) {
if c != nil {
log.Printf("triggering %s on %s", trigger, c.Address())
c.Trigger(trigger)
}
}
func (q *Queue) Start(d Dispatcher, c feudal.Context) {
go func() {
q.contextTrigger(c, "start")
for {
select {
case m := <- q.receive:
d.Dispatch(m, c)
case <- q.quit:
close(q.receive)
c.Disolve() // change to trigger
close(q.stopped)
q.contextTrigger(c, "stop")
return
}
}
}()
}
func (q *Queue) Stop() {
close(q.quit)
select {
case <- q.stopped:
}
}
func (q *Queue) Len() int {
return len(q.receive)
}

61
context/queue_test.go Normal file
View File

@ -0,0 +1,61 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package context
import (
"testing"
"feudal/message"
)
func setupQueue() *Queue {
q := NewQueue()
return q
}
func setupBufferedQueue(size int) *Queue {
return NewBufferedQueue(size)
}
func TestNewQueue(t *testing.T) {
q := setupQueue()
if q == nil {
t.Errorf("Failed creating new queue")
}
}
func TestNewBufferedQueue(t *testing.T) {
q := setupBufferedQueue(10)
if q == nil {
t.Errorf("Failed creating new buffered queue")
}
}
func TestQueueAdd(t *testing.T) {
q := setupQueue()
var result message.Envelope
var testDispatcher DispatchReceiveFunc = func(m message.Envelope, r message.Receiver) { result = m }
q.Start(testDispatcher, nil)
q.Add(message.NewAnonymous(""))
if result.Body().(string) != "" {
t.Errorf("Failed receiving queued message")
}
}
func TestQueueRemove(t *testing.T) {
q := setupBufferedQueue(1)
q.Add(message.NewAnonymous(""))
result := q.Remove()
if result.Body().(string) != "" {
t.Errorf("Failed removing queued message")
}
}
func TestQueueMetrics(t *testing.T) {
q := setupBufferedQueue(1)
q.Add(message.NewAnonymous(""))
if q.Len() != 1 {
t.Errorf("Failed checking queue length")
}
}

83
feudal/interfaces.go Normal file
View File

@ -0,0 +1,83 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package feudal
import(
"feudal/message"
"feudal/identity"
)
type Transporter interface {
Transport(m message.Envelope)
}
type WorkerRouter interface {
message.Sender
Id() identity.Identifier
Type() string
State() string
Stop()
}
type Factory func(name string, ruler WorkerRouter) Worker
type Creator interface {
WorkerWhence(Factory, string) WorkerRouter
}
type Division interface {
Join(w Worker)
Disolve()
}
type Worker interface {
message.Receiver
SetContext(c interface{})
}
type Subjects interface {
Add(name string, worker WorkerRouter)
Remove(name string)
Get(name string) WorkerRouter
message.Sender
Disolve()
}
type Context interface {
Creator
message.Receiver
Disolve()
Self() WorkerRouter
Subjects() Subjects
Trigger(transition string)
Subscribers(topic string) Subjects
}
type Audience interface {
SwearFealty(Sovereign)
Sovereign() Sovereign
}
type Sovereign interface {
Creator
Subjects() Subjects
}
/*
// provides interfaces for managing the worker tree (organization)
type Context interface {
Creator
System() System
Ruler() feudal.Worker
Subjects() Subjects
}
type System interface {
identity.Addresser
Creator
Terminator
WorkersPath(path string) subjects.Selector
}
*/

11
feudal/messenger.go Normal file
View File

@ -0,0 +1,11 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package feudal
import (
"feudal/message"
)
type Messenger interface {
Deliver(message message.Envelope)
}

View File

@ -0,0 +1,35 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package interrogator;
import (
"feudal/message"
"feudal/identity"
)
type Interrogator chan message.Envelope
// workerrouter interface: this is degenerate
func (i Interrogator) Type() string { return "" }
func (i Interrogator) Stop() {}
func (i Interrogator) State() string { return "started" }
func (i Interrogator) Id() identity.Identifier {
return nil
}
func (i Interrogator) Address() string {
return ""
}
func (i Interrogator) Send(m message.Envelope) {
i <- m
}
func New() Interrogator {
return make(chan message.Envelope)
}
func NewBuffered(size int) Interrogator {
return make(chan message.Envelope, size)
}

View File

@ -0,0 +1,28 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package interrogator;
import (
"testing"
"feudal/message"
)
func TestInterrogator(t *testing.T) {
i := New()
m := message.New("foo", i)
go i.Send(m)
r := <- i
if r.Body() != "foo" {
t.Errorf("Invalid message")
}
}
func TestBufferedInterrogator(t *testing.T) {
i := NewBuffered(1)
m := message.New("foo", i)
i.Send(m)
r := <- i
if r.Body() != "foo" {
t.Errorf("Invalid message")
}
}

45
worker/clock.go Normal file
View File

@ -0,0 +1,45 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package worker;
import (
"feudal/message"
"time"
)
// A ticker thread that implements the runner interface and generates tick events for the given context
type Clock struct {
ticker *time.Ticker
quit chan bool
stopped chan bool
}
func NewClock(interval int) *Clock {
return &Clock{ ticker: time.NewTicker(time.Duration(interval) * time.Millisecond), quit: make(chan bool), stopped: make(chan bool) }
}
func (l *Clock) Start(d Dispatcher, c Context) {
go func() {
for {
select {
case tick := <- l.ticker.C:
d.Dispatch(message.New(&message.ClockTick{ T:tick }, c), c)
case <- l.quit:
l.ticker.Stop()
if v := c.Division(); v != nil {
v.Disolve()
}
close(l.stopped)
return
}
}
}()
}
func (l *Clock) Stop() {
close(l.quit)
select {
case <- l.stopped:
}
}

55
worker/clock_test.go Normal file
View File

@ -0,0 +1,55 @@
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package worker;
import (
"testing"
"log"
"feudal/message"
)
type testClockWorker struct {
AbstractContext
ticker Runner
recipients []message.Sender
}
func (w *testClockWorker) Receive(m message.Envelope) {
switch m.Body().(type) {
case *message.SubscribeToClockTick:
w.Subscribe(m.Sender())
case *message.ClockTick:
for _,r := range(w.recipients) {
r.Send(m)
}
default:
log.Fatalln("unexpected message", m.Body())
}
}
func (w *testClockWorker) Subscribe(recipient message.Sender) {
w.recipients = append(w.recipients, recipient)
}
func (w *testClockWorker) Stop() {
w.ticker.Stop()
w.AbstractContext.Queue.Stop()
}
func TestNewClock(t *testing.T) {
nw := &testClockWorker{ ticker: NewClock(500) }
subscriber := NewInterrogator()
nw.Init(nw, "clocktest")
nw.Start(DefaultDispatcher(), nw)
nw.ticker.Start(DefaultDispatcher(), nw)
nw.Send(message.New(&message.SubscribeToClockTick{}, subscriber))
tick := <- subscriber
switch tick.Body().(type) {
case *message.ClockTick:
default:
t.Errorf("invalid message")
}
}