mirror of
				https://gitlab.crans.org/nounous/ghostream.git
				synced 2025-11-04 15:42:26 +01:00 
			
		
		
		
	Fix the forwarding package with the new messaging package
This commit is contained in:
		@@ -2,7 +2,9 @@
 | 
			
		||||
package forwarding
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"log"
 | 
			
		||||
	"os/exec"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"gitlab.crans.org/nounous/ghostream/stream"
 | 
			
		||||
@@ -40,107 +42,59 @@ func Serve(streams map[string]*stream.Stream, cfg Options) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Start a FFMPEG instance and redirect stream output to forwarded streams
 | 
			
		||||
func forward(st *stream.Stream, fwdCfg []string) {
 | 
			
		||||
	// FIXME
 | 
			
		||||
	/*ffmpegInstances := make(map[string]*exec.Cmd)
 | 
			
		||||
	ffmpegInputStreams := make(map[string]*io.WriteCloser)
 | 
			
		||||
	for {
 | 
			
		||||
		var err error = nil
 | 
			
		||||
		// Wait for packets
 | 
			
		||||
		// FIXME packet := <-inputChannel
 | 
			
		||||
		packet := srt.Packet{
 | 
			
		||||
			Data:       []byte{},
 | 
			
		||||
			PacketType: "nothing",
 | 
			
		||||
			StreamName: "demo",
 | 
			
		||||
		}
 | 
			
		||||
		switch packet.PacketType {
 | 
			
		||||
		case "register":
 | 
			
		||||
			err = registerStream(packet.StreamName, ffmpegInstances, ffmpegInputStreams, cfg)
 | 
			
		||||
			break
 | 
			
		||||
		case "sendData":
 | 
			
		||||
			err = sendPacket(packet.StreamName, ffmpegInputStreams, packet.Data)
 | 
			
		||||
			break
 | 
			
		||||
		case "close":
 | 
			
		||||
			err = close(packet.StreamName, ffmpegInstances, ffmpegInputStreams)
 | 
			
		||||
			break
 | 
			
		||||
		default:
 | 
			
		||||
			log.Println("Unknown SRT packet type:", packet.PacketType)
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Printf("Error occurred while receiving SRT packet of type %s: %s", packet.PacketType, err)
 | 
			
		||||
		}
 | 
			
		||||
	}*/
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
// registerStream creates ffmpeg instance associated with newly created stream
 | 
			
		||||
func registerStream(name string, ffmpegInstances map[string]*exec.Cmd, ffmpegInputStreams map[string]*io.WriteCloser, cfg Options) error {
 | 
			
		||||
	streams, exist := cfg[name]
 | 
			
		||||
	if !exist || len(streams) == 0 {
 | 
			
		||||
		// Nothing to do, not configured
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	output := make(chan []byte, 1024)
 | 
			
		||||
	st.Register(output)
 | 
			
		||||
 | 
			
		||||
	// Launch FFMPEG instance
 | 
			
		||||
	params := []string{"-hide_banner", "-loglevel", "error", "-re", "-i", "pipe:0"}
 | 
			
		||||
	for _, stream := range streams {
 | 
			
		||||
	for _, url := range fwdCfg {
 | 
			
		||||
		params = append(params, "-f", "flv", "-preset", "ultrafast", "-tune", "zerolatency",
 | 
			
		||||
			"-c", "copy", stream)
 | 
			
		||||
			"-c", "copy", url)
 | 
			
		||||
	}
 | 
			
		||||
	ffmpeg := exec.Command("ffmpeg", params...)
 | 
			
		||||
 | 
			
		||||
	// Open pipes
 | 
			
		||||
	input, err := ffmpeg.StdinPipe()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
		log.Printf("Error while opening forwarding ffmpeg input pipe: %s", err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	errOutput, err := ffmpeg.StderrPipe()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
		log.Printf("Error while opening forwarding ffmpeg output pipe: %s", err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	ffmpegInstances[name] = ffmpeg
 | 
			
		||||
	ffmpegInputStreams[name] = &input
 | 
			
		||||
 | 
			
		||||
	// Start FFMpeg
 | 
			
		||||
	if err := ffmpeg.Start(); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
		log.Printf("Error while starting forwarding ffmpeg instance: %s", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Kill FFMPEG when stream is ended
 | 
			
		||||
	defer func() {
 | 
			
		||||
		_ = input.Close()
 | 
			
		||||
		_ = errOutput.Close()
 | 
			
		||||
		_ = ffmpeg.Process.Kill()
 | 
			
		||||
		st.Unregister(output)
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// Log standard error output
 | 
			
		||||
	go func() {
 | 
			
		||||
		scanner := bufio.NewScanner(errOutput)
 | 
			
		||||
		for scanner.Scan() {
 | 
			
		||||
			log.Printf("[FFMPEG %s] %s", name, scanner.Text())
 | 
			
		||||
			log.Printf("[FORWARDING FFMPEG] %s", scanner.Text())
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
	// Read stream output and redirect immediately to ffmpeg
 | 
			
		||||
	for data := range output {
 | 
			
		||||
		_, err := input.Write(data)
 | 
			
		||||
 | 
			
		||||
// sendPacket forwards data to the ffmpeg instance related to the stream name
 | 
			
		||||
func sendPacket(name string, ffmpegInputStreams map[string]*io.WriteCloser, data []byte) error {
 | 
			
		||||
	stdin := ffmpegInputStreams[name]
 | 
			
		||||
	if stdin == nil {
 | 
			
		||||
		// Don't need to forward stream
 | 
			
		||||
		return nil
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Printf("Error while writing to forwarded stream: %s", err)
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	_, err := (*stdin).Write(data)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// close ffmpeg instance associated with stream name
 | 
			
		||||
func close(name string, ffmpegInstances map[string]*exec.Cmd, ffmpegInputStreams map[string]*io.WriteCloser) error {
 | 
			
		||||
	ffmpeg := ffmpegInstances[name]
 | 
			
		||||
	if ffmpeg == nil {
 | 
			
		||||
		// No stream to close
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	if err := ffmpeg.Process.Kill(); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	delete(ffmpegInstances, name)
 | 
			
		||||
	delete(ffmpegInputStreams, name)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
*/
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user