diff --git a/main.go b/main.go index 32a1fda..354aaf1 100644 --- a/main.go +++ b/main.go @@ -109,7 +109,7 @@ func main() { webrtcChannel := make(chan srt.Packet, 65536) // Start stream, web and monitoring server, and stream forwarding - go forwarding.Serve(cfg.Forwarding, forwardingChannel) + go forwarding.Serve(forwardingChannel, cfg.Forwarding) go monitoring.Serve(&cfg.Monitoring) go srt.Serve(&cfg.Srt, authBackend, forwardingChannel, webrtcChannel) go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web) diff --git a/stream/forwarding/forwarding.go b/stream/forwarding/forwarding.go index 31611f2..a1d4f76 100644 --- a/stream/forwarding/forwarding.go +++ b/stream/forwarding/forwarding.go @@ -2,10 +2,11 @@ package forwarding import ( "bufio" - "gitlab.crans.org/nounous/ghostream/stream/srt" "io" "log" "os/exec" + + "gitlab.crans.org/nounous/ghostream/stream/srt" ) // Options to configure the stream forwarding. @@ -13,27 +14,20 @@ import ( type Options map[string][]string var ( - cfg Options - forwardingChannel chan srt.Packet ffmpegInstances = make(map[string]*exec.Cmd) ffmpegInputStreams = make(map[string]*io.WriteCloser) ) -// Serve Load configuration and initialize SRT channel -func Serve(c Options, channel chan srt.Packet) { - cfg = c - forwardingChannel = channel +// Serve handles incoming packets from SRT and forward them to other external services +func Serve(inputChannel chan srt.Packet, cfg Options) { log.Printf("Stream forwarding initialized") - waitForPackets() -} - -func waitForPackets() { for { var err error = nil - packet := <-forwardingChannel + // Wait for packets + packet := <-inputChannel switch packet.PacketType { case "register": - err = RegisterStream(packet.StreamName) + err = RegisterStream(packet.StreamName, cfg) break case "sendData": err = SendPacket(packet.StreamName, packet.Data) @@ -52,7 +46,7 @@ func waitForPackets() { } // RegisterStream Declare a new open stream and create ffmpeg instances -func RegisterStream(name string) error { +func RegisterStream(name string, cfg Options) error { streams, exist := cfg[name] if !exist || len(streams) == 0 { // Nothing to do, not configured diff --git a/stream/forwarding/forwarding_test.go b/stream/forwarding/forwarding_test.go index ab8af06..1cb2a1a 100644 --- a/stream/forwarding/forwarding_test.go +++ b/stream/forwarding/forwarding_test.go @@ -2,15 +2,17 @@ package forwarding import ( "bufio" - "gitlab.crans.org/nounous/ghostream/stream/srt" "log" "os/exec" "testing" "time" + + "gitlab.crans.org/nounous/ghostream/stream/srt" ) // TestServeSRT Serve a SRT server, stream content during 5 seconds and ensure that it is well received func TestForwardStream(t *testing.T) { + // Check that ffmpeg is installed which := exec.Command("which", "ffmpeg") if err := which.Start(); err != nil { t.Fatal("Error while checking if ffmpeg got installed:", err) @@ -24,6 +26,7 @@ func TestForwardStream(t *testing.T) { t.Skip("WARNING: FFMPEG is not installed. Skipping stream test") } + // Start virtual RTMP server with ffmpeg forwardedFfmpeg := exec.Command("ffmpeg", "-y", // allow overwrite /dev/null "-listen", "1", "-i", "rtmp://127.0.0.1:1936/live/app", "-f", "null", "-c", "copy", "/dev/null") forwardingOutput, err := forwardedFfmpeg.StdoutPipe() @@ -31,7 +34,6 @@ func TestForwardStream(t *testing.T) { if err != nil { t.Fatal("Error while querying ffmpeg forwardingOutput:", err) } - if err := forwardedFfmpeg.Start(); err != nil { t.Fatal("Error while starting forwarding stream ffmpeg instance:", err) } @@ -53,13 +55,13 @@ func TestForwardStream(t *testing.T) { forwardingList := make(map[string][]string) forwardingList["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"} - forwardingChannel = make(chan srt.Packet) + forwardingChannel := make(chan srt.Packet) // Register forwarding stream list - go Serve(forwardingList, forwardingChannel) + go Serve(forwardingChannel, forwardingList) - // Serve HTTP Server - go srt.Serve(&srt.Options{ListenAddress: ":9712", MaxClients: 2}, nil, forwardingChannel) + // Serve SRT Server without authentification backend + go srt.Serve(&srt.Options{ListenAddress: ":9712", MaxClients: 2}, nil, forwardingChannel, nil) ffmpeg := exec.Command("ffmpeg", "-re", "-f", "lavfi", "-i", "testsrc=size=640x480:rate=10", diff --git a/stream/srt/srt_test.go b/stream/srt/srt_test.go index b3572db..bf8c718 100644 --- a/stream/srt/srt_test.go +++ b/stream/srt/srt_test.go @@ -36,7 +36,7 @@ func TestServeSRT(t *testing.T) { t.Skip("WARNING: FFMPEG is not installed. Skipping stream test") } - go Serve(&Options{ListenAddress: ":9711", MaxClients: 2}, nil, nil) + go Serve(&Options{ListenAddress: ":9711", MaxClients: 2}, nil, nil, nil) ffmpeg := exec.Command("ffmpeg", "-i", "http://ftp.crans.org/events/Blender%20OpenMovies/big_buck_bunny_480p_stereo.ogg", diff --git a/stream/webrtc/webrtc.go b/stream/webrtc/webrtc.go index 69a4a45..9893602 100644 --- a/stream/webrtc/webrtc.go +++ b/stream/webrtc/webrtc.go @@ -281,7 +281,6 @@ func Serve(remoteSdpChan, localSdpChan chan webrtc.SessionDescription, inputChan for { // Wait for incoming session description // then send the local description to browser - offer := <-remoteSdpChan - localSdpChan <- newPeerHandler(offer, cfg) + localSdpChan <- newPeerHandler(<-remoteSdpChan, cfg) } }