1
0
mirror of https://gitlab.crans.org/nounous/ghostream.git synced 2025-07-04 12:52:13 +02:00

1 Commits

Author SHA1 Message Date
86dac0f929 WebRTC offers multiple quality 2020-10-29 00:10:25 +01:00
11 changed files with 132 additions and 176 deletions

2
go.mod
View File

@ -5,7 +5,7 @@ go 1.13
require (
github.com/go-ldap/ldap/v3 v3.2.3
github.com/gorilla/websocket v1.4.0
github.com/haivision/srtgo v0.0.0-20201025191851-67964e8f497a
github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a
github.com/markbates/pkger v0.17.1
github.com/pion/rtp v1.6.0
github.com/pion/webrtc/v3 v3.0.0-beta.5

4
go.sum
View File

@ -122,8 +122,6 @@ github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpg
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a h1:JliMkv/mAqM5+QzG6Hkw1XcVl1crU8yIQGnhppMv7s0=
github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a/go.mod h1:yVZ4oACfcnUAcxrh+0b6IuIWfkHLK3IAQ99tuuhRx54=
github.com/haivision/srtgo v0.0.0-20201025191851-67964e8f497a h1:54abJQezjMoiP+xMQ3ZQbcDXFjqytAYm/n0EVqrYeXg=
github.com/haivision/srtgo v0.0.0-20201025191851-67964e8f497a/go.mod h1:7izzTiCO3zc9ZIVTFMjxUiYL+kgryFP9rl3bsweqdmc=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
@ -418,8 +416,6 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200610111108-226ff32320da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c h1:38q6VNPWR010vN82/SB121GujZNIfAUb4YttE2rhGuc=
golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View File

@ -10,6 +10,12 @@ import (
// Quality holds a specific stream quality.
// It makes packages able to subscribe to an incoming stream.
type Quality struct {
// Type of the quality
Name string
// Source Stream
Stream *Stream
// Incoming data come from this channel
Broadcast chan<- []byte
@ -27,8 +33,9 @@ type Quality struct {
WebRtcRemoteSdp chan webrtc.SessionDescription
}
func newQuality() (q *Quality) {
q = &Quality{}
func newQuality(name string, stream *Stream) (q *Quality) {
q = &Quality{Name: name}
q.Stream = stream
broadcast := make(chan []byte, 1024)
q.Broadcast = broadcast
q.outputs = make(map[chan []byte]struct{})

View File

@ -40,7 +40,7 @@ func (s *Stream) CreateQuality(name string) (quality *Quality, err error) {
}
s.lockQualities.Lock()
quality = newQuality()
quality = newQuality(name, s)
s.qualities[name] = quality
s.lockQualities.Unlock()
return quality, nil

View File

@ -24,6 +24,17 @@ func handleStreamer(socket *srtgo.SrtSocket, streams *messaging.Streams, name st
socket.Close()
return
}
// Create sub-qualities
for _, qualityName := range []string{"audio", "480p", "360p", "240p"} {
_, err := stream.CreateQuality(qualityName)
if err != nil {
log.Printf("Error on quality creating: %s", err)
socket.Close()
return
}
}
log.Printf("New SRT streamer for stream '%s' quality 'source'", name)
// Read RTP packets forever and send them to the WebRTC Client

View File

@ -1,6 +1,9 @@
// Package srt serves a SRT server
package srt
// #include <srt/srt.h>
import "C"
import (
"log"
"net"
@ -59,7 +62,7 @@ func Serve(streams *messaging.Streams, authBackend auth.Backend, cfg *Options) {
for {
// Wait for new connection
s, _, err := sck.Accept()
s, err := sck.Accept()
if err != nil {
// Something wrong happened
log.Println(err)
@ -70,7 +73,7 @@ func Serve(streams *messaging.Streams, authBackend auth.Backend, cfg *Options) {
// Without this, the SRT buffer might get full before reading it
// streamid can be "name:password" for streamer or "name" for viewer
streamID, err := s.GetSockOptString(srtgo.SRTO_STREAMID)
streamID, err := s.GetSockOptString(C.SRTO_STREAMID)
if err != nil {
log.Print("Failed to get socket streamid")
continue

View File

@ -3,9 +3,7 @@ package webrtc
import (
"bufio"
"fmt"
"log"
"math/rand"
"net"
"os/exec"
@ -16,36 +14,61 @@ import (
func ingest(name string, q *messaging.Quality) {
// Register to get stream
videoInput := make(chan []byte, 1024)
q.Register(videoInput)
input := make(chan []byte, 1024)
// FIXME Stream data should already be transcoded
source, _ := q.Stream.GetQuality("source")
source.Register(input)
// FIXME Mux into RTP without having multiple UDP listeners
firstPort := int(rand.Int31n(63535)) + 2000
// Open UDP listeners for RTP Packets
audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: firstPort})
if err != nil {
log.Printf("Faited to open UDP listener %s", err)
return
// FIXME Bad code
port := 5000
var tracks map[string][]*webrtc.Track
qualityName := ""
switch q.Name {
case "audio":
port = 5004
tracks = audioTracks
break
case "source":
port = 5005
tracks = videoTracks
qualityName = "@source"
break
case "480p":
port = 5006
tracks = videoTracks
qualityName = "@480p"
break
case "360p":
port = 5007
tracks = videoTracks
qualityName = "@360p"
break
case "240p":
port = 5008
tracks = videoTracks
qualityName = "@240p"
break
}
videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: firstPort + 1})
// Open a UDP Listener for RTP Packets
listener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: port})
if err != nil {
log.Printf("Faited to open UDP listener %s", err)
return
}
// Start ffmpag to convert videoInput to video and audio UDP
ffmpeg, err := startFFmpeg(videoInput, firstPort)
// Start ffmpag to convert input to video and audio UDP
ffmpeg, err := startFFmpeg(q, input)
if err != nil {
log.Printf("Error while starting ffmpeg: %s", err)
return
}
// Receive video
// Receive stream
go func() {
inboundRTPPacket := make([]byte, 1500) // UDP MTU
for {
n, _, err := videoListener.ReadFromUDP(inboundRTPPacket)
n, _, err := listener.ReadFromUDP(inboundRTPPacket)
if err != nil {
log.Printf("Failed to read from UDP: %s", err)
break
@ -56,49 +79,13 @@ func ingest(name string, q *messaging.Quality) {
continue
}
if videoTracks[name] == nil {
videoTracks[name] = make([]*webrtc.Track, 0)
}
// Write RTP srtPacket to all video tracks
// Write RTP srtPacket to all tracks
// Adapt payload and SSRC to match destination
for _, videoTrack := range videoTracks[name] {
packet.Header.PayloadType = videoTrack.PayloadType()
packet.Header.SSRC = videoTrack.SSRC()
if writeErr := videoTrack.WriteRTP(packet); writeErr != nil {
log.Printf("Failed to write to video track: %s", err)
continue
}
}
}
}()
// Receive audio
go func() {
inboundRTPPacket := make([]byte, 1500) // UDP MTU
for {
n, _, err := audioListener.ReadFromUDP(inboundRTPPacket)
if err != nil {
log.Printf("Failed to read from UDP: %s", err)
break
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
log.Printf("Failed to unmarshal RTP srtPacket: %s", err)
continue
}
if audioTracks[name] == nil {
audioTracks[name] = make([]*webrtc.Track, 0)
}
// Write RTP srtPacket to all audio tracks
// Adapt payload and SSRC to match destination
for _, audioTrack := range audioTracks[name] {
packet.Header.PayloadType = audioTrack.PayloadType()
packet.Header.SSRC = audioTrack.SSRC()
if writeErr := audioTrack.WriteRTP(packet); writeErr != nil {
log.Printf("Failed to write to audio track: %s", err)
for _, track := range tracks[name+qualityName] {
packet.Header.PayloadType = track.PayloadType()
packet.Header.SSRC = track.SSRC()
if writeErr := track.WriteRTP(packet); writeErr != nil {
log.Printf("Failed to write to track: %s", writeErr)
continue
}
}
@ -110,24 +97,47 @@ func ingest(name string, q *messaging.Quality) {
log.Printf("Faited to wait for ffmpeg: %s", err)
}
// Close UDP listeners
if err = videoListener.Close(); err != nil {
// Close UDP listener
if err = listener.Close(); err != nil {
log.Printf("Faited to close UDP listener: %s", err)
}
if err = audioListener.Close(); err != nil {
log.Printf("Faited to close UDP listener: %s", err)
}
q.Unregister(videoInput)
q.Unregister(input)
}
func startFFmpeg(in <-chan []byte, listeningPort int) (ffmpeg *exec.Cmd, err error) {
ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
// Audio
"-vn", "-c:a", "libopus", "-b:a", "160k",
"-f", "rtp", fmt.Sprintf("rtp://127.0.0.1:%d", listeningPort),
// Source
"-an", "-c:v", "copy", "-b:v", "3000k", "-maxrate", "5000k", "-bufsize", "5000k",
"-f", "rtp", fmt.Sprintf("rtp://127.0.0.1:%d", listeningPort+1)}
func startFFmpeg(q *messaging.Quality, in <-chan []byte) (ffmpeg *exec.Cmd, err error) {
// FIXME Use transcoders to downscale, then remux in RTP
ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0"}
switch q.Name {
case "audio":
ffmpegArgs = append(ffmpegArgs, "-vn", "-c:a", "libopus", "-b:a", "160k",
"-f", "rtp", "rtp://127.0.0.1:5004")
break
case "source":
ffmpegArgs = append(ffmpegArgs, "-an", "-c:v", "copy",
"-f", "rtp", "rtp://127.0.0.1:5005")
break
case "480p":
ffmpegArgs = append(ffmpegArgs,
"-an", "-c:v", "libx264", "-b:v", "1200k", "-maxrate", "2000k", "-bufsize", "3000k",
"-preset", "ultrafast", "-profile", "main", "-tune", "zerolatency",
"-vf", "scale=854:480",
"-f", "rtp", "rtp://127.0.0.1:5006")
break
case "360p":
ffmpegArgs = append(ffmpegArgs,
"-an", "-c:v", "libx264", "-b:v", "800k", "-maxrate", "1200k", "-bufsize", "1500k",
"-preset", "ultrafast", "-profile", "main", "-tune", "zerolatency",
"-vf", "scale=480:360",
"-f", "rtp", "rtp://127.0.0.1:5007")
break
case "240p":
ffmpegArgs = append(ffmpegArgs,
"-an", "-c:v", "libx264", "-b:v", "500k", "-maxrate", "800k", "-bufsize", "1000k",
"-preset", "ultrafast", "-profile", "main", "-tune", "zerolatency",
"-vf", "scale=360:240",
"-f", "rtp", "rtp://127.0.0.1:5008")
break
}
ffmpeg = exec.Command("ffmpeg", ffmpegArgs...)
// Handle errors output

View File

@ -40,7 +40,7 @@ func removeTrack(tracks []*webrtc.Track, track *webrtc.Track) []*webrtc.Track {
// GetNumberConnectedSessions get the number of currently connected clients
func GetNumberConnectedSessions(streamID string) int {
return len(videoTracks[streamID])
return len(audioTracks[streamID])
}
// newPeerHandler is called when server receive a new session description
@ -117,21 +117,20 @@ func newPeerHandler(name string, localSdpChan chan webrtc.SessionDescription, re
quality = split[1]
}
log.Printf("New WebRTC session for stream %s, quality %s", streamID, quality)
// TODO Consider the quality
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
log.Printf("Connection State has changed %s \n", connectionState.String())
if videoTracks[streamID] == nil {
videoTracks[streamID] = make([]*webrtc.Track, 0, 1)
if videoTracks[streamID+"@"+quality] == nil {
videoTracks[streamID+"@"+quality] = make([]*webrtc.Track, 0, 1)
}
if audioTracks[streamID] == nil {
audioTracks[streamID] = make([]*webrtc.Track, 0, 1)
}
if connectionState == webrtc.ICEConnectionStateConnected {
// Register tracks
videoTracks[streamID] = append(videoTracks[streamID], videoTrack)
videoTracks[streamID+"@"+quality] = append(videoTracks[streamID+"@"+quality], videoTrack)
audioTracks[streamID] = append(audioTracks[streamID], audioTrack)
monitoring.WebRTCConnectedSessions.Inc()
} else if connectionState == webrtc.ICEConnectionStateDisconnected {
@ -205,16 +204,17 @@ func Serve(streams *messaging.Streams, cfg *Options) {
// Get specific quality
// FIXME: make it possible to forward other qualities
qualityName := "source"
quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
for _, qualityName := range []string{"source", "audio", "480p", "360p", "240p"} {
quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Start forwarding
log.Printf("Starting webrtc for '%s' quality '%s'", name, qualityName)
go ingest(name, quality)
go listenSdp(name, quality.WebRtcLocalSdp, quality.WebRtcRemoteSdp, cfg)
// Start forwarding
log.Printf("Starting webrtc for '%s' quality '%s'", name, qualityName)
go ingest(name, quality)
go listenSdp(name, quality.WebRtcLocalSdp, quality.WebRtcRemoteSdp, cfg)
}
}
}

View File

@ -14,7 +14,7 @@ export function initViewerPage(stream, stunServers, viewersCounterRefreshPeriod)
const viewer = document.getElementById("viewer");
// Default quality
let quality = "source";
let quality = "240p";
// Create WebSocket and WebRTC
const websocket = new GsWebSocket();

View File

@ -21,7 +21,11 @@
<ul>
<li>
<b>Serveur :</b>
<code>srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?IDENTIFIANT:MOT_DE_PASS</code>,
<code>srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}</code>,
</li>
<li>
<b>Clé de stream :</b>
<code>IDENTIFIANT:MOT_DE_PASSE</code>
avec <code>IDENTIFIANT</code> et <code>MOT_DE_PASSE</code>
vos identifiants.
</li>
@ -48,81 +52,6 @@
</code>
</p>
<h2>Comment lire un flux depuis un lecteur externe ?</h2>
<p>
À l'heure actuelle, la plupart des lecteurs vidéos ne supportent
pas le protocole SRT, ou le supportent mal. Un travail est en
cours pour les rendre un maximum compatibles. Liste non exhaustive
des lecteurs vidéos testés :
</p>
<h3>FFPlay</h3>
<p>
Si FFMpeg est installé sur votre machine, il est accompagné d'un
lecteur vidéo nommé <code>ffplay</code>. Si FFMpeg est compilé
avec le support de SRT (c'est le cas sur la plupart des distributions,
sauf cas ci-dessous), il vous suffira d'exécuter :
</p>
<p>
<code>
ffplay -fflags nobuffer srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT
</code>
</p>
<h3>MPV</h3>
<p>
MPV supporte officiellement SRT depuis le 16 octobre 2020.
Néanmoins, la version stable de MPV est beaucoup plus vieille.
Vous devez alors utiliser une version de développement pour
pouvoir lire un flux SRT depuis MPV. L'installation se fait
depuis <a href="https://mpv.io/installation/"> cette page</a>.
Sous Arch Linux, il vous suffit de récupérer le paquet
<code>mpv-git</code> dans l'AUR. Pour lire le flux, il suffit
d'exécuter :
</p>
<p>
<code>
mpv srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT
</code>
</p>
<h3>VLC Media Player</h3>
<p>
Bien que VLC supporte officiellement le protocole SRT,
toutes les options ne sont pas encore implémentées,
notamment l'option pour choisir son stream.
<a href="https://patches.videolan.org/patch/30299/">Un patch</a>
a été soumis et est en attente d'acceptation.
Une fois le patch accepté, il sera appliqué dans les versions
de développement de VLC. Sous Arch Linux, il suffit de récupérer
le paquet <code>vlc-git</code> de l'AUR. Avec un VLC à jour,
il suffit d'exécuter :
</p>
<p>
<code>
vlc srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT
</code>
</p>
<p>
Ou bien d'aller dans Média -> Ouvrir un flux réseau et d'entrer l'URL
<code>srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT</code>.
</p>
<h3>Le protocole n'existe pas ou n'est pas supporté.</h3>
<p>
La technologie SRT est très récente et n'est pas supportée par
les dépôts stables. Ainsi, si vous avez Ubuntu &le; 20.04 ou
Debian &le; Buster, vous ne pourrez pas utiliser de lecteur vidéo
ni même diffuser avec votre machine. Vous devrez vous mettre à
jour vers Ubuntu 20.10 ou Debian Bullseye.
</p>
<h2>Mentions légales</h2>
<p>
Le service de diffusion vidéo du Crans est un service d'hébergement

View File

@ -8,9 +8,9 @@
<div class="controls">
<span class="control-quality">
<select id="quality">
<option value="source">Source</option>
<option value="720p">720p</option>
<option value="240p">Source</option>
<option value="480p">480p</option>
<option value="360p">360p</option>
<option value="240p">240p</option>
</select>
</span>