// Copyright 2024 Matthew Rich . All rights reserved. package context; import ( "log" "feudal/message" "feudal/feudal" ) type Runner interface { Start(d Dispatcher, c feudal.Context) Stop() } type Queue struct { receive chan message.Envelope quit chan bool stopped chan bool } func NewQueue() *Queue { return &Queue{ receive: make(chan message.Envelope), quit: make(chan bool), stopped: make(chan bool) } } func NewBufferedQueue(size int) *Queue { return &Queue{ receive: make(chan message.Envelope, size), quit: make(chan bool), stopped: make(chan bool) } } func (q *Queue) Add(m message.Envelope) { q.receive <- m } func (q *Queue) Remove() message.Envelope { return <- q.receive } func (q *Queue) contextTrigger(c feudal.Context, trigger string) { if c != nil { log.Printf("triggering %s on %s", trigger, c.Address()) c.Trigger(trigger) } } func (q *Queue) Start(d Dispatcher, c feudal.Context) { go func() { q.contextTrigger(c, "start") for { select { case m := <- q.receive: d.Dispatch(m, c) case <- q.quit: close(q.receive) c.Disolve() // change to trigger close(q.stopped) q.contextTrigger(c, "stop") return } } }() } func (q *Queue) Stop() { close(q.quit) select { case <- q.stopped: } } func (q *Queue) Len() int { return len(q.receive) }