From 969d36fab419055831f8a8c6ad398ae28ac2bd7b Mon Sep 17 00:00:00 2001 From: Yohann D'ANELLO Date: Tue, 29 Sep 2020 20:56:31 +0200 Subject: [PATCH 1/7] Use FFMPEG to broadcast the stream on Twitch --- stream/srt/srt.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/stream/srt/srt.go b/stream/srt/srt.go index f920046..95157f1 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -1,8 +1,11 @@ package srt import ( + "bufio" + "fmt" "log" "net" + "os/exec" "strconv" "github.com/haivision/srtgo" @@ -48,8 +51,14 @@ func Serve(cfg *Options) { continue } + // Launch ffmpeg to stream on other RTMP servers + ffmpeg := exec.Command("ffmpeg", "-re", "-i", "pipe:0", "-f", "flv", "-c:v", "libx264", "-preset", "veryfast", "-maxrate", "3000k", "-bufsize", "6000k", "-pix_fmt", "yuv420p", "-g", "50", "-c:a", "aac", "-b:a", "160k", "-ac", "2", "-ar", "44100", fmt.Sprintf("rtmp://live.twitch.tv/app/%s", "TWITCH_STREAM_KEY")) //nolint + ffmpegIn, _ := ffmpeg.StdinPipe() + ffmpegOut, _ := ffmpeg.StderrPipe() + buff := make([]byte, 2048) n, err := s.Read(buff, 10000) + ffmpegIn.Write(buff[:n]) if err != nil { log.Println("Error occurred while reading SRT socket:", err) break @@ -68,6 +77,18 @@ func Serve(cfg *Options) { // videoTrack, err := peerConnection.NewTrack(payloadType, packet.SSRC, "video", "pion") + if err := ffmpeg.Start(); err != nil { + panic(err) + } + + // Log ffmpeg output + go func() { + scanner := bufio.NewScanner(ffmpegOut) + for scanner.Scan() { + log.Println(scanner.Text()) + } + }() + // Read RTP packets forever and send them to the WebRTC Client for { n, err := s.Read(buff, 10000) @@ -79,6 +100,7 @@ func Serve(cfg *Options) { log.Printf("Received %d bytes", n) packet := &rtp.Packet{} + ffmpegIn.Write(buff[:n]) if err := packet.Unmarshal(buff[:n]); err != nil { panic(err) } From 2920b6883cb65ebbabf66b8c245d6878ee7eb726 Mon Sep 17 00:00:00 2001 From: Yohann D'ANELLO Date: Tue, 29 Sep 2020 21:31:53 +0200 Subject: [PATCH 2/7] Separate multicast feature --- stream/multicast/muticast.go | 67 ++++++++++++++++++++++++++++++++++++ stream/srt/srt.go | 26 +++----------- 2 files changed, 72 insertions(+), 21 deletions(-) create mode 100644 stream/multicast/muticast.go diff --git a/stream/multicast/muticast.go b/stream/multicast/muticast.go new file mode 100644 index 0000000..c74eb91 --- /dev/null +++ b/stream/multicast/muticast.go @@ -0,0 +1,67 @@ +package multicast + +import ( + "bufio" + "fmt" + "io" + "log" + "os/exec" +) + +var ( + ffmpegInstances = make(map[string][]*exec.Cmd) + ffmpegInputStreams = make(map[string][]io.WriteCloser) +) + +// Declare a new open stream and create ffmpeg instances +func RegisterStream(streamKey string) { + ffmpegInstances[streamKey] = []*exec.Cmd{} + ffmpegInputStreams[streamKey] = []io.WriteCloser{} + + // TODO Export the list of multicasts + for _, stream := range []string{fmt.Sprintf("rtmp://live.twitch.tv/app/%s", "TWITCH_STREAM_KEY")} { + // Launch FFMPEG instance + // TODO Set optimal parameters + ffmpeg := exec.Command("ffmpeg", "-re", "-i", "pipe:0", "-f", "flv", "-c:v", "libx264", "-preset", + "veryfast", "-maxrate", "3000k", "-bufsize", "6000k", "-pix_fmt", "yuv420p", "-g", "50", "-c:a", "aac", + "-b:a", "160k", "-ac", "2", "-ar", "44100", stream) + ffmpegInstances[streamKey] = append(ffmpegInstances[streamKey], ffmpeg) + input, _ := ffmpeg.StdinPipe() + ffmpegInputStreams[streamKey] = append(ffmpegInputStreams[streamKey], input) + output, _ := ffmpeg.StdoutPipe() + + if err := ffmpeg.Start(); err != nil { + panic(err) + } + + // Log ffmpeg output + go func() { + scanner := bufio.NewScanner(output) + for scanner.Scan() { + log.Println(scanner.Text()) + } + }() + } +} + +// When a SRT packet is received, transmit it to all FFMPEG instances related to the stream key +func SendPacket(streamKey string, data []byte) { + for _, stdin := range ffmpegInputStreams[streamKey] { + _, err := stdin.Write(data) + if err != nil { + panic(err) + } + } + +} + +// When the stream is ended, close FFMPEG instances +func CloseConnection(streamKey string) { + for _, ffmpeg := range ffmpegInstances[streamKey] { + if err := ffmpeg.Process.Kill(); err != nil { + panic(err) + } + } + delete(ffmpegInstances, streamKey) + delete(ffmpegInputStreams, streamKey) +} diff --git a/stream/srt/srt.go b/stream/srt/srt.go index 95157f1..6289e07 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -1,11 +1,9 @@ package srt import ( - "bufio" - "fmt" + "gitlab.crans.org/nounous/ghostream/stream/multicast" "log" "net" - "os/exec" "strconv" "github.com/haivision/srtgo" @@ -51,14 +49,11 @@ func Serve(cfg *Options) { continue } - // Launch ffmpeg to stream on other RTMP servers - ffmpeg := exec.Command("ffmpeg", "-re", "-i", "pipe:0", "-f", "flv", "-c:v", "libx264", "-preset", "veryfast", "-maxrate", "3000k", "-bufsize", "6000k", "-pix_fmt", "yuv420p", "-g", "50", "-c:a", "aac", "-b:a", "160k", "-ac", "2", "-ar", "44100", fmt.Sprintf("rtmp://live.twitch.tv/app/%s", "TWITCH_STREAM_KEY")) //nolint - ffmpegIn, _ := ffmpeg.StdinPipe() - ffmpegOut, _ := ffmpeg.StderrPipe() + multicast.RegisterStream("demo") // FIXME Replace with real stream key buff := make([]byte, 2048) n, err := s.Read(buff, 10000) - ffmpegIn.Write(buff[:n]) + multicast.SendPacket("demo", buff[:n]) if err != nil { log.Println("Error occurred while reading SRT socket:", err) break @@ -77,30 +72,19 @@ func Serve(cfg *Options) { // videoTrack, err := peerConnection.NewTrack(payloadType, packet.SSRC, "video", "pion") - if err := ffmpeg.Start(); err != nil { - panic(err) - } - - // Log ffmpeg output - go func() { - scanner := bufio.NewScanner(ffmpegOut) - for scanner.Scan() { - log.Println(scanner.Text()) - } - }() - // Read RTP packets forever and send them to the WebRTC Client for { n, err := s.Read(buff, 10000) if err != nil { log.Println("Error occured while reading SRT socket:", err) + multicast.CloseConnection("demo") break } log.Printf("Received %d bytes", n) packet := &rtp.Packet{} - ffmpegIn.Write(buff[:n]) + multicast.SendPacket("demo", buff[:n]) if err := packet.Unmarshal(buff[:n]); err != nil { panic(err) } From 5fa492547bbfb6eb20f32f54539e5a78f75e1d9a Mon Sep 17 00:00:00 2001 From: Yohann D'ANELLO Date: Tue, 29 Sep 2020 22:40:49 +0200 Subject: [PATCH 3/7] Use external configuration to setup multicasts --- docs/ghostream.example.yml | 6 ++++ main.go | 9 ++++++ stream/multicast/muticast.go | 55 +++++++++++++++++++++++++++--------- stream/srt/srt.go | 7 +++-- 4 files changed, 62 insertions(+), 15 deletions(-) diff --git a/docs/ghostream.example.yml b/docs/ghostream.example.yml index 1df72ae..7130496 100644 --- a/docs/ghostream.example.yml +++ b/docs/ghostream.example.yml @@ -46,3 +46,9 @@ webrtc: # STUN servers, you should host your own Coturn instance STUNServers: - stun:stun.l.google.com:19302 + +# Configuration for the multicast feature +multicast: + outputs: + # demo: + # - rtmp://localhost:1925 diff --git a/main.go b/main.go index 5f8f1d3..e1c9342 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( + "gitlab.crans.org/nounous/ghostream/stream/multicast" "log" "strings" @@ -57,6 +58,7 @@ func loadConfiguration() { viper.SetDefault("WebRTC.MinPortUDP", 10000) viper.SetDefault("WebRTC.MaxPortUDP", 10005) viper.SetDefault("WebRTC.STUNServers", []string{"stun:stun.l.google.com:19302"}) + viper.SetDefault("Multicast.Outputs", make(map[string][]string)) // Copy STUN configuration to clients viper.Set("Web.STUNServers", viper.Get("WebRTC.STUNServers")) @@ -68,6 +70,7 @@ func main() { cfg := struct { Auth auth.Options Monitoring monitoring.Options + Multicast multicast.Options Srt srt.Options Web web.Options WebRTC webrtc.Options @@ -93,6 +96,12 @@ func main() { go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web) go webrtc.Serve(remoteSdpChan, localSdpChan, &cfg.WebRTC) + // Init multicast + err = multicast.New(&cfg.Multicast) + if err != nil { + log.Fatalln("Failed to load multicast app:", err) + } + // Wait for routines select {} } diff --git a/stream/multicast/muticast.go b/stream/multicast/muticast.go index c74eb91..e948e32 100644 --- a/stream/multicast/muticast.go +++ b/stream/multicast/muticast.go @@ -2,33 +2,55 @@ package multicast import ( "bufio" - "fmt" "io" "log" "os/exec" ) +type Options struct { + Outputs map[string][]string +} + var ( + options Options ffmpegInstances = make(map[string][]*exec.Cmd) - ffmpegInputStreams = make(map[string][]io.WriteCloser) + ffmpegInputStreams = make(map[string][]*io.WriteCloser) ) -// Declare a new open stream and create ffmpeg instances +// New Load configuration +func New(cfg *Options) error { + options = *cfg + return nil +} + +// RegisterStream Declare a new open stream and create ffmpeg instances func RegisterStream(streamKey string) { ffmpegInstances[streamKey] = []*exec.Cmd{} - ffmpegInputStreams[streamKey] = []io.WriteCloser{} + ffmpegInputStreams[streamKey] = []*io.WriteCloser{} // TODO Export the list of multicasts - for _, stream := range []string{fmt.Sprintf("rtmp://live.twitch.tv/app/%s", "TWITCH_STREAM_KEY")} { + for _, stream := range options.Outputs[streamKey] { // Launch FFMPEG instance // TODO Set optimal parameters ffmpeg := exec.Command("ffmpeg", "-re", "-i", "pipe:0", "-f", "flv", "-c:v", "libx264", "-preset", "veryfast", "-maxrate", "3000k", "-bufsize", "6000k", "-pix_fmt", "yuv420p", "-g", "50", "-c:a", "aac", "-b:a", "160k", "-ac", "2", "-ar", "44100", stream) + + // Open pipes + input, err := ffmpeg.StdinPipe() + if err != nil { + panic(err) + } + output, err := ffmpeg.StdoutPipe() + if err != nil { + panic(err) + } + errOutput, err := ffmpeg.StderrPipe() + if err != nil { + panic(err) + } ffmpegInstances[streamKey] = append(ffmpegInstances[streamKey], ffmpeg) - input, _ := ffmpeg.StdinPipe() - ffmpegInputStreams[streamKey] = append(ffmpegInputStreams[streamKey], input) - output, _ := ffmpeg.StdoutPipe() + ffmpegInputStreams[streamKey] = append(ffmpegInputStreams[streamKey], &input) if err := ffmpeg.Start(); err != nil { panic(err) @@ -38,24 +60,31 @@ func RegisterStream(streamKey string) { go func() { scanner := bufio.NewScanner(output) for scanner.Scan() { - log.Println(scanner.Text()) + log.Println("[FFMPEG " + streamKey + "] " + scanner.Text()) + } + }() + // Log also error output + go func() { + scanner := bufio.NewScanner(errOutput) + for scanner.Scan() { + log.Println("[FFMPEG ERROR " + streamKey + "] " + scanner.Text()) } }() } } -// When a SRT packet is received, transmit it to all FFMPEG instances related to the stream key +// SendPacket When a SRT packet is received, transmit it to all FFMPEG instances related to the stream key func SendPacket(streamKey string, data []byte) { for _, stdin := range ffmpegInputStreams[streamKey] { - _, err := stdin.Write(data) + _, err := (*stdin).Write(data) if err != nil { - panic(err) + log.Println("Error while sending a packet to external streaming server for key "+streamKey, err) } } } -// When the stream is ended, close FFMPEG instances +// CloseConnection When the stream is ended, close FFMPEG instances func CloseConnection(streamKey string) { for _, ffmpeg := range ffmpegInstances[streamKey] { if err := ffmpeg.Process.Kill(); err != nil { diff --git a/stream/srt/srt.go b/stream/srt/srt.go index 6289e07..67445a0 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -53,16 +53,18 @@ func Serve(cfg *Options) { buff := make([]byte, 2048) n, err := s.Read(buff, 10000) - multicast.SendPacket("demo", buff[:n]) if err != nil { log.Println("Error occurred while reading SRT socket:", err) break } if n == 0 { // End of stream + multicast.CloseConnection("demo") break } + multicast.SendPacket("demo", buff[:n]) + // Unmarshal the incoming packet packet := &rtp.Packet{} if err = packet.Unmarshal(buff[:n]); err != nil { @@ -81,10 +83,11 @@ func Serve(cfg *Options) { break } + multicast.SendPacket("demo", buff[:n]) + log.Printf("Received %d bytes", n) packet := &rtp.Packet{} - multicast.SendPacket("demo", buff[:n]) if err := packet.Unmarshal(buff[:n]); err != nil { panic(err) } From 9049935ee41b5eead4fca9b225b3a2d8dd89d627 Mon Sep 17 00:00:00 2001 From: Yohann D'ANELLO Date: Tue, 29 Sep 2020 23:43:19 +0200 Subject: [PATCH 4/7] Use only one instance of FFPMEG per SRT stream instead of one per additional cast --- stream/multicast/muticast.go | 103 +++++++++++++++++------------------ 1 file changed, 51 insertions(+), 52 deletions(-) diff --git a/stream/multicast/muticast.go b/stream/multicast/muticast.go index e948e32..abac889 100644 --- a/stream/multicast/muticast.go +++ b/stream/multicast/muticast.go @@ -13,8 +13,8 @@ type Options struct { var ( options Options - ffmpegInstances = make(map[string][]*exec.Cmd) - ffmpegInputStreams = make(map[string][]*io.WriteCloser) + ffmpegInstances = make(map[string]*exec.Cmd) + ffmpegInputStreams = make(map[string]*io.WriteCloser) ) // New Load configuration @@ -25,71 +25,70 @@ func New(cfg *Options) error { // RegisterStream Declare a new open stream and create ffmpeg instances func RegisterStream(streamKey string) { - ffmpegInstances[streamKey] = []*exec.Cmd{} - ffmpegInputStreams[streamKey] = []*io.WriteCloser{} + if len(options.Outputs[streamKey]) == 0 { + return + } - // TODO Export the list of multicasts + params := []string{"-re", "-i", "pipe:0"} for _, stream := range options.Outputs[streamKey] { - // Launch FFMPEG instance // TODO Set optimal parameters - ffmpeg := exec.Command("ffmpeg", "-re", "-i", "pipe:0", "-f", "flv", "-c:v", "libx264", "-preset", + params = append(params, "-f", "flv", "-c:v", "libx264", "-preset", "veryfast", "-maxrate", "3000k", "-bufsize", "6000k", "-pix_fmt", "yuv420p", "-g", "50", "-c:a", "aac", "-b:a", "160k", "-ac", "2", "-ar", "44100", stream) - - // Open pipes - input, err := ffmpeg.StdinPipe() - if err != nil { - panic(err) - } - output, err := ffmpeg.StdoutPipe() - if err != nil { - panic(err) - } - errOutput, err := ffmpeg.StderrPipe() - if err != nil { - panic(err) - } - ffmpegInstances[streamKey] = append(ffmpegInstances[streamKey], ffmpeg) - ffmpegInputStreams[streamKey] = append(ffmpegInputStreams[streamKey], &input) - - if err := ffmpeg.Start(); err != nil { - panic(err) - } - - // Log ffmpeg output - go func() { - scanner := bufio.NewScanner(output) - for scanner.Scan() { - log.Println("[FFMPEG " + streamKey + "] " + scanner.Text()) - } - }() - // Log also error output - go func() { - scanner := bufio.NewScanner(errOutput) - for scanner.Scan() { - log.Println("[FFMPEG ERROR " + streamKey + "] " + scanner.Text()) - } - }() } + // Launch FFMPEG instance + ffmpeg := exec.Command("ffmpeg", params...) + + // Open pipes + input, err := ffmpeg.StdinPipe() + if err != nil { + panic(err) + } + output, err := ffmpeg.StdoutPipe() + if err != nil { + panic(err) + } + errOutput, err := ffmpeg.StderrPipe() + if err != nil { + panic(err) + } + ffmpegInstances[streamKey] = ffmpeg + ffmpegInputStreams[streamKey] = &input + + if err := ffmpeg.Start(); err != nil { + panic(err) + } + + // Log ffmpeg output + go func() { + scanner := bufio.NewScanner(output) + for scanner.Scan() { + log.Println("[FFMPEG " + streamKey + "] " + scanner.Text()) + } + }() + // Log also error output + go func() { + scanner := bufio.NewScanner(errOutput) + for scanner.Scan() { + log.Println("[FFMPEG ERROR " + streamKey + "] " + scanner.Text()) + } + }() } // SendPacket When a SRT packet is received, transmit it to all FFMPEG instances related to the stream key func SendPacket(streamKey string, data []byte) { - for _, stdin := range ffmpegInputStreams[streamKey] { - _, err := (*stdin).Write(data) - if err != nil { - log.Println("Error while sending a packet to external streaming server for key "+streamKey, err) - } + stdin := ffmpegInputStreams[streamKey] + _, err := (*stdin).Write(data) + if err != nil { + log.Println("Error while sending a packet to external streaming server for key "+streamKey, err) } - } // CloseConnection When the stream is ended, close FFMPEG instances func CloseConnection(streamKey string) { - for _, ffmpeg := range ffmpegInstances[streamKey] { - if err := ffmpeg.Process.Kill(); err != nil { - panic(err) - } + ffmpeg := ffmpegInstances[streamKey] + if err := ffmpeg.Process.Kill(); err != nil { + panic(err) } delete(ffmpegInstances, streamKey) delete(ffmpegInputStreams, streamKey) From 5efbe309ac11af7486b58569a40f1054f19e1edd Mon Sep 17 00:00:00 2001 From: Yohann D'ANELLO Date: Wed, 30 Sep 2020 00:36:58 +0200 Subject: [PATCH 5/7] Optimize FFMPEG parameters to reduce CPU charge --- stream/multicast/muticast.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/stream/multicast/muticast.go b/stream/multicast/muticast.go index abac889..1502d70 100644 --- a/stream/multicast/muticast.go +++ b/stream/multicast/muticast.go @@ -31,10 +31,8 @@ func RegisterStream(streamKey string) { params := []string{"-re", "-i", "pipe:0"} for _, stream := range options.Outputs[streamKey] { - // TODO Set optimal parameters - params = append(params, "-f", "flv", "-c:v", "libx264", "-preset", - "veryfast", "-maxrate", "3000k", "-bufsize", "6000k", "-pix_fmt", "yuv420p", "-g", "50", "-c:a", "aac", - "-b:a", "160k", "-ac", "2", "-ar", "44100", stream) + params = append(params, "-f", "flv", "-preset", "ultrafast", "-tune", "zerolatency", + "-c", "copy", stream) } // Launch FFMPEG instance ffmpeg := exec.Command("ffmpeg", params...) From 4024ed5b2a0cf92f47f3b1dc498817fc87e805e4 Mon Sep 17 00:00:00 2001 From: Yohann D'ANELLO Date: Wed, 30 Sep 2020 08:50:54 +0200 Subject: [PATCH 6/7] Add a comment, fix linting --- stream/multicast/muticast.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/stream/multicast/muticast.go b/stream/multicast/muticast.go index 1502d70..5b1d96c 100644 --- a/stream/multicast/muticast.go +++ b/stream/multicast/muticast.go @@ -7,6 +7,8 @@ import ( "os/exec" ) +// Options to configure the multicast: +//for each stream key, we can have several additional stream URL where the main stream is redirected to type Options struct { Outputs map[string][]string } From b647c761f3a40e47db1c54a97a959cdf70bcaa96 Mon Sep 17 00:00:00 2001 From: Yohann D'ANELLO Date: Wed, 30 Sep 2020 13:08:44 +0200 Subject: [PATCH 7/7] Test loading the configuration --- main_test.go | 27 +++++++++++++++++++++++++++ stream/multicast/multicast_test.go | 1 + 2 files changed, 28 insertions(+) create mode 100644 stream/multicast/multicast_test.go diff --git a/main_test.go b/main_test.go index 06ab7d0..7650bd7 100644 --- a/main_test.go +++ b/main_test.go @@ -1 +1,28 @@ package main + +import ( + "github.com/spf13/viper" + "gitlab.crans.org/nounous/ghostream/auth" + "gitlab.crans.org/nounous/ghostream/internal/monitoring" + "gitlab.crans.org/nounous/ghostream/stream/multicast" + "gitlab.crans.org/nounous/ghostream/stream/srt" + "gitlab.crans.org/nounous/ghostream/stream/webrtc" + "gitlab.crans.org/nounous/ghostream/web" + "testing" +) + +// TestLoadConfiguration tests the configuration file loading and the init of some parameters +func TestLoadConfiguration(t *testing.T) { + loadConfiguration() + cfg := struct { + Auth auth.Options + Monitoring monitoring.Options + Multicast multicast.Options + Srt srt.Options + Web web.Options + WebRTC webrtc.Options + }{} + if err := viper.Unmarshal(&cfg); err != nil { + t.Fatal("Failed to load settings", err) + } +} diff --git a/stream/multicast/multicast_test.go b/stream/multicast/multicast_test.go new file mode 100644 index 0000000..aa01d64 --- /dev/null +++ b/stream/multicast/multicast_test.go @@ -0,0 +1 @@ +package multicast