From 73e6be1274cb46fc68de0a3f24ecba8b449b530d Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sat, 17 Oct 2020 10:02:38 +0200 Subject: [PATCH] Add Stream messaging struct --- stream/messaging.go | 77 ++++++++++++++++++++++++++++++++++++++++ stream/messaging_test.go | 30 ++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 stream/messaging.go create mode 100644 stream/messaging_test.go diff --git a/stream/messaging.go b/stream/messaging.go new file mode 100644 index 0000000..c22e6b0 --- /dev/null +++ b/stream/messaging.go @@ -0,0 +1,77 @@ +// Package stream defines a structure to communication between inputs and outputs +package stream + +import "sync" + +// Stream makes packages able to subscribe to an incoming stream +type Stream struct { + // Incoming data come from this channel + Broadcast chan<- interface{} + + // Use a map to be able to delete an item + outputs map[chan<- interface{}]struct{} + + // Mutex to lock this ressource + lock sync.Mutex +} + +// New creates a new stream. +func New() *Stream { + s := &Stream{} + broadcast := make(chan interface{}, 64) + s.Broadcast = broadcast + s.outputs = make(map[chan<- interface{}]struct{}) + go s.run(broadcast) + return s +} + +func (s *Stream) run(broadcast <-chan interface{}) { + for msg := range broadcast { + func() { + s.lock.Lock() + defer s.lock.Unlock() + for output := range s.outputs { + select { + case output <- msg: + default: + // Remove output if failed + delete(s.outputs, output) + close(output) + } + } + }() + } + + // Incoming chan has been closed, close all outputs + s.lock.Lock() + defer s.lock.Unlock() + for ch := range s.outputs { + delete(s.outputs, ch) + close(ch) + } +} + +// Close the incoming chan, this will also delete all outputs +func (s *Stream) Close() { + close(s.Broadcast) +} + +// Register a new output on a stream +func (s *Stream) Register(output chan<- interface{}) { + s.lock.Lock() + defer s.lock.Unlock() + s.outputs[output] = struct{}{} +} + +// Unregister removes an output +func (s *Stream) Unregister(output chan<- interface{}) { + s.lock.Lock() + defer s.lock.Unlock() + + // Make sure we did not already close this output + _, ok := s.outputs[output] + if ok { + delete(s.outputs, output) + close(output) + } +} diff --git a/stream/messaging_test.go b/stream/messaging_test.go new file mode 100644 index 0000000..178ce75 --- /dev/null +++ b/stream/messaging_test.go @@ -0,0 +1,30 @@ +package stream + +import ( + "testing" +) + +func TestWithoutOutputs(t *testing.T) { + stream := New() + defer stream.Close() + stream.Broadcast <- "hello world" +} + +func TestWithOneOutput(t *testing.T) { + stream := New() + defer stream.Close() + + // Register one output + output := make(chan interface{}, 64) + stream.Register(output) + + // Try to pass one message + stream.Broadcast <- "hello world" + msg := <-output + if m, ok := msg.(string); !ok || m != "hello world" { + t.Error("Message has wrong type or content") + } + + // Unregister + stream.Unregister(output) +}