From b9da2ab3a7f421bbb36a0874cdbbf93a053ceaad Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sun, 18 Oct 2020 16:05:28 +0200 Subject: [PATCH] Make webrtc and forwarding work with new messaging --- stream/forwarding/forwarding.go | 47 +++++-- stream/forwarding/forwarding_test.go | 12 +- stream/webrtc/ingest.go | 183 +++++++++++++-------------- stream/webrtc/webrtc.go | 12 +- stream/webrtc/webrtc_test.go | 8 +- transcoder/text/text.go | 2 +- 6 files changed, 144 insertions(+), 120 deletions(-) diff --git a/stream/forwarding/forwarding.go b/stream/forwarding/forwarding.go index fbcd687..d8fcf88 100644 --- a/stream/forwarding/forwarding.go +++ b/stream/forwarding/forwarding.go @@ -2,12 +2,10 @@ package forwarding import ( - "bufio" - "io" "log" - "os/exec" + "time" - "gitlab.crans.org/nounous/ghostream/stream/srt" + "gitlab.crans.org/nounous/ghostream/stream" ) // Options to configure the stream forwarding. @@ -15,21 +13,46 @@ import ( type Options map[string][]string // Serve handles incoming packets from SRT and forward them to other external services -func Serve(inputChannel chan srt.Packet, cfg Options) { +func Serve(streams map[string]*stream.Stream, cfg Options) { if len(cfg) < 1 { // No forwarding, ignore - for { - <-inputChannel // Clear input channel - } + return } log.Printf("Stream forwarding initialized") - ffmpegInstances := make(map[string]*exec.Cmd) + for { + for name, st := range streams { + fwdCfg, ok := cfg[name] + if !ok { + // Not configured + continue + } + + // Start forwarding + log.Printf("Starting forwarding for '%s'", name) + go forward(st, fwdCfg) + } + + // Regulary pull stream list, + // it may be better to tweak the messaging system + // to get an event on a new stream. + time.Sleep(time.Second) + } +} + +func forward(st *stream.Stream, fwdCfg []string) { + // FIXME + /*ffmpegInstances := make(map[string]*exec.Cmd) ffmpegInputStreams := make(map[string]*io.WriteCloser) for { var err error = nil // Wait for packets - packet := <-inputChannel + // FIXME packet := <-inputChannel + packet := srt.Packet{ + Data: []byte{}, + PacketType: "nothing", + StreamName: "demo", + } switch packet.PacketType { case "register": err = registerStream(packet.StreamName, ffmpegInstances, ffmpegInputStreams, cfg) @@ -47,9 +70,10 @@ func Serve(inputChannel chan srt.Packet, cfg Options) { if err != nil { log.Printf("Error occurred while receiving SRT packet of type %s: %s", packet.PacketType, err) } - } + }*/ } +/* // registerStream creates ffmpeg instance associated with newly created stream func registerStream(name string, ffmpegInstances map[string]*exec.Cmd, ffmpegInputStreams map[string]*io.WriteCloser, cfg Options) error { streams, exist := cfg[name] @@ -119,3 +143,4 @@ func close(name string, ffmpegInstances map[string]*exec.Cmd, ffmpegInputStreams delete(ffmpegInputStreams, name) return nil } +*/ diff --git a/stream/forwarding/forwarding_test.go b/stream/forwarding/forwarding_test.go index bc9cb5e..1f6ea6a 100644 --- a/stream/forwarding/forwarding_test.go +++ b/stream/forwarding/forwarding_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/stream/srt" ) @@ -30,16 +31,15 @@ func TestForwardStream(t *testing.T) { } }() - forwardingList := make(map[string][]string) - forwardingList["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"} - - forwardingChannel := make(chan srt.Packet) + cfg := make(map[string][]string) + cfg["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"} // Register forwarding stream list - go Serve(forwardingChannel, forwardingList) + streams := make(map[string]*stream.Stream) + go Serve(streams, cfg) // Serve SRT Server without authentification backend - go srt.Serve(&srt.Options{Enabled: true, ListenAddress: ":9712", MaxClients: 2}, nil, forwardingChannel, nil) + go srt.Serve(streams, nil, &srt.Options{Enabled: true, ListenAddress: ":9712", MaxClients: 2}) ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error", "-re", "-f", "lavfi", "-i", "testsrc=size=640x480:rate=10", diff --git a/stream/webrtc/ingest.go b/stream/webrtc/ingest.go index 66a4b0a..ccede0d 100644 --- a/stream/webrtc/ingest.go +++ b/stream/webrtc/ingest.go @@ -3,61 +3,53 @@ package webrtc import ( "bufio" - "fmt" - "io" "log" "net" "os/exec" + "strings" + "time" "github.com/pion/rtp" "github.com/pion/webrtc/v3" - "gitlab.crans.org/nounous/ghostream/stream/srt" - "gitlab.crans.org/nounous/ghostream/stream/telnet" + "gitlab.crans.org/nounous/ghostream/stream" ) var ( - ffmpeg = make(map[string]*exec.Cmd) - ffmpegInput = make(map[string]io.WriteCloser) + activeStream map[string]struct{} ) -func ingestFrom(inputChannel chan srt.Packet) { - // FIXME Clean code - +func autoIngest(streams map[string]*stream.Stream) { + // Regulary check existing streams + activeStream = make(map[string]struct{}) for { - var err error = nil - srtPacket := <-inputChannel - switch srtPacket.PacketType { - case "register": - go registerStream(&srtPacket) - break - case "sendData": - if _, ok := ffmpegInput[srtPacket.StreamName]; !ok { - break + for name, st := range streams { + if strings.Contains(name, "@") { + // Not a source stream, pass + continue } - // FIXME send to stream srtPacket.StreamName - if _, err := ffmpegInput[srtPacket.StreamName].Write(srtPacket.Data); err != nil { - log.Printf("Failed to write data to ffmpeg input: %s", err) + + if _, ok := activeStream[name]; ok { + // Stream is already ingested + continue } - break - case "close": - log.Printf("WebRTC CloseConnection %s", srtPacket.StreamName) - _ = ffmpeg[srtPacket.StreamName].Process.Kill() - _ = ffmpegInput[srtPacket.StreamName].Close() - delete(ffmpeg, srtPacket.StreamName) - delete(ffmpegInput, srtPacket.StreamName) - break - default: - log.Println("Unknown SRT srtPacket type:", srtPacket.PacketType) - break - } - if err != nil { - log.Printf("Error occurred while receiving SRT srtPacket of type %s: %s", srtPacket.PacketType, err) + + // Start ingestion + log.Printf("Starting webrtc for '%s'", name) + go ingest(name, st) } + + // Regulary pull stream list, + // it may be better to tweak the messaging system + // to get an event on a new stream. + time.Sleep(time.Second) } } -func registerStream(srtPacket *srt.Packet) { - log.Printf("WebRTC RegisterStream %s", srtPacket.StreamName) +func ingest(name string, input *stream.Stream) { + // Register to get stream + videoInput := make(chan []byte, 1024) + input.Register(videoInput) + activeStream[name] = struct{}{} // Open a UDP Listener for RTP Packets on port 5004 videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004}) @@ -70,55 +62,12 @@ func registerStream(srtPacket *srt.Packet) { log.Printf("Faited to open UDP listener %s", err) return } - // FIXME Close UDP listeners at the end of the stream, not the end of the routine - /* defer func() { - if err = videoListener.Close(); err != nil { - log.Printf("Faited to close UDP listener %s", err) - } - if err = audioListener.Close(); err != nil { - log.Printf("Faited to close UDP listener %s", err) - } - }() */ - ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", - "-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality - "-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1", - "-auto-alt-ref", "1", - "-f", "rtp", "rtp://127.0.0.1:5004", - "-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1", - "-f", "rtp", "rtp://127.0.0.1:5005"} - - // Export stream to ascii art - if telnet.Cfg.Enabled { - bitrate := fmt.Sprintf("%dk", telnet.Cfg.Width*telnet.Cfg.Height/telnet.Cfg.Delay) - ffmpegArgs = append(ffmpegArgs, - "-an", "-vf", fmt.Sprintf("scale=%dx%d", telnet.Cfg.Width, telnet.Cfg.Height), - "-b:v", bitrate, "-minrate", bitrate, "-maxrate", bitrate, "-bufsize", bitrate, "-q", "42", "-pix_fmt", "gray", "-f", "rawvideo", "pipe:1") - } - - ffmpeg[srtPacket.StreamName] = exec.Command("ffmpeg", ffmpegArgs...) - - input, err := ffmpeg[srtPacket.StreamName].StdinPipe() + // Start ffmpag to convert videoInput to video and audio UDP + ffmpeg, err := startFFmpeg(videoInput) if err != nil { - panic(err) - } - ffmpegInput[srtPacket.StreamName] = input - errOutput, err := ffmpeg[srtPacket.StreamName].StderrPipe() - if err != nil { - panic(err) - } - - // Receive raw video output and convert it to ASCII art, then forward it TCP - if telnet.Cfg.Enabled { - output, err := ffmpeg[srtPacket.StreamName].StdoutPipe() - if err != nil { - panic(err) - } - go telnet.StartASCIIArtStream(srtPacket.StreamName, output) - } - - if err := ffmpeg[srtPacket.StreamName].Start(); err != nil { - panic(err) + log.Printf("Error while starting ffmpeg: %s", err) + return } // Receive video @@ -128,7 +77,7 @@ func registerStream(srtPacket *srt.Packet) { n, _, err := videoListener.ReadFromUDP(inboundRTPPacket) if err != nil { log.Printf("Failed to read from UDP: %s", err) - continue + break } packet := &rtp.Packet{} if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { @@ -136,13 +85,13 @@ func registerStream(srtPacket *srt.Packet) { continue } - if videoTracks[srtPacket.StreamName] == nil { - videoTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0) + if videoTracks[name] == nil { + videoTracks[name] = make([]*webrtc.Track, 0) } // Write RTP srtPacket to all video tracks // Adapt payload and SSRC to match destination - for _, videoTrack := range videoTracks[srtPacket.StreamName] { + for _, videoTrack := range videoTracks[name] { packet.Header.PayloadType = videoTrack.PayloadType() packet.Header.SSRC = videoTrack.SSRC() if writeErr := videoTrack.WriteRTP(packet); writeErr != nil { @@ -160,7 +109,7 @@ func registerStream(srtPacket *srt.Packet) { n, _, err := audioListener.ReadFromUDP(inboundRTPPacket) if err != nil { log.Printf("Failed to read from UDP: %s", err) - continue + break } packet := &rtp.Packet{} if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { @@ -168,13 +117,13 @@ func registerStream(srtPacket *srt.Packet) { continue } - if audioTracks[srtPacket.StreamName] == nil { - audioTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0) + if audioTracks[name] == nil { + audioTracks[name] = make([]*webrtc.Track, 0) } // Write RTP srtPacket to all audio tracks // Adapt payload and SSRC to match destination - for _, audioTrack := range audioTracks[srtPacket.StreamName] { + for _, audioTrack := range audioTracks[name] { packet.Header.PayloadType = audioTrack.PayloadType() packet.Header.SSRC = audioTrack.SSRC() if writeErr := audioTrack.WriteRTP(packet); writeErr != nil { @@ -185,10 +134,60 @@ func registerStream(srtPacket *srt.Packet) { } }() + // Wait for stopped ffmpeg + if err = ffmpeg.Wait(); err != nil { + log.Printf("Faited to wait for ffmpeg: %s", err) + } + + // Close UDP listeners + if err = videoListener.Close(); err != nil { + log.Printf("Faited to close UDP listener: %s", err) + } + if err = audioListener.Close(); err != nil { + log.Printf("Faited to close UDP listener: %s", err) + } + delete(activeStream, name) +} + +func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) { + ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", + "-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality + "-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1", + "-auto-alt-ref", "1", + "-f", "rtp", "rtp://127.0.0.1:5004", + "-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1", + "-f", "rtp", "rtp://127.0.0.1:5005"} + ffmpeg = exec.Command("ffmpeg", ffmpegArgs...) + + // Handle errors output + errOutput, err := ffmpeg.StderrPipe() + if err != nil { + return nil, err + } go func() { scanner := bufio.NewScanner(errOutput) for scanner.Scan() { log.Printf("[WEBRTC FFMPEG %s] %s", "demo", scanner.Text()) } }() + + // Handle stream input + input, err := ffmpeg.StdinPipe() + if err != nil { + return nil, err + } + go func() { + for data := range in { + if _, err := input.Write(data); err != nil { + log.Printf("Failed to write data to ffmpeg input: %s", err) + } + } + + // End of stream + ffmpeg.Process.Kill() + }() + + // Start process + err = ffmpeg.Start() + return ffmpeg, err } diff --git a/stream/webrtc/webrtc.go b/stream/webrtc/webrtc.go index 365ebc3..37390dd 100644 --- a/stream/webrtc/webrtc.go +++ b/stream/webrtc/webrtc.go @@ -8,7 +8,7 @@ import ( "github.com/pion/webrtc/v3" "gitlab.crans.org/nounous/ghostream/internal/monitoring" - "gitlab.crans.org/nounous/ghostream/stream/srt" + "gitlab.crans.org/nounous/ghostream/stream" ) // Options holds web package configuration @@ -182,12 +182,12 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa } // Serve WebRTC media streaming server -func Serve(remoteSdpChan chan struct { +func Serve(streams map[string]*stream.Stream, remoteSdpChan chan struct { StreamID string RemoteDescription webrtc.SessionDescription -}, localSdpChan chan webrtc.SessionDescription, inputChannel chan srt.Packet, cfg *Options) { +}, localSdpChan chan webrtc.SessionDescription, cfg *Options) { if !cfg.Enabled { - // SRT is not enabled, ignore + // WebRTC is not enabled, ignore return } @@ -197,8 +197,8 @@ func Serve(remoteSdpChan chan struct { videoTracks = make(map[string][]*webrtc.Track) audioTracks = make(map[string][]*webrtc.Track) - // Ingest data from SRT - go ingestFrom(inputChannel) + // Ingest data + go autoIngest(streams) // Handle new connections for { diff --git a/stream/webrtc/webrtc_test.go b/stream/webrtc/webrtc_test.go index 19f4b34..ee34ca5 100644 --- a/stream/webrtc/webrtc_test.go +++ b/stream/webrtc/webrtc_test.go @@ -5,24 +5,24 @@ import ( "testing" "github.com/pion/webrtc/v3" - "gitlab.crans.org/nounous/ghostream/stream/srt" + "gitlab.crans.org/nounous/ghostream/stream" ) func TestServe(t *testing.T) { - // Serve WebRTC server + // Init streams messaging and WebRTC server + streams := make(map[string]*stream.Stream) remoteSdpChan := make(chan struct { StreamID string RemoteDescription webrtc.SessionDescription }) localSdpChan := make(chan webrtc.SessionDescription) - webrtcChannel := make(chan srt.Packet, 64) cfg := Options{ Enabled: true, MinPortUDP: 10000, MaxPortUDP: 10005, STUNServers: []string{"stun:stun.l.google.com:19302"}, } - go Serve(remoteSdpChan, localSdpChan, webrtcChannel, &cfg) + go Serve(streams, remoteSdpChan, localSdpChan, &cfg) // New client connection mediaEngine := webrtc.MediaEngine{} diff --git a/transcoder/text/text.go b/transcoder/text/text.go index 44114f4..02a0550 100644 --- a/transcoder/text/text.go +++ b/transcoder/text/text.go @@ -153,5 +153,5 @@ func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, err // Start process err = ffmpeg.Start() - return ffmpeg, &output, nil + return ffmpeg, &output, err }