From e0911ab0505804565fd11f4a26283e3b046d1b90 Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sat, 17 Oct 2020 10:21:40 +0200 Subject: [PATCH] Use []byte for stream data --- stream/messaging.go | 14 +++++++------- stream/messaging_test.go | 10 +++++----- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/stream/messaging.go b/stream/messaging.go index c22e6b0..71e6b6a 100644 --- a/stream/messaging.go +++ b/stream/messaging.go @@ -6,10 +6,10 @@ import "sync" // Stream makes packages able to subscribe to an incoming stream type Stream struct { // Incoming data come from this channel - Broadcast chan<- interface{} + Broadcast chan<- []byte // Use a map to be able to delete an item - outputs map[chan<- interface{}]struct{} + outputs map[chan<- []byte]struct{} // Mutex to lock this ressource lock sync.Mutex @@ -18,14 +18,14 @@ type Stream struct { // New creates a new stream. func New() *Stream { s := &Stream{} - broadcast := make(chan interface{}, 64) + broadcast := make(chan []byte, 64) s.Broadcast = broadcast - s.outputs = make(map[chan<- interface{}]struct{}) + s.outputs = make(map[chan<- []byte]struct{}) go s.run(broadcast) return s } -func (s *Stream) run(broadcast <-chan interface{}) { +func (s *Stream) run(broadcast <-chan []byte) { for msg := range broadcast { func() { s.lock.Lock() @@ -57,14 +57,14 @@ func (s *Stream) Close() { } // Register a new output on a stream -func (s *Stream) Register(output chan<- interface{}) { +func (s *Stream) Register(output chan<- []byte) { s.lock.Lock() defer s.lock.Unlock() s.outputs[output] = struct{}{} } // Unregister removes an output -func (s *Stream) Unregister(output chan<- interface{}) { +func (s *Stream) Unregister(output chan<- []byte) { s.lock.Lock() defer s.lock.Unlock() diff --git a/stream/messaging_test.go b/stream/messaging_test.go index 178ce75..b2845e2 100644 --- a/stream/messaging_test.go +++ b/stream/messaging_test.go @@ -7,7 +7,7 @@ import ( func TestWithoutOutputs(t *testing.T) { stream := New() defer stream.Close() - stream.Broadcast <- "hello world" + stream.Broadcast <- []byte("hello world") } func TestWithOneOutput(t *testing.T) { @@ -15,14 +15,14 @@ func TestWithOneOutput(t *testing.T) { defer stream.Close() // Register one output - output := make(chan interface{}, 64) + output := make(chan []byte, 64) stream.Register(output) // Try to pass one message - stream.Broadcast <- "hello world" + stream.Broadcast <- []byte("hello world") msg := <-output - if m, ok := msg.(string); !ok || m != "hello world" { - t.Error("Message has wrong type or content") + if string(msg) != "hello world" { + t.Errorf("Message has wrong content: %s != hello world", msg) } // Unregister