mirror of
				https://gitlab.crans.org/nounous/ghostream.git
				synced 2025-11-04 15:42:26 +01:00 
			
		
		
		
	Update package srt with Quality structure
This commit is contained in:
		@@ -25,13 +25,25 @@ func newStream() (s *Stream) {
 | 
			
		||||
	return s
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Close stream.
 | 
			
		||||
func (s *Stream) Close() {
 | 
			
		||||
	for quality := range s.qualities {
 | 
			
		||||
		s.DeleteQuality(quality)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// CreateQuality creates a new quality associated with this stream.
 | 
			
		||||
func (s *Stream) CreateQuality(name string) (quality *Quality) {
 | 
			
		||||
func (s *Stream) CreateQuality(name string) (quality *Quality, err error) {
 | 
			
		||||
	// If quality already exist, fail
 | 
			
		||||
	if _, ok := s.qualities[name]; ok {
 | 
			
		||||
		return nil, errors.New("quality already exists")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	s.lockQualities.Lock()
 | 
			
		||||
	quality = newQuality()
 | 
			
		||||
	s.qualities[name] = quality
 | 
			
		||||
	s.lockQualities.Unlock()
 | 
			
		||||
	return quality
 | 
			
		||||
	return quality, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DeleteQuality removes a stream quality.
 | 
			
		||||
 
 | 
			
		||||
@@ -91,6 +91,7 @@ func (l *Streams) Delete(name string) {
 | 
			
		||||
	// Make sure we did not already delete this stream
 | 
			
		||||
	l.lockStreams.Lock()
 | 
			
		||||
	if _, ok := l.streams[name]; ok {
 | 
			
		||||
		l.streams[name].Close()
 | 
			
		||||
		delete(l.streams, name)
 | 
			
		||||
	}
 | 
			
		||||
	l.lockStreams.Unlock()
 | 
			
		||||
 
 | 
			
		||||
@@ -22,7 +22,10 @@ func TestWithOneStream(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Create a quality
 | 
			
		||||
	quality := stream.CreateQuality("source")
 | 
			
		||||
	quality, err := stream.CreateQuality("source")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("Failed to create quality")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Register one output
 | 
			
		||||
	output := make(chan []byte, 64)
 | 
			
		||||
 
 | 
			
		||||
@@ -5,21 +5,26 @@ import (
 | 
			
		||||
	"log"
 | 
			
		||||
 | 
			
		||||
	"github.com/haivision/srtgo"
 | 
			
		||||
	"gitlab.crans.org/nounous/ghostream/stream"
 | 
			
		||||
	"gitlab.crans.org/nounous/ghostream/messaging"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) {
 | 
			
		||||
	// Check stream does not exist
 | 
			
		||||
	if _, ok := streams[name]; ok {
 | 
			
		||||
		log.Print("Stream already exists, refusing new streamer")
 | 
			
		||||
func handleStreamer(socket *srtgo.SrtSocket, streams *messaging.Streams, name string) {
 | 
			
		||||
	// Create stream
 | 
			
		||||
	stream, err := streams.Create(name)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Printf("Error on stream creating: %s", err)
 | 
			
		||||
		socket.Close()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Create stream
 | 
			
		||||
	log.Printf("New SRT streamer for stream %s", name)
 | 
			
		||||
	st := stream.New()
 | 
			
		||||
	streams[name] = st
 | 
			
		||||
	// Create source quality
 | 
			
		||||
	q, err := stream.CreateQuality("source")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Printf("Error on quality creating: %s", err)
 | 
			
		||||
		socket.Close()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	log.Printf("New SRT streamer for stream '%s' quality 'source'", name)
 | 
			
		||||
 | 
			
		||||
	// Read RTP packets forever and send them to the WebRTC Client
 | 
			
		||||
	for {
 | 
			
		||||
@@ -42,29 +47,38 @@ func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream,
 | 
			
		||||
 | 
			
		||||
		// Send raw data to other streams
 | 
			
		||||
		buff = buff[:n]
 | 
			
		||||
		st.Broadcast <- buff
 | 
			
		||||
		q.Broadcast <- buff
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Close stream
 | 
			
		||||
	st.Close()
 | 
			
		||||
	streams.Delete(name)
 | 
			
		||||
	socket.Close()
 | 
			
		||||
	delete(streams, name)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) {
 | 
			
		||||
	log.Printf("New SRT viewer for stream %s", name)
 | 
			
		||||
 | 
			
		||||
func handleViewer(socket *srtgo.SrtSocket, streams *messaging.Streams, name string) {
 | 
			
		||||
	// Get requested stream
 | 
			
		||||
	st, ok := streams[name]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		log.Println("Stream does not exist, refusing new viewer")
 | 
			
		||||
	stream, err := streams.Get(name)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Printf("Failed to get stream: %s", err)
 | 
			
		||||
		socket.Close()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Get requested quality
 | 
			
		||||
	// FIXME: make qualities available
 | 
			
		||||
	qualityName := "source"
 | 
			
		||||
	q, err := stream.GetQuality(qualityName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Printf("Failed to get quality: %s", err)
 | 
			
		||||
		socket.Close()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	log.Printf("New SRT viewer for stream %s quality %s", name, qualityName)
 | 
			
		||||
 | 
			
		||||
	// Register new output
 | 
			
		||||
	c := make(chan []byte, 1024)
 | 
			
		||||
	st.Register(c)
 | 
			
		||||
	st.IncrementClientCount()
 | 
			
		||||
	q.Register(c)
 | 
			
		||||
	stream.IncrementClientCount()
 | 
			
		||||
 | 
			
		||||
	// Receive data and send them
 | 
			
		||||
	for data := range c {
 | 
			
		||||
@@ -74,7 +88,7 @@ func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name st
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Send data
 | 
			
		||||
		_, err := s.Write(data, 1000)
 | 
			
		||||
		_, err := socket.Write(data, 1000)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Printf("Remove SRT viewer because of sending error, %s", err)
 | 
			
		||||
			break
 | 
			
		||||
@@ -82,7 +96,7 @@ func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name st
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Close output
 | 
			
		||||
	st.Unregister(c)
 | 
			
		||||
	st.DecrementClientCount()
 | 
			
		||||
	s.Close()
 | 
			
		||||
	q.Unregister(c)
 | 
			
		||||
	stream.DecrementClientCount()
 | 
			
		||||
	socket.Close()
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -12,7 +12,7 @@ import (
 | 
			
		||||
 | 
			
		||||
	"github.com/haivision/srtgo"
 | 
			
		||||
	"gitlab.crans.org/nounous/ghostream/auth"
 | 
			
		||||
	"gitlab.crans.org/nounous/ghostream/stream"
 | 
			
		||||
	"gitlab.crans.org/nounous/ghostream/messaging"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Options holds web package configuration
 | 
			
		||||
@@ -39,7 +39,7 @@ func splitHostPort(hostport string) (string, uint16, error) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Serve SRT server
 | 
			
		||||
func Serve(streams map[string]*stream.Stream, authBackend auth.Backend, cfg *Options) {
 | 
			
		||||
func Serve(streams *messaging.Streams, authBackend auth.Backend, cfg *Options) {
 | 
			
		||||
	if !cfg.Enabled {
 | 
			
		||||
		// SRT is not enabled, ignore
 | 
			
		||||
		return
 | 
			
		||||
 
 | 
			
		||||
@@ -6,7 +6,7 @@ import (
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"gitlab.crans.org/nounous/ghostream/stream"
 | 
			
		||||
	"gitlab.crans.org/nounous/ghostream/messaging"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// TestSplitHostPort Try to split a host like 127.0.0.1:1234 in host, port (127.0.0.1, 1234à
 | 
			
		||||
@@ -58,7 +58,7 @@ func TestServeSRT(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Init streams messaging and SRT server
 | 
			
		||||
	streams := make(map[string]*stream.Stream)
 | 
			
		||||
	streams := messaging.New()
 | 
			
		||||
	go Serve(streams, nil, &Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2})
 | 
			
		||||
 | 
			
		||||
	ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error",
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user