1
0
mirror of https://gitlab.crans.org/nounous/ghostream.git synced 2025-07-01 11:11:13 +02:00

35 Commits

Author SHA1 Message Date
f394af9257 Update main page 2020-10-29 13:43:36 +01:00
5b40aa886f Use random UDP ports to able to have multiple concurrent streams 2020-10-29 13:11:30 +01:00
9fc3d37e72 Update srtgo to don't depend anymore on C library 2020-10-29 13:04:13 +01:00
9e7e1ec0b8 Stream with the H264 codec to have no CPU usage 2020-10-27 19:32:23 +01:00
cdb56c8bf5 Fix sendLocalDescription retry in web client 2020-10-23 08:11:55 +02:00
ff2ebd76f1 Make viewer able to change quality 2020-10-22 18:41:14 +02:00
4cbb1d8192 Better javascript messages 2020-10-22 18:21:42 +02:00
24478bdc7a Retry message sending when websocket not ready 2020-10-22 13:42:45 +02:00
0f4c57bcde Cache go modules in CI 2020-10-22 10:38:47 +02:00
c0820db244 Merge branch 'websocket' into 'dev'
Websocket

See merge request nounous/ghostream!7
2020-10-22 08:26:41 +02:00
a2a74761bb Parse JSON from server SDP 2020-10-22 08:23:35 +02:00
ba8bf426e0 Fix JSON decoding 2020-10-22 08:19:01 +02:00
90d7bd4760 Add package comment in websocket_handler.go 2020-10-21 22:43:28 +02:00
2928e8ae77 Rename main.js to viewer.js 2020-10-21 22:43:11 +02:00
e461c0b526 Fix some undefined this in js classes 2020-10-21 22:38:36 +02:00
9d162b13ed WebRTC JS module 2020-10-21 22:10:39 +02:00
91c4e9d14d Forwarding should not have the re option 2020-10-21 13:34:39 +02:00
5ea8a0913b Add ability to format output URL with the start time of the stream, if the stream is recorded. 2020-10-21 11:47:06 +02:00
0b3fb87fa2 Working javascript modules 2020-10-20 21:59:07 +02:00
c88f473ec0 Remove old JS 2020-10-20 21:45:26 +02:00
11231ceb84 viewerCounter and websocket JS modules 2020-10-20 21:29:41 +02:00
01efba3e3f Handle websocket 2020-10-20 19:12:15 +02:00
ac2f87e936 Add HTML viewport 2020-10-20 09:24:56 +02:00
cd63c93dce Fix web player overflow 2020-10-19 21:52:53 +02:00
4727b2bf64 Fix typo in outputted 2020-10-19 21:49:37 +02:00
e1f83a32df Put webrtc SDP inside Quality struct 2020-10-19 21:45:23 +02:00
e848d92a1a Fix viewer count 2020-10-19 20:05:20 +02:00
d263f743f7 Update package web with Quality structure 2020-10-19 19:57:04 +02:00
d03d4fed40 Update package text with Quality structure 2020-10-19 19:52:24 +02:00
34200afaed Update package webrtc with Quality structure 2020-10-19 19:48:44 +02:00
340d0447a8 Update package telnet with Quality structure 2020-10-19 19:44:30 +02:00
069b2155be Update package srt with Quality structure 2020-10-19 19:40:36 +02:00
c317d91b8d Update package forwarding with Quality structure 2020-10-19 19:28:30 +02:00
bb589a71ce Add method to get quality 2020-10-19 19:28:04 +02:00
f825d3d513 New Streams and Quality structures 2020-10-19 19:14:46 +02:00
40 changed files with 1024 additions and 584 deletions

6
.gitignore vendored
View File

@ -17,3 +17,9 @@ pkged.go
# Profiler and test files # Profiler and test files
*.prof *.prof
*.test *.test
# Javascript tools
.eslintrc.js
node_modules
package.json
package-lock.json

View File

@ -2,8 +2,18 @@ stages:
- test - test
- quality-assurance - quality-assurance
.go-cache:
variables:
GOPATH: $CI_PROJECT_DIR/.go
before_script:
- mkdir -p .go
cache:
paths:
- .go/pkg/mod/
unit_tests: unit_tests:
image: golang:1.15-alpine image: golang:1.15-alpine
extends: .go-cache
stage: test stage: test
before_script: before_script:
- apk add --no-cache -X http://dl-cdn.alpinelinux.org/alpine/edge/community build-base ffmpeg gcc libsrt-dev - apk add --no-cache -X http://dl-cdn.alpinelinux.org/alpine/edge/community build-base ffmpeg gcc libsrt-dev
@ -18,6 +28,7 @@ unit_tests:
linters: linters:
image: golang:1.15-alpine image: golang:1.15-alpine
extends: .go-cache
stage: quality-assurance stage: quality-assurance
script: script:
- go get -u golang.org/x/lint/golint - go get -u golang.org/x/lint/golint

View File

@ -38,13 +38,17 @@ auth:
## Stream forwarding ## ## Stream forwarding ##
# Forward an incoming stream to other servers # Forward an incoming stream to other servers
# The URL can be anything FFMpeg can accept as an stream output # The URL can be anything FFMpeg can accept as an stream output
# If a file is specified, the name may contains %Y, %m, %d, %H, %M or %S
# that will be replaced by the current date information.
forwarding: forwarding:
# By default nothing is forwarded. # By default nothing is forwarded.
# #
# This example forwards a stream named "demo" to Twitch and YouTube, # This example forwards a stream named "demo" to Twitch and YouTube,
# and save the record in a timestamped-file,
#demo: #demo:
# - rtmp://live-cdg.twitch.tv/app/STREAM_KEY # - rtmp://live-cdg.twitch.tv/app/STREAM_KEY
# - rtmp://a.rtmp.youtube.com/live2/STREAM_KEY # - rtmp://a.rtmp.youtube.com/live2/STREAM_KEY
# - /home/ghostream/lives/%name/live-%Y-%m-%d-%H-%M-%S.flv
## Prometheus monitoring ## ## Prometheus monitoring ##
# Expose a monitoring endpoint for Prometheus # Expose a monitoring endpoint for Prometheus

3
go.mod
View File

@ -4,7 +4,8 @@ go 1.13
require ( require (
github.com/go-ldap/ldap/v3 v3.2.3 github.com/go-ldap/ldap/v3 v3.2.3
github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a github.com/gorilla/websocket v1.4.0
github.com/haivision/srtgo v0.0.0-20201025191851-67964e8f497a
github.com/markbates/pkger v0.17.1 github.com/markbates/pkger v0.17.1
github.com/pion/rtp v1.6.0 github.com/pion/rtp v1.6.0
github.com/pion/webrtc/v3 v3.0.0-beta.5 github.com/pion/webrtc/v3 v3.0.0-beta.5

5
go.sum
View File

@ -113,6 +113,7 @@ github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk
github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
@ -121,6 +122,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpg
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a h1:JliMkv/mAqM5+QzG6Hkw1XcVl1crU8yIQGnhppMv7s0= github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a h1:JliMkv/mAqM5+QzG6Hkw1XcVl1crU8yIQGnhppMv7s0=
github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a/go.mod h1:yVZ4oACfcnUAcxrh+0b6IuIWfkHLK3IAQ99tuuhRx54= github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a/go.mod h1:yVZ4oACfcnUAcxrh+0b6IuIWfkHLK3IAQ99tuuhRx54=
github.com/haivision/srtgo v0.0.0-20201025191851-67964e8f497a h1:54abJQezjMoiP+xMQ3ZQbcDXFjqytAYm/n0EVqrYeXg=
github.com/haivision/srtgo v0.0.0-20201025191851-67964e8f497a/go.mod h1:7izzTiCO3zc9ZIVTFMjxUiYL+kgryFP9rl3bsweqdmc=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
@ -415,6 +418,8 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200610111108-226ff32320da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200610111108-226ff32320da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c h1:38q6VNPWR010vN82/SB121GujZNIfAUb4YttE2rhGuc=
golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

15
main.go
View File

@ -10,7 +10,7 @@ import (
"gitlab.crans.org/nounous/ghostream/auth" "gitlab.crans.org/nounous/ghostream/auth"
"gitlab.crans.org/nounous/ghostream/internal/config" "gitlab.crans.org/nounous/ghostream/internal/config"
"gitlab.crans.org/nounous/ghostream/internal/monitoring" "gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream/forwarding" "gitlab.crans.org/nounous/ghostream/stream/forwarding"
"gitlab.crans.org/nounous/ghostream/stream/srt" "gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream/telnet" "gitlab.crans.org/nounous/ghostream/stream/telnet"
@ -38,15 +38,8 @@ func main() {
defer authBackend.Close() defer authBackend.Close()
} }
// WebRTC session description channels
remoteSdpChan := make(chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
})
localSdpChan := make(chan webrtc.SessionDescription)
// Init streams messaging // Init streams messaging
streams := make(map[string]*stream.Stream) streams := messaging.New()
// Start routines // Start routines
go transcoder.Init(streams, &cfg.Transcoder) go transcoder.Init(streams, &cfg.Transcoder)
@ -54,8 +47,8 @@ func main() {
go monitoring.Serve(&cfg.Monitoring) go monitoring.Serve(&cfg.Monitoring)
go srt.Serve(streams, authBackend, &cfg.Srt) go srt.Serve(streams, authBackend, &cfg.Srt)
go telnet.Serve(streams, &cfg.Telnet) go telnet.Serve(streams, &cfg.Telnet)
go web.Serve(streams, remoteSdpChan, localSdpChan, &cfg.Web) go web.Serve(streams, &cfg.Web)
go webrtc.Serve(streams, remoteSdpChan, localSdpChan, &cfg.WebRTC) go webrtc.Serve(streams, &cfg.WebRTC)
// Wait for routines // Wait for routines
select {} select {}

89
messaging/quality.go Normal file
View File

@ -0,0 +1,89 @@
// Package messaging defines a structure to communication between inputs and outputs
package messaging
import (
"sync"
"github.com/pion/webrtc/v3"
)
// Quality holds a specific stream quality.
// It makes packages able to subscribe to an incoming stream.
type Quality struct {
// Incoming data come from this channel
Broadcast chan<- []byte
// Incoming data will be outputted to all those outputs.
// Use a map to be able to delete an item.
outputs map[chan []byte]struct{}
// Mutex to lock outputs map
lockOutputs sync.Mutex
// WebRTC session descriptor exchange.
// When new client connects, a SDP arrives on WebRtcRemoteSdp,
// then webrtc package answers on WebRtcLocalSdp.
WebRtcLocalSdp chan webrtc.SessionDescription
WebRtcRemoteSdp chan webrtc.SessionDescription
}
func newQuality() (q *Quality) {
q = &Quality{}
broadcast := make(chan []byte, 1024)
q.Broadcast = broadcast
q.outputs = make(map[chan []byte]struct{})
q.WebRtcLocalSdp = make(chan webrtc.SessionDescription, 1)
q.WebRtcRemoteSdp = make(chan webrtc.SessionDescription, 1)
go q.run(broadcast)
return q
}
func (q *Quality) run(broadcast <-chan []byte) {
for msg := range broadcast {
q.lockOutputs.Lock()
for output := range q.outputs {
select {
case output <- msg:
default:
// If full, do a ring buffer
// Check that output is not of size zero
if len(output) > 1 {
<-output
}
}
}
q.lockOutputs.Unlock()
}
// Incoming chan has been closed, close all outputs
q.lockOutputs.Lock()
for ch := range q.outputs {
delete(q.outputs, ch)
close(ch)
}
q.lockOutputs.Unlock()
}
// Close the incoming chan, this will also delete all outputs.
func (q *Quality) Close() {
close(q.Broadcast)
}
// Register a new output on a stream.
func (q *Quality) Register(output chan []byte) {
q.lockOutputs.Lock()
q.outputs[output] = struct{}{}
q.lockOutputs.Unlock()
}
// Unregister removes an output.
func (q *Quality) Unregister(output chan []byte) {
// Make sure we did not already close this output
q.lockOutputs.Lock()
_, ok := q.outputs[output]
if ok {
delete(q.outputs, output)
close(output)
}
defer q.lockOutputs.Unlock()
}

84
messaging/stream.go Normal file
View File

@ -0,0 +1,84 @@
// Package messaging defines a structure to communication between inputs and outputs
package messaging
import (
"errors"
"sync"
)
// Stream makes packages able to subscribe to an incoming stream
type Stream struct {
// Different qualities of this stream
qualities map[string]*Quality
// Mutex to lock outputs map
lockQualities sync.Mutex
// Count clients for statistics
nbClients int
}
func newStream() (s *Stream) {
s = &Stream{}
s.qualities = make(map[string]*Quality)
s.nbClients = 0
return s
}
// Close stream.
func (s *Stream) Close() {
for quality := range s.qualities {
s.DeleteQuality(quality)
}
}
// CreateQuality creates a new quality associated with this stream.
func (s *Stream) CreateQuality(name string) (quality *Quality, err error) {
// If quality already exist, fail
if _, ok := s.qualities[name]; ok {
return nil, errors.New("quality already exists")
}
s.lockQualities.Lock()
quality = newQuality()
s.qualities[name] = quality
s.lockQualities.Unlock()
return quality, nil
}
// DeleteQuality removes a stream quality.
func (s *Stream) DeleteQuality(name string) {
// Make sure we did not already close this output
s.lockQualities.Lock()
if _, ok := s.qualities[name]; ok {
s.qualities[name].Close()
delete(s.qualities, name)
}
s.lockQualities.Unlock()
}
// GetQuality gets a specific stream quality.
func (s *Stream) GetQuality(name string) (quality *Quality, err error) {
s.lockQualities.Lock()
quality, ok := s.qualities[name]
s.lockQualities.Unlock()
if !ok {
return nil, errors.New("quality does not exist")
}
return quality, nil
}
// ClientCount returns the number of clients.
func (s *Stream) ClientCount() int {
return s.nbClients
}
// IncrementClientCount increments the number of clients.
func (s *Stream) IncrementClientCount() {
s.nbClients++
}
// DecrementClientCount decrements the number of clients.
func (s *Stream) DecrementClientCount() {
s.nbClients--
}

98
messaging/streams.go Normal file
View File

@ -0,0 +1,98 @@
// Package messaging defines a structure to communication between inputs and outputs
package messaging
import (
"errors"
"log"
"sync"
)
// Streams hold all application streams.
type Streams struct {
// Associate each stream name to the stream
streams map[string]*Stream
// Mutex to lock streams
lockStreams sync.Mutex
// Subscribers get notified when a new stream is created
// Use a map to be able to delete a subscriber
eventSubscribers map[chan string]struct{}
// Mutex to lock eventSubscribers
lockSubscribers sync.Mutex
}
// New creates a new stream list.
func New() (l *Streams) {
l = &Streams{}
l.streams = make(map[string]*Stream)
l.eventSubscribers = make(map[chan string]struct{})
return l
}
// Subscribe to get notified on new stream.
func (l *Streams) Subscribe(output chan string) {
l.lockSubscribers.Lock()
l.eventSubscribers[output] = struct{}{}
l.lockSubscribers.Unlock()
}
// Unsubscribe to no longer get notified on new stream.
func (l *Streams) Unsubscribe(output chan string) {
// Make sure we did not already delete this subscriber
l.lockSubscribers.Lock()
if _, ok := l.eventSubscribers[output]; ok {
delete(l.eventSubscribers, output)
}
l.lockSubscribers.Unlock()
}
// Create a new stream.
func (l *Streams) Create(name string) (s *Stream, err error) {
// If stream already exist, fail
if _, ok := l.streams[name]; ok {
return nil, errors.New("stream already exists")
}
// Create stream
s = newStream()
l.lockStreams.Lock()
l.streams[name] = s
l.lockStreams.Unlock()
// Notify
l.lockSubscribers.Lock()
for sub := range l.eventSubscribers {
select {
case sub <- name:
default:
log.Printf("Failed to announce stream '%s' to subscriber", name)
}
}
l.lockSubscribers.Unlock()
return s, nil
}
// Get a stream.
func (l *Streams) Get(name string) (s *Stream, err error) {
// If stream does exist, return it
l.lockStreams.Lock()
s, ok := l.streams[name]
l.lockStreams.Unlock()
if !ok {
return nil, errors.New("stream does not exist")
}
return s, nil
}
// Delete a stream.
func (l *Streams) Delete(name string) {
// Make sure we did not already delete this stream
l.lockStreams.Lock()
if _, ok := l.streams[name]; ok {
l.streams[name].Close()
delete(l.streams, name)
}
l.lockStreams.Unlock()
}

55
messaging/streams_test.go Normal file
View File

@ -0,0 +1,55 @@
package messaging
import "testing"
func TestWithOneStream(t *testing.T) {
streams := New()
// Subscribe to new streams
event := make(chan string, 8)
streams.Subscribe(event)
// Create a stream
stream, err := streams.Create("demo")
if err != nil {
t.Errorf("Failed to create stream")
}
// Check that we receive the creation event
e := <-event
if e != "demo" {
t.Errorf("Message has wrong content: %s != demo", e)
}
// Create a quality
quality, err := stream.CreateQuality("source")
if err != nil {
t.Errorf("Failed to create quality")
}
// Register one output
output := make(chan []byte, 64)
quality.Register(output)
stream.IncrementClientCount()
// Try to pass one message
quality.Broadcast <- []byte("hello world")
msg := <-output
if string(msg) != "hello world" {
t.Errorf("Message has wrong content: %s != hello world", msg)
}
// Check client count
if count := stream.ClientCount(); count != 1 {
t.Errorf("Client counter returned %d, expected 1", count)
}
// Unregister
quality.Unregister(output)
stream.DecrementClientCount()
// Check client count
if count := stream.ClientCount(); count != 0 {
t.Errorf("Client counter returned %d, expected 0", count)
}
}

View File

@ -3,11 +3,13 @@ package forwarding
import ( import (
"bufio" "bufio"
"fmt"
"log" "log"
"os/exec" "os/exec"
"strings"
"time" "time"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
) )
// Options to configure the stream forwarding. // Options to configure the stream forwarding.
@ -15,43 +17,65 @@ import (
type Options map[string][]string type Options map[string][]string
// Serve handles incoming packets from SRT and forward them to other external services // Serve handles incoming packets from SRT and forward them to other external services
func Serve(streams map[string]*stream.Stream, cfg Options) { func Serve(streams *messaging.Streams, cfg Options) {
if len(cfg) < 1 { if len(cfg) < 1 {
// No forwarding, ignore // No forwarding, ignore
return return
} }
// Subscribe to new stream event
event := make(chan string, 8)
streams.Subscribe(event)
log.Printf("Stream forwarding initialized") log.Printf("Stream forwarding initialized")
for {
for name, st := range streams {
fwdCfg, ok := cfg[name]
if !ok {
// Not configured
continue
}
// Start forwarding // For each new stream
log.Printf("Starting forwarding for '%s'", name) for name := range event {
go forward(st, fwdCfg) streamCfg, ok := cfg[name]
if !ok {
// Not configured
continue
} }
// Regulary pull stream list, // Get stream
// it may be better to tweak the messaging system stream, err := streams.Get(name)
// to get an event on a new stream. if err != nil {
time.Sleep(time.Second) log.Printf("Failed to get stream '%s'", name)
}
// Get specific quality
// FIXME: make it possible to forward other qualities
qualityName := "source"
quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Start forwarding
log.Printf("Starting forwarding for '%s' quality '%s'", name, qualityName)
go forward(name, quality, streamCfg)
} }
} }
// Start a FFMPEG instance and redirect stream output to forwarded streams // Start a FFMPEG instance and redirect stream output to forwarded streams
func forward(st *stream.Stream, fwdCfg []string) { func forward(streamName string, q *messaging.Quality, fwdCfg []string) {
output := make(chan []byte, 1024) output := make(chan []byte, 1024)
st.Register(output) q.Register(output)
// Launch FFMPEG instance // Launch FFMPEG instance
params := []string{"-hide_banner", "-loglevel", "error", "-re", "-i", "pipe:0"} params := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0"}
for _, url := range fwdCfg { for _, url := range fwdCfg {
// If the url should be date-formatted, replace special characters with the current time information
now := time.Now()
formattedURL := strings.ReplaceAll(url, "%Y", fmt.Sprintf("%04d", now.Year()))
formattedURL = strings.ReplaceAll(formattedURL, "%m", fmt.Sprintf("%02d", now.Month()))
formattedURL = strings.ReplaceAll(formattedURL, "%d", fmt.Sprintf("%02d", now.Day()))
formattedURL = strings.ReplaceAll(formattedURL, "%H", fmt.Sprintf("%02d", now.Hour()))
formattedURL = strings.ReplaceAll(formattedURL, "%M", fmt.Sprintf("%02d", now.Minute()))
formattedURL = strings.ReplaceAll(formattedURL, "%S", fmt.Sprintf("%02d", now.Second()))
formattedURL = strings.ReplaceAll(formattedURL, "%name", streamName)
params = append(params, "-f", "flv", "-preset", "ultrafast", "-tune", "zerolatency", params = append(params, "-f", "flv", "-preset", "ultrafast", "-tune", "zerolatency",
"-c", "copy", url) "-c", "copy", formattedURL)
} }
ffmpeg := exec.Command("ffmpeg", params...) ffmpeg := exec.Command("ffmpeg", params...)
@ -77,14 +101,14 @@ func forward(st *stream.Stream, fwdCfg []string) {
_ = input.Close() _ = input.Close()
_ = errOutput.Close() _ = errOutput.Close()
_ = ffmpeg.Process.Kill() _ = ffmpeg.Process.Kill()
st.Unregister(output) q.Unregister(output)
}() }()
// Log standard error output // Log standard error output
go func() { go func() {
scanner := bufio.NewScanner(errOutput) scanner := bufio.NewScanner(errOutput)
for scanner.Scan() { for scanner.Scan() {
log.Printf("[FORWARDING FFMPEG] %s", scanner.Text()) log.Printf("[FORWARDING FFMPEG %s] %s", streamName, scanner.Text())
} }
}() }()

View File

@ -6,7 +6,7 @@ import (
"testing" "testing"
"time" "time"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream/srt" "gitlab.crans.org/nounous/ghostream/stream/srt"
) )
@ -35,7 +35,7 @@ func TestForwardStream(t *testing.T) {
cfg["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"} cfg["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"}
// Register forwarding stream list // Register forwarding stream list
streams := make(map[string]*stream.Stream) streams := messaging.New()
go Serve(streams, cfg) go Serve(streams, cfg)
// Serve SRT Server without authentification backend // Serve SRT Server without authentification backend

View File

@ -1,99 +0,0 @@
// Package stream defines a structure to communication between inputs and outputs
package stream
import (
"sync"
)
// Stream makes packages able to subscribe to an incoming stream
type Stream struct {
// Incoming data come from this channel
Broadcast chan<- []byte
// Use a map to be able to delete an item
outputs map[chan []byte]struct{}
// Count clients for statistics
nbClients int
// Mutex to lock outputs map
lock sync.Mutex
}
// New creates a new stream.
func New() *Stream {
s := &Stream{}
broadcast := make(chan []byte, 1024)
s.Broadcast = broadcast
s.outputs = make(map[chan []byte]struct{})
s.nbClients = 0
go s.run(broadcast)
return s
}
func (s *Stream) run(broadcast <-chan []byte) {
for msg := range broadcast {
s.lock.Lock()
for output := range s.outputs {
select {
case output <- msg:
default:
// If full, do a ring buffer
// Check that output is not of size zero
if len(output) > 1 {
<-output
}
}
}
s.lock.Unlock()
}
// Incoming chan has been closed, close all outputs
s.lock.Lock()
for ch := range s.outputs {
delete(s.outputs, ch)
close(ch)
}
s.lock.Unlock()
}
// Close the incoming chan, this will also delete all outputs
func (s *Stream) Close() {
close(s.Broadcast)
}
// Register a new output on a stream.
func (s *Stream) Register(output chan []byte) {
s.lock.Lock()
defer s.lock.Unlock()
s.outputs[output] = struct{}{}
}
// Unregister removes an output.
// If hidden in true, then do not count this client.
func (s *Stream) Unregister(output chan []byte) {
s.lock.Lock()
defer s.lock.Unlock()
// Make sure we did not already close this output
_, ok := s.outputs[output]
if ok {
delete(s.outputs, output)
close(output)
}
}
// ClientCount returns the number of clients
func (s *Stream) ClientCount() int {
return s.nbClients
}
// IncrementClientCount increments the number of clients
func (s *Stream) IncrementClientCount() {
s.nbClients++
}
// DecrementClientCount decrements the number of clients
func (s *Stream) DecrementClientCount() {
s.nbClients--
}

View File

@ -1,42 +0,0 @@
package stream
import (
"testing"
)
func TestWithoutOutputs(t *testing.T) {
stream := New()
defer stream.Close()
stream.Broadcast <- []byte("hello world")
}
func TestWithOneOutput(t *testing.T) {
stream := New()
defer stream.Close()
// Register one output
output := make(chan []byte, 64)
stream.Register(output)
stream.IncrementClientCount()
// Try to pass one message
stream.Broadcast <- []byte("hello world")
msg := <-output
if string(msg) != "hello world" {
t.Errorf("Message has wrong content: %s != hello world", msg)
}
// Check client count
if count := stream.ClientCount(); count != 1 {
t.Errorf("Client counter returned %d, expected 1", count)
}
// Unregister
stream.Unregister(output)
stream.DecrementClientCount()
// Check client count
if count := stream.ClientCount(); count != 0 {
t.Errorf("Client counter returned %d, expected 0", count)
}
}

View File

@ -5,21 +5,26 @@ import (
"log" "log"
"github.com/haivision/srtgo" "github.com/haivision/srtgo"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
) )
func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) { func handleStreamer(socket *srtgo.SrtSocket, streams *messaging.Streams, name string) {
// Check stream does not exist // Create stream
if _, ok := streams[name]; ok { stream, err := streams.Create(name)
log.Print("Stream already exists, refusing new streamer") if err != nil {
log.Printf("Error on stream creating: %s", err)
socket.Close() socket.Close()
return return
} }
// Create stream // Create source quality
log.Printf("New SRT streamer for stream %s", name) q, err := stream.CreateQuality("source")
st := stream.New() if err != nil {
streams[name] = st log.Printf("Error on quality creating: %s", err)
socket.Close()
return
}
log.Printf("New SRT streamer for stream '%s' quality 'source'", name)
// Read RTP packets forever and send them to the WebRTC Client // Read RTP packets forever and send them to the WebRTC Client
for { for {
@ -42,29 +47,38 @@ func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream,
// Send raw data to other streams // Send raw data to other streams
buff = buff[:n] buff = buff[:n]
st.Broadcast <- buff q.Broadcast <- buff
} }
// Close stream // Close stream
st.Close() streams.Delete(name)
socket.Close() socket.Close()
delete(streams, name)
} }
func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) { func handleViewer(socket *srtgo.SrtSocket, streams *messaging.Streams, name string) {
log.Printf("New SRT viewer for stream %s", name)
// Get requested stream // Get requested stream
st, ok := streams[name] stream, err := streams.Get(name)
if !ok { if err != nil {
log.Println("Stream does not exist, refusing new viewer") log.Printf("Failed to get stream: %s", err)
socket.Close()
return return
} }
// Get requested quality
// FIXME: make qualities available
qualityName := "source"
q, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality: %s", err)
socket.Close()
return
}
log.Printf("New SRT viewer for stream %s quality %s", name, qualityName)
// Register new output // Register new output
c := make(chan []byte, 1024) c := make(chan []byte, 1024)
st.Register(c) q.Register(c)
st.IncrementClientCount() stream.IncrementClientCount()
// Receive data and send them // Receive data and send them
for data := range c { for data := range c {
@ -74,7 +88,7 @@ func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name st
} }
// Send data // Send data
_, err := s.Write(data, 1000) _, err := socket.Write(data, 1000)
if err != nil { if err != nil {
log.Printf("Remove SRT viewer because of sending error, %s", err) log.Printf("Remove SRT viewer because of sending error, %s", err)
break break
@ -82,7 +96,7 @@ func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name st
} }
// Close output // Close output
st.Unregister(c) q.Unregister(c)
st.DecrementClientCount() stream.DecrementClientCount()
s.Close() socket.Close()
} }

View File

@ -1,9 +1,6 @@
// Package srt serves a SRT server // Package srt serves a SRT server
package srt package srt
// #include <srt/srt.h>
import "C"
import ( import (
"log" "log"
"net" "net"
@ -12,7 +9,7 @@ import (
"github.com/haivision/srtgo" "github.com/haivision/srtgo"
"gitlab.crans.org/nounous/ghostream/auth" "gitlab.crans.org/nounous/ghostream/auth"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
) )
// Options holds web package configuration // Options holds web package configuration
@ -39,7 +36,7 @@ func splitHostPort(hostport string) (string, uint16, error) {
} }
// Serve SRT server // Serve SRT server
func Serve(streams map[string]*stream.Stream, authBackend auth.Backend, cfg *Options) { func Serve(streams *messaging.Streams, authBackend auth.Backend, cfg *Options) {
if !cfg.Enabled { if !cfg.Enabled {
// SRT is not enabled, ignore // SRT is not enabled, ignore
return return
@ -62,7 +59,7 @@ func Serve(streams map[string]*stream.Stream, authBackend auth.Backend, cfg *Opt
for { for {
// Wait for new connection // Wait for new connection
s, err := sck.Accept() s, _, err := sck.Accept()
if err != nil { if err != nil {
// Something wrong happened // Something wrong happened
log.Println(err) log.Println(err)
@ -73,7 +70,7 @@ func Serve(streams map[string]*stream.Stream, authBackend auth.Backend, cfg *Opt
// Without this, the SRT buffer might get full before reading it // Without this, the SRT buffer might get full before reading it
// streamid can be "name:password" for streamer or "name" for viewer // streamid can be "name:password" for streamer or "name" for viewer
streamID, err := s.GetSockOptString(C.SRTO_STREAMID) streamID, err := s.GetSockOptString(srtgo.SRTO_STREAMID)
if err != nil { if err != nil {
log.Print("Failed to get socket streamid") log.Print("Failed to get socket streamid")
continue continue

View File

@ -6,7 +6,7 @@ import (
"testing" "testing"
"time" "time"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
) )
// TestSplitHostPort Try to split a host like 127.0.0.1:1234 in host, port (127.0.0.1, 1234à // TestSplitHostPort Try to split a host like 127.0.0.1:1234 in host, port (127.0.0.1, 1234à
@ -58,7 +58,7 @@ func TestServeSRT(t *testing.T) {
} }
// Init streams messaging and SRT server // Init streams messaging and SRT server
streams := make(map[string]*stream.Stream) streams := messaging.New()
go Serve(streams, nil, &Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2}) go Serve(streams, nil, &Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2})
ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error", ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error",

View File

@ -7,7 +7,7 @@ import (
"strings" "strings"
"time" "time"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
) )
// Options holds telnet package configuration // Options holds telnet package configuration
@ -17,7 +17,7 @@ type Options struct {
} }
// Serve Telnet server // Serve Telnet server
func Serve(streams map[string]*stream.Stream, cfg *Options) { func Serve(streams *messaging.Streams, cfg *Options) {
if !cfg.Enabled { if !cfg.Enabled {
// Telnet is not enabled, ignore // Telnet is not enabled, ignore
return return
@ -32,17 +32,17 @@ func Serve(streams map[string]*stream.Stream, cfg *Options) {
// Handle each new client // Handle each new client
for { for {
s, err := listener.Accept() socket, err := listener.Accept()
if err != nil { if err != nil {
log.Printf("Error while accepting TCP socket: %s", s) log.Printf("Error while accepting TCP socket: %s", err)
continue continue
} }
go handleViewer(s, streams, cfg) go handleViewer(socket, streams, cfg)
} }
} }
func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) { func handleViewer(s net.Conn, streams *messaging.Streams, cfg *Options) {
// Prompt user about stream name // Prompt user about stream name
if _, err := s.Write([]byte("[GHOSTREAM]\nEnter stream name: ")); err != nil { if _, err := s.Write([]byte("[GHOSTREAM]\nEnter stream name: ")); err != nil {
log.Printf("Error while writing to TCP socket: %s", err) log.Printf("Error while writing to TCP socket: %s", err)
@ -56,7 +56,7 @@ func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) {
s.Close() s.Close()
return return
} }
name := strings.TrimSpace(string(buff[:n])) + "@text" name := strings.TrimSpace(string(buff[:n]))
if len(name) < 1 { if len(name) < 1 {
// Too short, exit // Too short, exit
s.Close() s.Close()
@ -67,9 +67,9 @@ func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) {
time.Sleep(time.Second) time.Sleep(time.Second)
// Get requested stream // Get requested stream
st, ok := streams[name] stream, err := streams.Get(name)
if !ok { if err != nil {
log.Println("Stream does not exist, kicking new Telnet viewer") log.Printf("Kicking new Telnet viewer: %s", err)
if _, err := s.Write([]byte("This stream is inactive.\n")); err != nil { if _, err := s.Write([]byte("This stream is inactive.\n")); err != nil {
log.Printf("Error while writing to TCP socket: %s", err) log.Printf("Error while writing to TCP socket: %s", err)
} }
@ -77,11 +77,23 @@ func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) {
return return
} }
// Get requested quality
qualityName := "text"
q, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Kicking new Telnet viewer: %s", err)
if _, err := s.Write([]byte("This stream is not converted to text.\n")); err != nil {
log.Printf("Error while writing to TCP socket: %s", err)
}
s.Close()
return
}
log.Printf("New Telnet viewer for stream %s quality %s", name, qualityName)
// Register new client // Register new client
log.Printf("New Telnet viewer for stream '%s'", name)
c := make(chan []byte, 128) c := make(chan []byte, 128)
st.Register(c) q.Register(c)
st.IncrementClientCount() stream.IncrementClientCount()
// Hide terminal cursor // Hide terminal cursor
if _, err = s.Write([]byte("\033[?25l")); err != nil { if _, err = s.Write([]byte("\033[?25l")); err != nil {
@ -106,7 +118,7 @@ func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) {
} }
// Close output // Close output
st.Unregister(c) q.Unregister(c)
st.DecrementClientCount() stream.DecrementClientCount()
s.Close() s.Close()
} }

View File

@ -3,13 +3,13 @@ package telnet
import ( import (
"testing" "testing"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
) )
// TestTelnetOutput creates a TCP client that connects to the server and get one image. // TestTelnetOutput creates a TCP client that connects to the server and get one image.
func TestTelnetOutput(t *testing.T) { func TestTelnetOutput(t *testing.T) {
// Try to start Telnet server while it is disabled // Try to start Telnet server while it is disabled
streams := make(map[string]*stream.Stream) streams := messaging.New()
go Serve(streams, &Options{Enabled: false}) go Serve(streams, &Options{Enabled: false})
// FIXME test connect // FIXME test connect

View File

@ -3,68 +3,39 @@ package webrtc
import ( import (
"bufio" "bufio"
"fmt"
"log" "log"
"math/rand"
"net" "net"
"os/exec" "os/exec"
"strings"
"time"
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
) )
var ( func ingest(name string, q *messaging.Quality) {
activeStream map[string]struct{}
)
func autoIngest(streams map[string]*stream.Stream) {
// Regulary check existing streams
activeStream = make(map[string]struct{})
for {
for name, st := range streams {
if strings.Contains(name, "@") {
// Not a source stream, pass
continue
}
if _, ok := activeStream[name]; ok {
// Stream is already ingested
continue
}
// Start ingestion
log.Printf("Starting webrtc for '%s'", name)
go ingest(name, st)
}
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
}
}
func ingest(name string, input *stream.Stream) {
// Register to get stream // Register to get stream
videoInput := make(chan []byte, 1024) videoInput := make(chan []byte, 1024)
input.Register(videoInput) q.Register(videoInput)
activeStream[name] = struct{}{}
// Open a UDP Listener for RTP Packets on port 5004 // FIXME Mux into RTP without having multiple UDP listeners
videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004}) firstPort := int(rand.Int31n(63535)) + 2000
// Open UDP listeners for RTP Packets
audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: firstPort})
if err != nil { if err != nil {
log.Printf("Faited to open UDP listener %s", err) log.Printf("Faited to open UDP listener %s", err)
return return
} }
audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5005}) videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: firstPort + 1})
if err != nil { if err != nil {
log.Printf("Faited to open UDP listener %s", err) log.Printf("Faited to open UDP listener %s", err)
return return
} }
// Start ffmpag to convert videoInput to video and audio UDP // Start ffmpag to convert videoInput to video and audio UDP
ffmpeg, err := startFFmpeg(videoInput) ffmpeg, err := startFFmpeg(videoInput, firstPort)
if err != nil { if err != nil {
log.Printf("Error while starting ffmpeg: %s", err) log.Printf("Error while starting ffmpeg: %s", err)
return return
@ -146,17 +117,17 @@ func ingest(name string, input *stream.Stream) {
if err = audioListener.Close(); err != nil { if err = audioListener.Close(); err != nil {
log.Printf("Faited to close UDP listener: %s", err) log.Printf("Faited to close UDP listener: %s", err)
} }
delete(activeStream, name) q.Unregister(videoInput)
} }
func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) { func startFFmpeg(in <-chan []byte, listeningPort int) (ffmpeg *exec.Cmd, err error) {
ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
"-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality // Audio
"-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1", "-vn", "-c:a", "libopus", "-b:a", "160k",
"-auto-alt-ref", "1", "-f", "rtp", fmt.Sprintf("rtp://127.0.0.1:%d", listeningPort),
"-f", "rtp", "rtp://127.0.0.1:5004", // Source
"-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1", "-an", "-c:v", "copy", "-b:v", "3000k", "-maxrate", "5000k", "-bufsize", "5000k",
"-f", "rtp", "rtp://127.0.0.1:5005"} "-f", "rtp", fmt.Sprintf("rtp://127.0.0.1:%d", listeningPort+1)}
ffmpeg = exec.Command("ffmpeg", ffmpegArgs...) ffmpeg = exec.Command("ffmpeg", ffmpegArgs...)
// Handle errors output // Handle errors output

View File

@ -8,7 +8,7 @@ import (
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/internal/monitoring" "gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
) )
// Options holds web package configuration // Options holds web package configuration
@ -45,13 +45,10 @@ func GetNumberConnectedSessions(streamID string) int {
// newPeerHandler is called when server receive a new session description // newPeerHandler is called when server receive a new session description
// this initiates a WebRTC connection and return server description // this initiates a WebRTC connection and return server description
func newPeerHandler(localSdpChan chan webrtc.SessionDescription, remoteSdp struct { func newPeerHandler(name string, localSdpChan chan webrtc.SessionDescription, remoteSdp webrtc.SessionDescription, cfg *Options) {
StreamID string
RemoteDescription webrtc.SessionDescription
}, cfg *Options) {
// Create media engine using client SDP // Create media engine using client SDP
mediaEngine := webrtc.MediaEngine{} mediaEngine := webrtc.MediaEngine{}
if err := mediaEngine.PopulateFromSDP(remoteSdp.RemoteDescription); err != nil { if err := mediaEngine.PopulateFromSDP(remoteSdp); err != nil {
log.Println("Failed to create new media engine", err) log.Println("Failed to create new media engine", err)
localSdpChan <- webrtc.SessionDescription{} localSdpChan <- webrtc.SessionDescription{}
return return
@ -78,7 +75,7 @@ func newPeerHandler(localSdpChan chan webrtc.SessionDescription, remoteSdp struc
} }
// Create video track // Create video track
codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "VP8") codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "H264")
videoTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "video", "pion", codec) videoTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "video", "pion", codec)
if err != nil { if err != nil {
log.Println("Failed to create new video track", err) log.Println("Failed to create new video track", err)
@ -106,13 +103,13 @@ func newPeerHandler(localSdpChan chan webrtc.SessionDescription, remoteSdp struc
} }
// Set the remote SessionDescription // Set the remote SessionDescription
if err = peerConnection.SetRemoteDescription(remoteSdp.RemoteDescription); err != nil { if err = peerConnection.SetRemoteDescription(remoteSdp); err != nil {
log.Println("Failed to set remote description", err) log.Println("Failed to set remote description", err)
localSdpChan <- webrtc.SessionDescription{} localSdpChan <- webrtc.SessionDescription{}
return return
} }
streamID := remoteSdp.StreamID streamID := name
split := strings.SplitN(streamID, "@", 2) split := strings.SplitN(streamID, "@", 2)
streamID = split[0] streamID = split[0]
quality := "source" quality := "source"
@ -182,10 +179,7 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa
} }
// Serve WebRTC media streaming server // Serve WebRTC media streaming server
func Serve(streams map[string]*stream.Stream, remoteSdpChan chan struct { func Serve(streams *messaging.Streams, cfg *Options) {
StreamID string
RemoteDescription webrtc.SessionDescription
}, localSdpChan chan webrtc.SessionDescription, cfg *Options) {
if !cfg.Enabled { if !cfg.Enabled {
// WebRTC is not enabled, ignore // WebRTC is not enabled, ignore
return return
@ -193,17 +187,42 @@ func Serve(streams map[string]*stream.Stream, remoteSdpChan chan struct {
log.Printf("WebRTC server using UDP from port %d to %d", cfg.MinPortUDP, cfg.MaxPortUDP) log.Printf("WebRTC server using UDP from port %d to %d", cfg.MinPortUDP, cfg.MaxPortUDP)
// Allocate memory // WebRTC ingested tracks
videoTracks = make(map[string][]*webrtc.Track) videoTracks = make(map[string][]*webrtc.Track)
audioTracks = make(map[string][]*webrtc.Track) audioTracks = make(map[string][]*webrtc.Track)
// Ingest data // Subscribe to new stream event
go autoIngest(streams) event := make(chan string, 8)
streams.Subscribe(event)
// For each new stream
for name := range event {
// Get stream
stream, err := streams.Get(name)
if err != nil {
log.Printf("Failed to get stream '%s'", name)
}
// Get specific quality
// FIXME: make it possible to forward other qualities
qualityName := "source"
quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Start forwarding
log.Printf("Starting webrtc for '%s' quality '%s'", name, qualityName)
go ingest(name, quality)
go listenSdp(name, quality.WebRtcLocalSdp, quality.WebRtcRemoteSdp, cfg)
}
}
func listenSdp(name string, localSdp, remoteSdp chan webrtc.SessionDescription, cfg *Options) {
// Handle new connections // Handle new connections
for { for {
// Wait for incoming session description // Wait for incoming session description
// then send the local description to browser // then send the local description to browser
newPeerHandler(localSdpChan, <-remoteSdpChan, cfg) newPeerHandler(name, localSdp, <-remoteSdp, cfg)
} }
} }

View File

@ -5,24 +5,19 @@ import (
"testing" "testing"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
) )
func TestServe(t *testing.T) { func TestServe(t *testing.T) {
// Init streams messaging and WebRTC server // Init streams messaging and WebRTC server
streams := make(map[string]*stream.Stream) streams := messaging.New()
remoteSdpChan := make(chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
})
localSdpChan := make(chan webrtc.SessionDescription)
cfg := Options{ cfg := Options{
Enabled: true, Enabled: true,
MinPortUDP: 10000, MinPortUDP: 10000,
MaxPortUDP: 10005, MaxPortUDP: 10005,
STUNServers: []string{"stun:stun.l.google.com:19302"}, STUNServers: []string{"stun:stun.l.google.com:19302"},
} }
go Serve(streams, remoteSdpChan, localSdpChan, &cfg) go Serve(streams, &cfg)
// New client connection // New client connection
mediaEngine := webrtc.MediaEngine{} mediaEngine := webrtc.MediaEngine{}
@ -31,7 +26,7 @@ func TestServe(t *testing.T) {
peerConnection, _ := api.NewPeerConnection(webrtc.Configuration{}) peerConnection, _ := api.NewPeerConnection(webrtc.Configuration{})
// Create video track // Create video track
codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "VP8") codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "H264")
videoTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "video", "pion", codec) videoTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "video", "pion", codec)
if err != nil { if err != nil {
t.Error("Failed to create new video track", err) t.Error("Failed to create new video track", err)
@ -58,12 +53,6 @@ func TestServe(t *testing.T) {
peerConnection.SetLocalDescription(offer) peerConnection.SetLocalDescription(offer)
<-gatherComplete <-gatherComplete
// Send offer to server // FIXME: Send offer to server
remoteSdpChan <- struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}{"demo", *peerConnection.LocalDescription()}
_ = <-localSdpChan
// FIXME: verify connection did work // FIXME: verify connection did work
} }

View File

@ -8,10 +8,8 @@ import (
"io" "io"
"log" "log"
"os/exec" "os/exec"
"strings"
"time"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
) )
// Options holds text package configuration // Options holds text package configuration
@ -23,45 +21,46 @@ type Options struct {
} }
// Init text transcoder // Init text transcoder
func Init(streams map[string]*stream.Stream, cfg *Options) { func Init(streams *messaging.Streams, cfg *Options) {
if !cfg.Enabled { if !cfg.Enabled {
// Text transcode is not enabled, ignore // Text transcode is not enabled, ignore
return return
} }
// Regulary check existing streams // Subscribe to new stream event
for { event := make(chan string, 8)
for sourceName, sourceStream := range streams { streams.Subscribe(event)
if strings.Contains(sourceName, "@") {
// Not a source stream, pass
continue
}
// Check that the transcoded stream does not already exist // For each new stream
name := sourceName + "@text" for name := range event {
_, ok := streams[name] // Get stream
if ok { stream, err := streams.Get(name)
// Stream is already transcoded if err != nil {
continue log.Printf("Failed to get stream '%s'", name)
}
// Start conversion
log.Printf("Starting text transcode '%s'", name)
st := stream.New()
streams[name] = st
go transcode(sourceStream, st, cfg)
} }
// Regulary pull stream list, // Get specific quality
// it may be better to tweak the messaging system // FIXME: make it possible to forward other qualities
// to get an event on a new stream. qualityName := "source"
time.Sleep(time.Second) quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Create new text quality
outputQuality, err := stream.CreateQuality("text")
if err != nil {
log.Printf("Failed to create quality 'text': %s", err)
}
// Start forwarding
log.Printf("Starting text transcoder for '%s' quality '%s'", name, qualityName)
go transcode(quality, outputQuality, cfg)
} }
} }
// Convert video to ANSI text // Convert video to ANSI text
func transcode(input, output *stream.Stream, cfg *Options) { func transcode(input, output *messaging.Quality, cfg *Options) {
// Start ffmpeg to transcode video to rawvideo // Start ffmpeg to transcode video to rawvideo
videoInput := make(chan []byte, 1024) videoInput := make(chan []byte, 1024)
input.Register(videoInput) input.Register(videoInput)

View File

@ -2,7 +2,7 @@
package transcoder package transcoder
import ( import (
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/transcoder/text" "gitlab.crans.org/nounous/ghostream/transcoder/text"
) )
@ -12,6 +12,6 @@ type Options struct {
} }
// Init all transcoders // Init all transcoders
func Init(streams map[string]*stream.Stream, cfg *Options) { func Init(streams *messaging.Streams, cfg *Options) {
go text.Init(streams, &cfg.Text) go text.Init(streams, &cfg.Text)
} }

View File

@ -21,61 +21,20 @@ var (
validPath = regexp.MustCompile("^/[a-z0-9@_-]*$") validPath = regexp.MustCompile("^/[a-z0-9@_-]*$")
) )
// Handle WebRTC session description exchange via POST // Handle site index and viewer pages
func viewerPostHandler(w http.ResponseWriter, r *http.Request) { func viewerHandler(w http.ResponseWriter, r *http.Request) {
// Limit response body to 128KB // Validation on path
r.Body = http.MaxBytesReader(w, r.Body, 131072) if validPath.FindStringSubmatch(r.URL.Path) == nil {
http.NotFound(w, r)
// Get stream ID from URL, or from domain name log.Printf("Replied not found on %s", r.URL.Path)
path := r.URL.Path[1:]
host := r.Host
if strings.Contains(host, ":") {
realHost, _, err := net.SplitHostPort(r.Host)
if err != nil {
log.Printf("Failed to split host and port from %s", r.Host)
return
}
host = realHost
}
host = strings.Replace(host, ".", "-", -1)
if streamID, ok := cfg.MapDomainToStream[host]; ok {
path = streamID
}
// Decode client description
dec := json.NewDecoder(r.Body)
dec.DisallowUnknownFields()
remoteDescription := webrtc.SessionDescription{}
if err := dec.Decode(&remoteDescription); err != nil {
http.Error(w, "The JSON WebRTC offer is malformed", http.StatusBadRequest)
return return
} }
// Exchange session descriptions with WebRTC stream server // Check method
remoteSdpChan <- struct { if r.Method != http.MethodGet {
StreamID string http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed)
RemoteDescription webrtc.SessionDescription
}{StreamID: path, RemoteDescription: remoteDescription}
localDescription := <-localSdpChan
// Send server description as JSON
jsonDesc, err := json.Marshal(localDescription)
if err != nil {
http.Error(w, "An error occurred while formating response", http.StatusInternalServerError)
log.Println("An error occurred while sending session description", err)
return
}
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(jsonDesc)
if err != nil {
log.Println("An error occurred while sending session description", err)
} }
// Increment monitoring
monitoring.WebSessions.Inc()
}
func viewerGetHandler(w http.ResponseWriter, r *http.Request) {
// Get stream ID from URL, or from domain name // Get stream ID from URL, or from domain name
path := r.URL.Path[1:] path := r.URL.Path[1:]
host := r.Host host := r.Host
@ -122,27 +81,6 @@ func viewerGetHandler(w http.ResponseWriter, r *http.Request) {
monitoring.WebViewerServed.Inc() monitoring.WebViewerServed.Inc()
} }
// Handle site index and viewer pages
// POST requests are used to exchange WebRTC session descriptions
func viewerHandler(w http.ResponseWriter, r *http.Request) {
// Validation on path
if validPath.FindStringSubmatch(r.URL.Path) == nil {
http.NotFound(w, r)
log.Printf("Replied not found on %s", r.URL.Path)
return
}
// Route depending on HTTP method
switch r.Method {
case http.MethodGet:
viewerGetHandler(w, r)
case http.MethodPost:
viewerPostHandler(w, r)
default:
http.Error(w, "Sorry, only GET and POST methods are supported.", http.StatusBadRequest)
}
}
func staticHandler() http.Handler { func staticHandler() http.Handler {
// Set up static files server // Set up static files server
staticFs := http.FileServer(pkger.Dir("/web/static")) staticFs := http.FileServer(pkger.Dir("/web/static"))
@ -153,19 +91,16 @@ func statisticsHandler(w http.ResponseWriter, r *http.Request) {
name := strings.SplitN(strings.Replace(r.URL.Path[7:], "/", "", -1), "@", 2)[0] name := strings.SplitN(strings.Replace(r.URL.Path[7:], "/", "", -1), "@", 2)[0]
userCount := 0 userCount := 0
// Get all substreams // Get requested stream
for _, outputType := range []string{"", "@720p", "@480p", "@360p", "@240p", "@text"} { stream, err := streams.Get(name)
// Get requested stream if err == nil {
stream, ok := streams[name+outputType] userCount = stream.ClientCount()
if ok { userCount += webrtc.GetNumberConnectedSessions(name)
// Get number of output channels
userCount += stream.ClientCount()
}
} }
// Display connected users statistics // Display connected users statistics
enc := json.NewEncoder(w) enc := json.NewEncoder(w)
err := enc.Encode(struct{ ConnectedViewers int }{userCount}) err = enc.Encode(struct{ ConnectedViewers int }{userCount})
if err != nil { if err != nil {
http.Error(w, "Failed to generate JSON.", http.StatusInternalServerError) http.Error(w, "Failed to generate JSON.", http.StatusInternalServerError)
log.Printf("Failed to generate JSON: %s", err) log.Printf("Failed to generate JSON: %s", err)

View File

@ -4,6 +4,8 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"gitlab.crans.org/nounous/ghostream/messaging"
) )
func TestViewerPageGET(t *testing.T) { func TestViewerPageGET(t *testing.T) {
@ -12,6 +14,9 @@ func TestViewerPageGET(t *testing.T) {
t.Errorf("Failed to load templates: %v", err) t.Errorf("Failed to load templates: %v", err)
} }
// Init streams messaging
streams = messaging.New()
cfg = &Options{} cfg = &Options{}
// Test GET request // Test GET request

View File

@ -2,7 +2,7 @@
video { video {
display: block; display: block;
flex-grow: 1; flex-grow: 1;
width: 100%; max-width: 100%;
/* Black borders around video */ /* Black borders around video */
background-color: #000; background-color: #000;

View File

@ -0,0 +1,29 @@
/**
* ViewerCounter show the number of active viewers
*/
export class ViewerCounter {
/**
* @param {HTMLElement} element
* @param {String} streamName
*/
constructor(element, streamName) {
this.element = element;
this.url = "/_stats/" + streamName;
}
/**
* Regulary update counter
*
* @param {Number} updatePeriod
*/
regularUpdate(updatePeriod) {
setInterval(() => this.refreshViewersCounter(), updatePeriod);
}
refreshViewersCounter() {
fetch(this.url)
.then(response => response.json())
.then((data) => this.element.innerText = data.ConnectedViewers)
.catch(console.log);
}
}

View File

@ -0,0 +1,99 @@
/**
* GsWebRTC to connect to Ghostream
*/
export class GsWebRTC {
/**
* @param {list} stunServers STUN servers
* @param {HTMLElement} viewer Video HTML element
* @param {HTMLElement} connectionIndicator Connection indicator element
*/
constructor(stunServers, viewer, connectionIndicator) {
this.viewer = viewer;
this.connectionIndicator = connectionIndicator;
this.pc = new RTCPeerConnection({
iceServers: [{ urls: stunServers }]
});
// We want to receive audio and video
this.pc.addTransceiver("video", { "direction": "sendrecv" });
this.pc.addTransceiver("audio", { "direction": "sendrecv" });
// Configure events
this.pc.oniceconnectionstatechange = () => this._onConnectionStateChange();
this.pc.ontrack = (e) => this._onTrack(e);
}
/**
* On connection change, log it and change indicator.
* If connection closed or failed, try to reconnect.
*/
_onConnectionStateChange() {
console.log("[WebRTC] ICE connection state changed to " + this.pc.iceConnectionState);
switch (this.pc.iceConnectionState) {
case "disconnected":
this.connectionIndicator.style.fill = "#dc3545";
break;
case "checking":
this.connectionIndicator.style.fill = "#ffc107";
break;
case "connected":
this.connectionIndicator.style.fill = "#28a745";
break;
case "closed":
case "failed":
console.log("[WebRTC] Connection closed, restarting...");
/*peerConnection.close();
peerConnection = null;
setTimeout(startPeerConnection, 1000);*/
break;
}
}
/**
* On new track, add it to the player
* @param {Event} event
*/
_onTrack(event) {
console.log(`[WebRTC] New ${event.track.kind} track`);
if (event.track.kind === "video") {
this.viewer.srcObject = event.streams[0];
}
}
/**
* Create an offer and set local description.
* After that the browser will fire onicecandidate events.
*/
createOffer() {
this.pc.createOffer().then(offer => {
this.pc.setLocalDescription(offer);
console.log("[WebRTC] WebRTC offer created");
}).catch(console.log);
}
/**
* Register a function to call to send local descriptions
* @param {Function} sendFunction Called with a local description to send.
*/
onICECandidate(sendFunction) {
// When candidate is null, ICE layer has run out of potential configurations to suggest
// so let's send the offer to the server.
// FIXME: Send offers progressively to do Trickle ICE
this.pc.onicecandidate = event => {
if (event.candidate === null) {
// Send offer to server
console.log("[WebRTC] Sending session description to server");
sendFunction(this.pc.localDescription);
}
};
}
/**
* Set WebRTC remote description
* After that, the connection will be established and ontrack will be fired.
* @param {RTCSessionDescription} sdp Session description data
*/
setRemoteDescription(sdp) {
this.pc.setRemoteDescription(sdp);
}
}

View File

@ -0,0 +1,62 @@
/**
* GsWebSocket to do Ghostream signalling
*/
export class GsWebSocket {
constructor() {
const protocol = (window.location.protocol === "https:") ? "wss://" : "ws://";
this.url = protocol + window.location.host + "/_ws/";
// Open WebSocket
this._open();
// Configure events
this.socket.addEventListener("open", () => {
console.log("[WebSocket] Connection established");
});
this.socket.addEventListener("close", () => {
console.log("[WebSocket] Connection closed, retrying connection in 1s...");
setTimeout(() => this._open(), 1000);
});
this.socket.addEventListener("error", () => {
console.log("[WebSocket] Connection errored, retrying connection in 1s...");
setTimeout(() => this._open(), 1000);
});
}
_open() {
console.log(`[WebSocket] Connecting to ${this.url}...`);
this.socket = new WebSocket(this.url);
}
/**
* Send local WebRTC session description to remote.
* @param {SessionDescription} localDescription WebRTC local SDP
* @param {string} stream Name of the stream
* @param {string} quality Requested quality
*/
sendLocalDescription(localDescription, stream, quality) {
if (this.socket.readyState !== 1) {
console.log("[WebSocket] Waiting for connection to send data...");
setTimeout(() => this.sendLocalDescription(localDescription, stream, quality), 100);
return;
}
console.log(`[WebSocket] Sending WebRTC local session description for stream ${stream} quality ${quality}`);
this.socket.send(JSON.stringify({
"webRtcSdp": localDescription,
"stream": stream,
"quality": quality
}));
}
/**
* Set callback function on new remote session description.
* @param {Function} callback Function called when data is received
*/
onRemoteDescription(callback) {
this.socket.addEventListener("message", (event) => {
console.log("[WebSocket] Received WebRTC remote session description");
const sdp = new RTCSessionDescription(JSON.parse(event.data));
callback(sdp);
});
}
}

View File

@ -1,12 +0,0 @@
// Side widget toggler
const sideWidgetToggle = document.getElementById("sideWidgetToggle")
sideWidgetToggle.addEventListener("click", function () {
const sideWidget = document.getElementById("sideWidget")
if (sideWidget.style.display === "none") {
sideWidget.style.display = "block"
sideWidgetToggle.textContent = "»"
} else {
sideWidget.style.display = "none"
sideWidgetToggle.textContent = "«"
}
})

View File

@ -1,9 +0,0 @@
document.getElementById("quality").addEventListener("change", (event) => {
console.log(`Stream quality changed to ${event.target.value}`)
// Restart the connection with a new quality
peerConnection.close()
peerConnection = null
streamPath = window.location.href + event.target.value
startPeerConnection()
})

View File

@ -1,97 +1,87 @@
let peerConnection import { GsWebSocket } from "./modules/websocket.js";
let streamPath = window.location.href import { ViewerCounter } from "./modules/viewerCounter.js";
import { GsWebRTC } from "./modules/webrtc.js";
startPeerConnection = () => { /**
// Init peer connection * Initialize viewer page
peerConnection = new RTCPeerConnection({ *
iceServers: [{ urls: stunServers }] * @param {String} stream
}) * @param {List} stunServers
* @param {Number} viewersCounterRefreshPeriod
*/
export function initViewerPage(stream, stunServers, viewersCounterRefreshPeriod) {
// Viewer element
const viewer = document.getElementById("viewer");
// On connection change, change indicator color // Default quality
// if connection failed, restart peer connection let quality = "source";
peerConnection.oniceconnectionstatechange = e => {
console.log("ICE connection state changed, " + peerConnection.iceConnectionState)
switch (peerConnection.iceConnectionState) {
case "disconnected":
document.getElementById("connectionIndicator").style.fill = "#dc3545"
break
case "checking":
document.getElementById("connectionIndicator").style.fill = "#ffc107"
break
case "connected":
document.getElementById("connectionIndicator").style.fill = "#28a745"
break
case "closed":
case "failed":
console.log("Connection failed, restarting...")
peerConnection.close()
peerConnection = null
setTimeout(startPeerConnection, 1000)
break
}
}
// We want to receive audio and video // Create WebSocket and WebRTC
peerConnection.addTransceiver('video', { 'direction': 'sendrecv' }) const websocket = new GsWebSocket();
peerConnection.addTransceiver('audio', { 'direction': 'sendrecv' }) const webrtc = new GsWebRTC(
stunServers,
viewer,
document.getElementById("connectionIndicator"),
);
webrtc.createOffer();
webrtc.onICECandidate(localDescription => {
websocket.sendLocalDescription(localDescription, stream, quality);
});
websocket.onRemoteDescription(sdp => {
webrtc.setRemoteDescription(sdp);
});
// Create offer and set local description // Register keyboard events
peerConnection.createOffer().then(offer => { window.addEventListener("keydown", (event) => {
// After setLocalDescription, the browser will fire onicecandidate events switch (event.key) {
peerConnection.setLocalDescription(offer) case "f":
}).catch(console.log)
// When candidate is null, ICE layer has run out of potential configurations to suggest
// so let's send the offer to the server
peerConnection.onicecandidate = event => {
if (event.candidate === null) {
// Send offer to server
// The server know the stream name from the url
// The server replies with its description
// After setRemoteDescription, the browser will fire ontrack events
console.log("Sending session description to server")
fetch(streamPath, {
method: 'POST',
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
body: JSON.stringify(peerConnection.localDescription)
})
.then(response => response.json())
.then((data) => peerConnection.setRemoteDescription(new RTCSessionDescription(data)))
.catch(console.log)
}
}
// When video track is received, configure player
peerConnection.ontrack = function (event) {
console.log(`New ${event.track.kind} track`)
if (event.track.kind === "video") {
const viewer = document.getElementById('viewer')
viewer.srcObject = event.streams[0]
}
}
}
// Register keyboard events
let viewer = document.getElementById("viewer")
window.addEventListener("keydown", (event) => {
switch (event.key) {
case 'f':
// F key put player in fullscreen // F key put player in fullscreen
if (document.fullscreenElement !== null) { if (document.fullscreenElement !== null) {
document.exitFullscreen() document.exitFullscreen();
} else { } else {
viewer.requestFullscreen() viewer.requestFullscreen();
} }
break break;
case 'm': case "m":
case ' ': case " ":
// M and space key mute player // M and space key mute player
viewer.muted = !viewer.muted viewer.muted = !viewer.muted;
event.preventDefault() event.preventDefault();
viewer.play() viewer.play();
break break;
}
});
// Create viewer counter
const viewerCounter = new ViewerCounter(
document.getElementById("connected-people"),
stream,
);
viewerCounter.regularUpdate(viewersCounterRefreshPeriod);
viewerCounter.refreshViewersCounter();
// Side widget toggler
const sideWidgetToggle = document.getElementById("sideWidgetToggle");
const sideWidget = document.getElementById("sideWidget");
if (sideWidgetToggle !== null && sideWidget !== null) {
// On click, toggle side widget visibility
sideWidgetToggle.addEventListener("click", function () {
if (sideWidget.style.display === "none") {
sideWidget.style.display = "block";
sideWidgetToggle.textContent = "»";
} else {
sideWidget.style.display = "none";
sideWidgetToggle.textContent = "«";
}
});
} }
})
// Video quality toggler
document.getElementById("quality").addEventListener("change", (event) => {
quality = event.target.value;
console.log(`Stream quality changed to ${quality}`);
// Restart WebRTC negociation
webrtc.createOffer();
});
}

View File

@ -1,12 +0,0 @@
// Refresh viewer count by pulling metric from server
function refreshViewersCounter(streamID, period) {
// Distinguish oneDomainPerStream mode
fetch("/_stats/" + streamID)
.then(response => response.json())
.then((data) => document.getElementById("connected-people").innerText = data.ConnectedViewers)
.catch(console.log)
setTimeout(() => {
refreshViewersCounter(streamID, period)
}, period)
}

View File

@ -4,6 +4,7 @@
<head> <head>
<meta charset="UTF-8"> <meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>{{if .Path}}{{.Path}} - {{end}}{{.Cfg.Name}}</title> <title>{{if .Path}}{{.Path}} - {{end}}{{.Cfg.Name}}</title>
<link rel="stylesheet" href="static/css/style.css"> <link rel="stylesheet" href="static/css/style.css">
<link rel="stylesheet" href="static/css/player.css"> <link rel="stylesheet" href="static/css/player.css">

View File

@ -21,11 +21,7 @@
<ul> <ul>
<li> <li>
<b>Serveur :</b> <b>Serveur :</b>
<code>srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}</code>, <code>srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?IDENTIFIANT:MOT_DE_PASS</code>,
</li>
<li>
<b>Clé de stream :</b>
<code>IDENTIFIANT:MOT_DE_PASSE</code>
avec <code>IDENTIFIANT</code> et <code>MOT_DE_PASSE</code> avec <code>IDENTIFIANT</code> et <code>MOT_DE_PASSE</code>
vos identifiants. vos identifiants.
</li> </li>
@ -52,6 +48,81 @@
</code> </code>
</p> </p>
<h2>Comment lire un flux depuis un lecteur externe ?</h2>
<p>
À l'heure actuelle, la plupart des lecteurs vidéos ne supportent
pas le protocole SRT, ou le supportent mal. Un travail est en
cours pour les rendre un maximum compatibles. Liste non exhaustive
des lecteurs vidéos testés :
</p>
<h3>FFPlay</h3>
<p>
Si FFMpeg est installé sur votre machine, il est accompagné d'un
lecteur vidéo nommé <code>ffplay</code>. Si FFMpeg est compilé
avec le support de SRT (c'est le cas sur la plupart des distributions,
sauf cas ci-dessous), il vous suffira d'exécuter :
</p>
<p>
<code>
ffplay -fflags nobuffer srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT
</code>
</p>
<h3>MPV</h3>
<p>
MPV supporte officiellement SRT depuis le 16 octobre 2020.
Néanmoins, la version stable de MPV est beaucoup plus vieille.
Vous devez alors utiliser une version de développement pour
pouvoir lire un flux SRT depuis MPV. L'installation se fait
depuis <a href="https://mpv.io/installation/"> cette page</a>.
Sous Arch Linux, il vous suffit de récupérer le paquet
<code>mpv-git</code> dans l'AUR. Pour lire le flux, il suffit
d'exécuter :
</p>
<p>
<code>
mpv srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT
</code>
</p>
<h3>VLC Media Player</h3>
<p>
Bien que VLC supporte officiellement le protocole SRT,
toutes les options ne sont pas encore implémentées,
notamment l'option pour choisir son stream.
<a href="https://patches.videolan.org/patch/30299/">Un patch</a>
a été soumis et est en attente d'acceptation.
Une fois le patch accepté, il sera appliqué dans les versions
de développement de VLC. Sous Arch Linux, il suffit de récupérer
le paquet <code>vlc-git</code> de l'AUR. Avec un VLC à jour,
il suffit d'exécuter :
</p>
<p>
<code>
vlc srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT
</code>
</p>
<p>
Ou bien d'aller dans Média -> Ouvrir un flux réseau et d'entrer l'URL
<code>srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT</code>.
</p>
<h3>Le protocole n'existe pas ou n'est pas supporté.</h3>
<p>
La technologie SRT est très récente et n'est pas supportée par
les dépôts stables. Ainsi, si vous avez Ubuntu &le; 20.04 ou
Debian &le; Buster, vous ne pourrez pas utiliser de lecteur vidéo
ni même diffuser avec votre machine. Vous devrez vous mettre à
jour vers Ubuntu 20.10 ou Debian Bullseye.
</p>
<h2>Mentions légales</h2> <h2>Mentions légales</h2>
<p> <p>
Le service de diffusion vidéo du Crans est un service d'hébergement Le service de diffusion vidéo du Crans est un service d'hébergement

View File

@ -8,10 +8,10 @@
<div class="controls"> <div class="controls">
<span class="control-quality"> <span class="control-quality">
<select id="quality"> <select id="quality">
<option value="">Source</option> <option value="source">Source</option>
<option value="@720p">720p</option> <option value="720p">720p</option>
<option value="@480p">480p</option> <option value="480p">480p</option>
<option value="@240p">240p</option> <option value="240p">240p</option>
</select> </select>
</span> </span>
<code class="control-srt-link">srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid={{.Path}}</code> <code class="control-srt-link">srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid={{.Path}}</code>
@ -34,21 +34,17 @@
{{end}} {{end}}
</div> </div>
{{if .WidgetURL}}<script src="/static/js/sideWidget.js"></script>{{end}} <script type="module">
<script src="/static/js/videoQuality.js"></script> import { initViewerPage } from "/static/js/viewer.js";
<script src="/static/js/viewer.js"></script>
<script src="/static/js/viewersCounter.js"></script> // Some variables that need to be fixed by web page
<script> const viewersCounterRefreshPeriod = Number("{{.Cfg.ViewersCounterRefreshPeriod}}");
const stream = "{{.Path}}";
const stunServers = [ const stunServers = [
{{range $id, $value := .Cfg.STUNServers}} {{range $id, $value := .Cfg.STUNServers}}
'{{$value}}', "{{$value}}",
{{end}} {{end}}
] ]
startPeerConnection() initViewerPage(stream, stunServers, viewersCounterRefreshPeriod)
// Wait a bit before pulling viewers counter for the first time
setTimeout(() => {
refreshViewersCounter("{{.Path}}", {{.Cfg.ViewersCounterRefreshPeriod}})
}, 1000)
</script> </script>
{{end}} {{end}}

View File

@ -10,8 +10,7 @@ import (
"strings" "strings"
"github.com/markbates/pkger" "github.com/markbates/pkger"
"github.com/pion/webrtc/v3" "gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream"
) )
// Options holds web package configuration // Options holds web package configuration
@ -33,18 +32,11 @@ type Options struct {
var ( var (
cfg *Options cfg *Options
// WebRTC session description channels
remoteSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}
localSdpChan chan webrtc.SessionDescription
// Preload templates // Preload templates
templates *template.Template templates *template.Template
// Streams to get statistics // Streams to get statistics
streams map[string]*stream.Stream streams *messaging.Streams
) )
// Load templates with pkger // Load templates with pkger
@ -78,13 +70,8 @@ func loadTemplates() error {
} }
// Serve HTTP server // Serve HTTP server
func Serve(s map[string]*stream.Stream, rSdpChan chan struct { func Serve(s *messaging.Streams, c *Options) {
StreamID string
RemoteDescription webrtc.SessionDescription
}, lSdpChan chan webrtc.SessionDescription, c *Options) {
streams = s streams = s
remoteSdpChan = rSdpChan
localSdpChan = lSdpChan
cfg = c cfg = c
if !cfg.Enabled { if !cfg.Enabled {
@ -101,6 +88,7 @@ func Serve(s map[string]*stream.Stream, rSdpChan chan struct {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/", viewerHandler) mux.HandleFunc("/", viewerHandler)
mux.Handle("/static/", staticHandler()) mux.Handle("/static/", staticHandler())
mux.HandleFunc("/_ws/", websocketHandler)
mux.HandleFunc("/_stats/", statisticsHandler) mux.HandleFunc("/_stats/", statisticsHandler)
log.Printf("HTTP server listening on %s", cfg.ListenAddress) log.Printf("HTTP server listening on %s", cfg.ListenAddress)
log.Fatal(http.ListenAndServe(cfg.ListenAddress, mux)) log.Fatal(http.ListenAndServe(cfg.ListenAddress, mux))

View File

@ -5,16 +5,16 @@ import (
"testing" "testing"
"time" "time"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
) )
// TestHTTPServe tries to serve a real HTTP server and load some pages // TestHTTPServe tries to serve a real HTTP server and load some pages
func TestHTTPServe(t *testing.T) { func TestHTTPServe(t *testing.T) {
// Init streams messaging // Init streams messaging
streams := make(map[string]*stream.Stream) streams := messaging.New()
// Create a disabled web server // Create a disabled web server
go Serve(streams, nil, nil, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"}) go Serve(streams, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"})
// Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early // Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
@ -26,7 +26,7 @@ func TestHTTPServe(t *testing.T) {
} }
// Now let's really start the web server // Now let's really start the web server
go Serve(streams, nil, nil, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"}) go Serve(streams, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"})
// Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early // Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)

67
web/websocket_handler.go Normal file
View File

@ -0,0 +1,67 @@
// Package web serves the JavaScript player and WebRTC negotiation
package web
import (
"log"
"net/http"
"github.com/gorilla/websocket"
"gitlab.crans.org/nounous/ghostream/stream/webrtc"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// clientDescription is sent by new client
type clientDescription struct {
WebRtcSdp webrtc.SessionDescription
Stream string
Quality string
}
// websocketHandler exchanges WebRTC SDP and viewer count
func websocketHandler(w http.ResponseWriter, r *http.Request) {
// Upgrade client connection to WebSocket
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("Failed to upgrade client to websocket: %s", err)
return
}
for {
// Get client description
c := &clientDescription{}
err = conn.ReadJSON(c)
if err != nil {
log.Printf("Failed to receive client description: %s", err)
continue
}
// Get requested stream
stream, err := streams.Get(c.Stream)
if err != nil {
log.Printf("Stream not found: %s", c.Stream)
continue
}
// Get requested quality
q, err := stream.GetQuality(c.Quality)
if err != nil {
log.Printf("Quality not found: %s", c.Quality)
continue
}
// Exchange session descriptions with WebRTC stream server
// FIXME: Add trickle ICE support
q.WebRtcRemoteSdp <- c.WebRtcSdp
localDescription := <-q.WebRtcLocalSdp
// Send new local description
if err := conn.WriteJSON(localDescription); err != nil {
log.Println(err)
continue
}
}
}