diff --git a/stream/messaging.go b/stream/messaging.go index 71e6b6a..05d55ad 100644 --- a/stream/messaging.go +++ b/stream/messaging.go @@ -9,7 +9,7 @@ type Stream struct { Broadcast chan<- []byte // Use a map to be able to delete an item - outputs map[chan<- []byte]struct{} + outputs map[chan []byte]struct{} // Mutex to lock this ressource lock sync.Mutex @@ -20,7 +20,7 @@ func New() *Stream { s := &Stream{} broadcast := make(chan []byte, 64) s.Broadcast = broadcast - s.outputs = make(map[chan<- []byte]struct{}) + s.outputs = make(map[chan []byte]struct{}) go s.run(broadcast) return s } @@ -34,9 +34,9 @@ func (s *Stream) run(broadcast <-chan []byte) { select { case output <- msg: default: - // Remove output if failed - delete(s.outputs, output) - close(output) + // If full, do a ring buffer + <-output + output <- msg } } }() @@ -57,14 +57,14 @@ func (s *Stream) Close() { } // Register a new output on a stream -func (s *Stream) Register(output chan<- []byte) { +func (s *Stream) Register(output chan []byte) { s.lock.Lock() defer s.lock.Unlock() s.outputs[output] = struct{}{} } // Unregister removes an output -func (s *Stream) Unregister(output chan<- []byte) { +func (s *Stream) Unregister(output chan []byte) { s.lock.Lock() defer s.lock.Unlock() diff --git a/stream/srt/handler.go b/stream/srt/handler.go index c40036e..da6528e 100644 --- a/stream/srt/handler.go +++ b/stream/srt/handler.go @@ -5,23 +5,30 @@ import ( "log" "github.com/haivision/srtgo" + "gitlab.crans.org/nounous/ghostream/stream" ) -func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels map[string][]chan Packet, forwardingChannel, webrtcChannel chan Packet) { +func handleStreamer(socket *srtgo.SrtSocket, streams map[string]stream.Stream, name string) { + // Check stream does not exist + if _, ok := streams[name]; ok { + log.Print("Stream already exists, refusing new streamer") + socket.Close() + return + } + + // Create stream log.Printf("New SRT streamer for stream %s", name) + st := *stream.New() + streams[name] = st // Create a new buffer // UDP packet cannot be larger than MTU (1500) buff := make([]byte, 1500) - // Setup stream forwarding - forwardingChannel <- Packet{StreamName: name, PacketType: "register", Data: nil} - webrtcChannel <- Packet{StreamName: name, PacketType: "register", Data: nil} - // Read RTP packets forever and send them to the WebRTC Client for { // 5s timeout - n, err := s.Read(buff, 5000) + n, err := socket.Read(buff, 5000) if err != nil { log.Println("Error occurred while reading SRT socket:", err) break @@ -33,40 +40,50 @@ func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels map[stri break } - // Send raw packet to other streams + // Send raw data to other streams // Copy data in another buffer to ensure that the data would not be overwritten + // FIXME: might be unnecessary data := make([]byte, n) copy(data, buff[:n]) - forwardingChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} - webrtcChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} - for _, dataChannel := range clientDataChannels[name] { - dataChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} - } + st.Broadcast <- data } - forwardingChannel <- Packet{StreamName: name, PacketType: "close", Data: nil} - webrtcChannel <- Packet{StreamName: name, PacketType: "close", Data: nil} + // Close stream + st.Close() + socket.Close() + delete(streams, name) } -func handleViewer(s *srtgo.SrtSocket, name string, dataChannel chan Packet, dataChannels map[string][]chan Packet) { - // FIXME Should not pass all dataChannels to one viewer - +func handleViewer(s *srtgo.SrtSocket, streams map[string]stream.Stream, name string) { log.Printf("New SRT viewer for stream %s", name) - // Receive packets from channel and send them + // Get requested stream + st, ok := streams[name] + if !ok { + log.Println("Stream does not exist, refusing new viewer") + return + } + + // Register new output + c := make(chan []byte, 128) + st.Register(c) + + // Receive data and send them for { - packet := <-dataChannel - if packet.PacketType == "sendData" { - _, err := s.Write(packet.Data, 10000) - if err != nil { - s.Close() - for i, channel := range dataChannels[name] { - if channel == dataChannel { - dataChannels[name] = append(dataChannels[name][:i], dataChannels[name][i+1:]...) - } - } - return - } + data := <-c + if len(data) < 1 { + log.Print("Remove SRT viewer because of end of stream") + break + } + + _, err := s.Write(data, 1000) + if err != nil { + log.Printf("Remove SRT viewer because of sending error, %s", err) + break } } + + // Close output + st.Unregister(c) + s.Close() } diff --git a/stream/srt/srt.go b/stream/srt/srt.go index 916441a..dad5f4e 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -12,10 +12,7 @@ import ( "github.com/haivision/srtgo" "gitlab.crans.org/nounous/ghostream/auth" -) - -var ( - clientDataChannels map[string][]chan Packet + "gitlab.crans.org/nounous/ghostream/stream" ) // Options holds web package configuration @@ -25,13 +22,6 @@ type Options struct { MaxClients int } -// Packet contains the necessary data to broadcast events like stream creating, packet receiving or stream closing. -type Packet struct { - Data []byte - PacketType string - StreamName string -} - // Split host and port from listen address func splitHostPort(hostport string) (string, uint16, error) { host, portS, err := net.SplitHostPort(hostport) @@ -48,13 +38,8 @@ func splitHostPort(hostport string) (string, uint16, error) { return host, uint16(port64), nil } -// GetNumberConnectedSessions get the number of currently connected clients -func GetNumberConnectedSessions(streamID string) int { - return len(clientDataChannels[streamID]) -} - // Serve SRT server -func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChannel chan Packet) { +func Serve(streams map[string]stream.Stream, authBackend auth.Backend, cfg *Options) { if !cfg.Enabled { // SRT is not enabled, ignore return @@ -75,8 +60,6 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan log.Fatal("Unable to listen for SRT clients:", err) } - clientDataChannels = make(map[string][]chan Packet) - for { // Wait for new connection s, err := sck.Accept() @@ -94,10 +77,6 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan } split := strings.Split(streamID, ":") - if clientDataChannels[streamID] == nil { - clientDataChannels[streamID] = make([]chan Packet, 0, cfg.MaxClients) - } - if len(split) > 1 { // password was provided so it is a streamer name, password := split[0], split[1] @@ -110,15 +89,13 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan } } - go handleStreamer(s, name, clientDataChannels, forwardingChannel, webrtcChannel) + go handleStreamer(s, streams, name) } else { // password was not provided so it is a viewer name := split[0] - dataChannel := make(chan Packet, 4096) - clientDataChannels[streamID] = append(clientDataChannels[streamID], dataChannel) - - go handleViewer(s, name, dataChannel, clientDataChannels) + // Send stream + go handleViewer(s, streams, name) } } } diff --git a/stream/srt/srt_test.go b/stream/srt/srt_test.go index 37f44f5..015537d 100644 --- a/stream/srt/srt_test.go +++ b/stream/srt/srt_test.go @@ -5,6 +5,8 @@ import ( "os/exec" "testing" "time" + + "gitlab.crans.org/nounous/ghostream/stream" ) // TestSplitHostPort Try to split a host like 127.0.0.1:1234 in host, port (127.0.0.1, 1234à @@ -55,7 +57,9 @@ func TestServeSRT(t *testing.T) { t.Skip("WARNING: FFMPEG is not installed. Skipping stream test") } - go Serve(&Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2}, nil, nil, nil) + // Init streams messaging and SRT server + streams := make(map[string]stream.Stream) + go Serve(streams, nil, &Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2}) ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error", "-f", "lavfi", "-i", "testsrc=size=640x480:rate=10", @@ -78,6 +82,4 @@ func TestServeSRT(t *testing.T) { }() time.Sleep(5 * time.Second) // Delay is in nanoseconds, here 5s - - // TODO Kill SRT server }