diff --git a/stream/forwarding/forwarding.go b/stream/forwarding/forwarding.go index bc0b2ad..e6bd55d 100644 --- a/stream/forwarding/forwarding.go +++ b/stream/forwarding/forwarding.go @@ -82,6 +82,10 @@ func RegisterStream(name string) error { // SendPacket forward data to all FFMpeg instances related to the stream name func SendPacket(name string, data []byte) { stdin := ffmpegInputStreams[name] + if stdin == nil { + // Don't need to forward stream + return + } _, err := (*stdin).Write(data) if err != nil { log.Printf("Error while sending a packet to external streaming server for key %s: %s", name, err) @@ -91,6 +95,10 @@ func SendPacket(name string, data []byte) { // CloseConnection When the stream is ended, close FFMPEG instances func CloseConnection(name string) error { ffmpeg := ffmpegInstances[name] + if ffmpeg == nil { + // No stream to close + return nil + } if err := ffmpeg.Process.Kill(); err != nil { return err } diff --git a/stream/srt/srt.go b/stream/srt/srt.go index a584d1f..a11f173 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -40,7 +40,9 @@ func Serve(cfg *Options) { log.Printf("SRT server listening on %s", cfg.ListenAddress) host, port := splitHostPort(cfg.ListenAddress) sck := srtgo.NewSrtSocket(host, uint16(port), options) - sck.Listen(cfg.MaxClients) + if err := sck.Listen(cfg.MaxClients); err != nil { + log.Fatal("Unable to listen to SRT clients:", err) + } // FIXME: See srtgo.SocketOptions and value, err := s.GetSockOptString to get parameters // http://ffmpeg.org/ffmpeg-protocols.html#srt diff --git a/stream/srt/srt_test.go b/stream/srt/srt_test.go index 403edf8..cf16fc4 100644 --- a/stream/srt/srt_test.go +++ b/stream/srt/srt_test.go @@ -1,9 +1,14 @@ package srt import ( + "bufio" + "log" + "os/exec" "testing" + "time" ) +// TestSplitHostPort Try to split a host like 127.0.0.1:1234 in host, port (127.0.0.1, 1234à func TestSplitHostPort(t *testing.T) { host, port := splitHostPort("127.0.0.1:1234") if host != "127.0.0.1" || port != 1234 { @@ -15,3 +20,40 @@ func TestSplitHostPort(t *testing.T) { t.Errorf("splitHostPort returned %v:%d != 0.0.0.0:1234", host, port) } } + +// TestServeSRT Serve a SRT server, stream content during 5 seconds and ensure that it is well received +func TestServeSRT(t *testing.T) { + go Serve(&Options{ListenAddress: ":9711", MaxClients: 2}) + + ffmpeg := exec.Command("ffmpeg", + "-i", "http://ftp.crans.org/events/Blender%20OpenMovies/big_buck_bunny_480p_stereo.ogg", + "-f", "flv", "srt://127.0.0.1:9711") + + output, err := ffmpeg.StdoutPipe() + errOutput, err := ffmpeg.StderrPipe() + if err != nil { + t.Fatal("Error while querying ffmpeg output:", err) + } + + if err := ffmpeg.Start(); err != nil { + t.Fatal("Error while starting ffmpeg:", err) + } + + go func() { + scanner := bufio.NewScanner(output) + for scanner.Scan() { + log.Printf("[FFMPEG TEST] %s", scanner.Text()) + } + }() + + go func() { + scanner := bufio.NewScanner(errOutput) + for scanner.Scan() { + log.Printf("[FFMPEG ERR TEST] %s", scanner.Text()) + } + }() + + time.Sleep(5000000000) // Delay is in nanoseconds, here 5s + + // TODO Check that the stream ran +}