diff --git a/stream/webrtc/webrtc.go b/stream/webrtc/webrtc.go index 9893602..ff09fac 100644 --- a/stream/webrtc/webrtc.go +++ b/stream/webrtc/webrtc.go @@ -1,11 +1,15 @@ package webrtc import ( + "bufio" "fmt" + "github.com/pion/rtp" "io" "log" "math/rand" + "net" "os" + "os/exec" "time" "github.com/pion/webrtc/v3" @@ -30,7 +34,7 @@ type SessionDescription = webrtc.SessionDescription const ( audioFileName = "output.ogg" - videoFileName = "output.ivf" + videoFileName = "toto.ivf" ) var ( @@ -244,16 +248,115 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa } func waitForPackets(inputChannel chan srt.Packet) { + // FIXME Clean code + var ffmpeg *exec.Cmd + var ffmpegInput io.WriteCloser for { var err error = nil packet := <-inputChannel switch packet.PacketType { case "register": log.Printf("WebRTC RegisterStream %s", packet.StreamName) + + // Copied from https://github.com/pion/webrtc/blob/master/examples/rtp-to-webrtc/main.go + + // Open a UDP Listener for RTP Packets on port 5004 + videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004}) + if err != nil { + panic(err) + } + audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5005}) + if err != nil { + panic(err) + } + defer func() { + if err = videoListener.Close(); err != nil { + panic(err) + } + if err = audioListener.Close(); err != nil { + panic(err) + } + }() + + ffmpeg = exec.Command("ffmpeg", "-re", "-i", "pipe:0", + "-an", "-vcodec", "libvpx", //"-cpu-used", "5", "-deadline", "1", "-g", "10", "-error-resilient", "1", "-auto-alt-ref", "1", + "-f", "rtp", "rtp://127.0.0.1:5004", + "-vn", "-acodec", "libopus", //"-cpu-used", "5", "-deadline", "1", "-g", "10", "-error-resilient", "1", "-auto-alt-ref", "1", + "-f", "rtp", "rtp://127.0.0.1:5005") + + fmt.Println("Waiting for RTP Packets, please run GStreamer or ffmpeg now") + + input, err := ffmpeg.StdinPipe() + if err != nil { + panic(err) + } + ffmpegInput = input + errOutput, err := ffmpeg.StderrPipe() + if err != nil { + panic(err) + } + + if err := ffmpeg.Start(); err != nil { + panic(err) + } + + // Receive video + go func() { + for { + // Listen for a single RTP Packet, we need this to determine the SSRC + inboundRTPPacket := make([]byte, 1500) // UDP MTU + n, _, err := videoListener.ReadFromUDP(inboundRTPPacket) + if err != nil { + panic(err) + } + packet := &rtp.Packet{} + if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { + panic(err) + } + log.Printf("[Video] %s", packet) + for _, videoTrack := range videoTracks { + if writeErr := videoTrack.WriteRTP(packet); writeErr != nil { + panic(err) + } + } + } + }() + + // Receive audio + go func() { + for { + // Listen for a single RTP Packet, we need this to determine the SSRC + inboundRTPPacket := make([]byte, 1500) // UDP MTU + n, _, err := audioListener.ReadFromUDP(inboundRTPPacket) + if err != nil { + panic(err) + } + packet := &rtp.Packet{} + if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { + panic(err) + } + log.Printf("[Audio] %s", packet) + for _, audioTrack := range audioTracks { + if writeErr := audioTrack.WriteRTP(packet); writeErr != nil { + panic(err) + } + } + } + }() + + go func() { + scanner := bufio.NewScanner(errOutput) + for scanner.Scan() { + log.Printf("[WEBRTC FFMPEG %s] %s", "demo", scanner.Text()) + } + }() break case "sendData": - log.Printf("WebRTC SendPacket %s", packet.StreamName) - // packet.Data + // log.Printf("WebRTC SendPacket %s", packet.StreamName) + _, err := ffmpegInput.Write(packet.Data) + if err != nil { + panic(err) + } break case "close": log.Printf("WebRTC CloseConnection %s", packet.StreamName) @@ -274,8 +377,8 @@ func Serve(remoteSdpChan, localSdpChan chan webrtc.SessionDescription, inputChan // FIXME: use data from inputChannel go waitForPackets(inputChannel) - go playVideo() - go playAudio() + // go playVideo() + // go playAudio() // Handle new connections for {