python multiple threads redirecting stdout

Time:09-04

I'm building an icecast2 radio station that will restream existing stations in lower quality. The program will spawn multiple FFmpeg processes that restream 24/7. For troubleshooting purposes, I would like the output of every FFmpeg process redirected to a separate file.

import ffmpeg, csv
from threading import Thread

def run(name, mount, source):
    # ICECAST2_*, BITRATE, SAMPLE_RATE, FORMAT and CODEC are assumed to be defined elsewhere
    icecast = "icecast://" + ICECAST2_USER + ":" + ICECAST2_PASS + "@localhost:" + ICECAST2_PORT + "/" + mount
    stream = (
        ffmpeg
        .input(source)
        .output(
            icecast,
            audio_bitrate=BITRATE, sample_rate=SAMPLE_RATE, format=FORMAT, acodec=CODEC,
            reconnect="1", reconnect_streamed="1", reconnect_at_eof="1", reconnect_delay_max="120",
            ice_name=name, ice_genre=source
        )
    )
    return stream

with open('stations.csv', mode='r') as data:
    for station in csv.DictReader(data):
        stream = run(station['name'], station['mount'], station['url'])
        thread = Thread(target=stream.run)
        thread.start()

As I understand it, I can't redirect the stdout of each thread separately. I also can't use ffmpeg's reporting feature, which is only configured through an environment variable. Do I have any other options?

CodePudding user response:

You need to create a thread function of your own that runs ffmpeg through subprocess and sends its stderr to a per-stream log file:


import subprocess as sp

def stream_runner(stream, stream_id):
    # open a stream-specific log file to write to
    with open(f'stream_{stream_id}.log', 'wt') as f:
        # block until ffmpeg is done; ffmpeg writes its log output to stderr
        sp.run(stream.compile(), stderr=f)

with open('stations.csv', mode='r') as data:
    for i, station in enumerate(csv.DictReader(data)):
        stream = run(station['name'], station['mount'], station['url'])
        thread = Thread(target=stream_runner, args=(stream, i))
        thread.start()

Something like this should work.

CodePudding user response:

ffmpeg-python doesn't quite give you the tools to do this. You want to control one of the arguments to subprocess, stderr, but ffmpeg-python doesn't have an argument for this.

However, ffmpeg-python can show you the command-line arguments it would have used (via stream.compile()). You can make your own call to subprocess with them.
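
A minimal sketch of that idea, using only the standard library so it runs without ffmpeg installed. The `args` list here is a stand-in for what `stream.compile()` would return, and `run_logged` and the log file name are hypothetical names chosen for illustration:

```python
import subprocess
import sys


def run_logged(args, log_path):
    """Run a command, appending its stderr to log_path; return the exit code."""
    with open(log_path, "ab") as logfile:
        # ffmpeg writes its log and progress output to stderr
        completed = subprocess.run(args, stderr=logfile)
    return completed.returncode


if __name__ == "__main__":
    # Stand-in for stream.compile(): a child process that writes to stderr.
    args = [sys.executable, "-c", "import sys; sys.stderr.write('hello from child\\n')"]
    print(run_logged(args, "example-ffmpeg.log"))
```

With ffmpeg-python, the same call would presumably take `stream.compile()` as `args` and one log path per station.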

You also don't need to use threads to do this - you can set up each ffmpeg subprocess, without waiting for it to complete, and check in on it each second. This example starts up two ffmpeg instances in parallel, and monitors each one by printing out the most recent line of output from each one every second, as well as tracking if they've exited.

I made two changes for testing:

  1. It gets the stations from a dictionary rather than a CSV file.
  2. It transcodes an MP4 file rather than an audio stream, since I don't have an icecast server. If you want to test it, it expects to have a file named 'sample.mp4' in the same directory.

Both should be pretty easy to change back.

import ffmpeg
import subprocess
import os
import time

stations = [
    {'name': 'foo1', 'input': 'sample.mp4', 'output': 'output.mp4'},
    {'name': 'foo2', 'input': 'sample.mp4', 'output': 'output2.mp4'},
]

class Transcoder():
    def __init__(self, arguments):
        self.arguments = arguments

    def run(self):
        stream = (
            ffmpeg
            .input(self.arguments['input'])
            .output(self.arguments['output'])
        )
        args = stream.compile(overwrite_output=True)
        with open(self.log_name(), 'ab') as logfile:
            self.subproc = subprocess.Popen(
                args,
                stdin=None,
                stdout=None,
                stderr=logfile,
            )
    
    def log_name(self):
        return self.arguments['name'] + "-ffmpeg.log"

    def still_running(self):
        return self.subproc.poll() is None
    
    def last_log_line(self):
        with open(self.log_name(), 'rb') as f:
            try:  # catch OSError in case of a one line file 
                f.seek(-2, os.SEEK_END)
                while f.read(1) not in [b'\n', b'\r']:
                    f.seek(-2, os.SEEK_CUR)
            except OSError:
                f.seek(0)
            last_line = f.readline().decode()
        # strip the trailing line ending (ffmpeg progress lines end in '\r')
        last_line = last_line.rstrip('\r\n')
        return last_line

    def name(self):
        return self.arguments['name']


transcoders = []
for station in stations:
    t = Transcoder(station)
    t.run()
    transcoders.append(t)

while True:
    for t in list(transcoders):
        if not t.still_running():
            print(f"{t.name()} has exited")
            transcoders.remove(t)
        print(t.name(), repr(t.last_log_line()))
    if len(transcoders) == 0:
        break
    time.sleep(1)
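
To switch the stations list back to a CSV file as in the question, something like the following may work. It is only a sketch: `load_stations` is a hypothetical helper, the sample rows are made up, and the Transcoder class above would still need to read the 'url' and 'mount' columns instead of 'input' and 'output'.

```python
import csv
import io


def load_stations(csv_file):
    """Read station rows into a list of dicts, one dict per CSV row."""
    return list(csv.DictReader(csv_file))


# Hypothetical sample data using the column names from the question.
sample = io.StringIO(
    "name,mount,url\n"
    "foo1,foo1.mp3,http://example.com/a\n"
)
stations = load_stations(sample)
```

For a real file, the argument would be the handle from `open('stations.csv', newline='')` rather than the in-memory sample.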