feudal/context/queue.go

73 lines
1.4 KiB
Go
Raw Permalink Normal View History

2024-05-05 07:11:52 +00:00
// Copyright 2024 Matthew Rich <matthewrich.conf@gmail.com>. All rights reserved.
package context;
import (
"log"
"feudal/message"
"feudal/feudal"
)
// Runner is the start/stop lifecycle contract for something that processes
// messages with a Dispatcher on behalf of a feudal.Context. Queue in this
// file provides both methods.
type Runner interface {
// Start begins processing using dispatcher d and context c.
Start(d Dispatcher, c feudal.Context)
// Stop ends processing.
Stop()
}
// Queue is a channel-backed message queue. Envelopes are pushed with Add and
// are either pulled with Remove or consumed by the goroutine that Start
// launches. Build one with NewQueue or NewBufferedQueue; the zero value has
// nil channels.
type Queue struct {
// receive carries envelopes from Add to Remove or to the Start goroutine.
receive chan message.Envelope
// quit is closed by Stop to tell the Start goroutine to shut down.
quit chan bool
// stopped is closed by the Start goroutine during shutdown; Stop waits on it.
stopped chan bool
}
// NewQueue returns a Queue whose receive channel is unbuffered, so every Add
// blocks until the envelope has been taken by a receiver.
func NewQueue() *Queue {
	q := new(Queue)
	q.receive = make(chan message.Envelope)
	q.quit = make(chan bool)
	q.stopped = make(chan bool)
	return q
}
// NewBufferedQueue returns a Queue whose receive channel can hold up to size
// envelopes before Add starts to block. The quit and stopped signalling
// channels are always unbuffered.
func NewBufferedQueue(size int) *Queue {
	envelopes := make(chan message.Envelope, size)
	return &Queue{
		receive: envelopes,
		quit:    make(chan bool),
		stopped: make(chan bool),
	}
}
// Add sends envelope m on the queue's receive channel. The send blocks while
// the channel has no free buffer slot and no receiver is waiting.
func (q *Queue) Add(m message.Envelope) {
	inbox := q.receive
	inbox <- m
}
// Remove receives the next envelope from the queue's receive channel and
// returns it, blocking until one is available. Once the channel has been
// closed and drained, the receive yields the zero-value Envelope.
func (q *Queue) Remove() message.Envelope {
	next := <-q.receive
	return next
}
// contextTrigger logs and then fires the named trigger on context c.
// A nil context is silently ignored.
func (q *Queue) contextTrigger(c feudal.Context, trigger string) {
	if c == nil {
		return
	}
	log.Printf("triggering %s on %s", trigger, c.Address())
	c.Trigger(trigger)
}
// Start launches the queue's worker goroutine and returns immediately.
//
// The goroutine fires the "start" trigger on c, then passes every envelope
// received on the queue to d.Dispatch together with c. When the quit channel
// is closed (see Stop) it closes the receive channel, dissolves c, closes the
// stopped channel, fires the "stop" trigger and exits.
//
// c may be nil: the triggers and the dissolve step are skipped in that case,
// and d.Dispatch is called with the nil context as-is.
//
// Because shutdown closes the receive channel, calling Add after the queue
// has stopped panics with a send on a closed channel.
func (q *Queue) Start(d Dispatcher, c feudal.Context) {
	go func() {
		q.contextTrigger(c, "start")
		for {
			select {
			case m := <-q.receive:
				d.Dispatch(m, c)
			case <-q.quit:
				close(q.receive)
				// Guard the dissolve the same way contextTrigger guards its
				// calls, so a nil context does not panic the worker goroutine.
				if c != nil {
					c.Disolve() // change to trigger
				}
				// stopped is closed before the "stop" trigger fires, so Stop
				// may return while the trigger is still running.
				close(q.stopped)
				q.contextTrigger(c, "stop")
				return
			}
		}
	}()
}
// Stop asks the goroutine launched by Start to shut down by closing the quit
// channel, then blocks until that goroutine has closed the stopped channel.
//
// Repeated sequential calls are safe: if quit is already closed the close is
// skipped and the call just waits on stopped, which returns immediately once
// shutdown has happened. Concurrent calls to Stop are not synchronised.
//
// NOTE(review): in this file only the Start goroutine closes stopped, so Stop
// blocks indefinitely if Start was never called — confirm callers always
// start the queue first.
func (q *Queue) Stop() {
	select {
	case <-q.quit:
		// Already closed by an earlier Stop; closing again would panic.
	default:
		close(q.quit)
	}
	<-q.stopped
}
// Len reports how many envelopes are currently buffered in the receive
// channel. For a queue built with NewQueue (unbuffered) this is always 0.
func (q *Queue) Len() int {
	pending := len(q.receive)
	return pending
}