73 lines
1.4 KiB
Go
73 lines
1.4 KiB
Go
|
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
|
||
|
|
||
|
package context;
|
||
|
|
||
|
import (
|
||
|
"log"
|
||
|
"feudal/message"
|
||
|
"feudal/feudal"
|
||
|
)
|
||
|
|
||
|
type Runner interface {
|
||
|
Start(d Dispatcher, c feudal.Context)
|
||
|
Stop()
|
||
|
}
|
||
|
|
||
|
type Queue struct {
|
||
|
receive chan message.Envelope
|
||
|
quit chan bool
|
||
|
stopped chan bool
|
||
|
}
|
||
|
|
||
|
func NewQueue() *Queue {
|
||
|
return &Queue{ receive: make(chan message.Envelope), quit: make(chan bool), stopped: make(chan bool) }
|
||
|
}
|
||
|
|
||
|
func NewBufferedQueue(size int) *Queue {
|
||
|
return &Queue{ receive: make(chan message.Envelope, size), quit: make(chan bool), stopped: make(chan bool) }
|
||
|
}
|
||
|
|
||
|
func (q *Queue) Add(m message.Envelope) {
|
||
|
q.receive <- m
|
||
|
}
|
||
|
|
||
|
func (q *Queue) Remove() message.Envelope {
|
||
|
return <- q.receive
|
||
|
}
|
||
|
|
||
|
func (q *Queue) contextTrigger(c feudal.Context, trigger string) {
|
||
|
if c != nil {
|
||
|
log.Printf("triggering %s on %s", trigger, c.Address())
|
||
|
c.Trigger(trigger)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (q *Queue) Start(d Dispatcher, c feudal.Context) {
|
||
|
go func() {
|
||
|
q.contextTrigger(c, "start")
|
||
|
for {
|
||
|
select {
|
||
|
case m := <- q.receive:
|
||
|
d.Dispatch(m, c)
|
||
|
case <- q.quit:
|
||
|
close(q.receive)
|
||
|
c.Disolve() // change to trigger
|
||
|
close(q.stopped)
|
||
|
q.contextTrigger(c, "stop")
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
func (q *Queue) Stop() {
|
||
|
close(q.quit)
|
||
|
select {
|
||
|
case <- q.stopped:
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (q *Queue) Len() int {
|
||
|
return len(q.receive)
|
||
|
}
|