diff options
author | Anton Khirnov <anton@khirnov.net> | 2012-06-02 07:26:41 +0200 |
---|---|---|
committer | Anton Khirnov <anton@khirnov.net> | 2012-06-10 08:12:23 +0200 |
commit | 5db5169e46a5f1676aafb82ec8c3f5dc6fb6bb6d (patch) | |
tree | 9664eb4606fa33694e438052328c9377154e5baf /avconv.c | |
parent | b0f0dfc4851da74e77d3d55bf83f3c8b8617e22a (diff) | |
download | ffmpeg-5db5169e46a5f1676aafb82ec8c3f5dc6fb6bb6d.tar.gz |
avconv: multithreaded demuxing.
When there are multiple input files, run demuxing for each input file in
a separate thread, so reading packets does not block.
This is useful for achieving low latency when reading from multiple
(possibly slow) input streams.
Diffstat (limited to 'avconv.c')
-rw-r--r-- | avconv.c | 157 |
1 files changed, 155 insertions, 2 deletions
@@ -69,6 +69,14 @@ #include <sys/select.h> #endif +#if HAVE_THREADS +#if HAVE_PTHREADS +#include <pthread.h> +#else +#include "libavcodec/w32pthreads.h" +#endif +#endif + #include <time.h> #include "cmdutils.h" @@ -140,6 +148,11 @@ static float dts_delta_threshold = 10; static int print_stats = 1; +#if HAVE_THREADS +/* signal to input threads that they should exit; set by the main thread */ +static int transcoding_finished; +#endif + #define DEFAULT_PASS_LOGFILENAME_PREFIX "av2pass" typedef struct InputFilter { @@ -219,6 +232,15 @@ typedef struct InputFile { int nb_streams; /* number of stream that avconv is aware of; may be different from ctx.nb_streams if new streams appear during av_read_frame() */ int rate_emu; + +#if HAVE_THREADS + pthread_t thread; /* thread reading from this file */ + int finished; /* the thread has exited */ + int joined; /* the thread has been joined */ + pthread_mutex_t fifo_lock; /* lock for access to fifo */ + pthread_cond_t fifo_cond; /* the main thread will signal on this cond after reading from fifo */ + AVFifoBuffer *fifo; /* demuxed packets are stored here; freed by the main thread */ +#endif } InputFile; typedef struct OutputStream { @@ -2765,6 +2787,125 @@ static int select_input_file(uint8_t *no_packet) return file_index; } +#if HAVE_THREADS +static void *input_thread(void *arg) +{ + InputFile *f = arg; + int ret = 0; + + while (!transcoding_finished && ret >= 0) { + AVPacket pkt; + ret = av_read_frame(f->ctx, &pkt); + + if (ret == AVERROR(EAGAIN)) { + usleep(10000); + ret = 0; + continue; + } else if (ret < 0) + break; + + pthread_mutex_lock(&f->fifo_lock); + while (!av_fifo_space(f->fifo)) + pthread_cond_wait(&f->fifo_cond, &f->fifo_lock); + + av_dup_packet(&pkt); + av_fifo_generic_write(f->fifo, &pkt, sizeof(pkt), NULL); + + pthread_mutex_unlock(&f->fifo_lock); + } + + f->finished = 1; + return NULL; +} + +static void free_input_threads(void) +{ + int i; + + if (nb_input_files == 1) + return; + + transcoding_finished = 1; + + for (i = 0; i < nb_input_files; i++) { + InputFile *f = input_files[i]; + AVPacket pkt; + + if (f->joined) + continue; + + pthread_mutex_lock(&f->fifo_lock); + while (av_fifo_size(f->fifo)) { + av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL); + av_free_packet(&pkt); + } + pthread_cond_signal(&f->fifo_cond); + pthread_mutex_unlock(&f->fifo_lock); + + pthread_join(f->thread, NULL); + f->joined = 1; + + while (av_fifo_size(f->fifo)) { + av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL); + av_free_packet(&pkt); + } + av_fifo_free(f->fifo); + } +} + +static int init_input_threads(void) +{ + int i, ret; + + if (nb_input_files == 1) + return 0; + + for (i = 0; i < nb_input_files; i++) { + InputFile *f = input_files[i]; + + if (!(f->fifo = av_fifo_alloc(8*sizeof(AVPacket)))) + return AVERROR(ENOMEM); + + pthread_mutex_init(&f->fifo_lock, NULL); + pthread_cond_init (&f->fifo_cond, NULL); + + if ((ret = pthread_create(&f->thread, NULL, input_thread, f))) + return AVERROR(ret); + } + return 0; +} + +static int get_input_packet_mt(InputFile *f, AVPacket *pkt) +{ + int ret = 0; + + pthread_mutex_lock(&f->fifo_lock); + + if (av_fifo_size(f->fifo)) { + av_fifo_generic_read(f->fifo, pkt, sizeof(*pkt), NULL); + pthread_cond_signal(&f->fifo_cond); + } else { + if (f->finished) + ret = AVERROR_EOF; + else + ret = AVERROR(EAGAIN); + } + + pthread_mutex_unlock(&f->fifo_lock); + + return ret; +} +#endif + +static int get_input_packet(InputFile *f, AVPacket *pkt) +{ +#if HAVE_THREADS + if (nb_input_files > 1) + return get_input_packet_mt(f, pkt); +#endif + return av_read_frame(f->ctx, pkt); +} + /* * The following code is the main loop of the file converter */ @@ -2790,6 +2931,11 @@ static int transcode(void) timer_start = av_gettime(); +#if HAVE_THREADS + if ((ret = init_input_threads()) < 0) + goto fail; +#endif + for (; received_sigterm == 0;) { int file_index, ist_index; AVPacket pkt; @@ -2810,12 +2956,13 @@ static int transcode(void) usleep(10000); continue; } + av_log(NULL, AV_LOG_VERBOSE, "No more inputs to read from, finishing.\n"); break; } - /* read a frame from it and output it in the fifo */ is = input_files[file_index]->ctx; - ret = av_read_frame(is, &pkt); + ret = get_input_packet(input_files[file_index], &pkt); + if (ret == AVERROR(EAGAIN)) { no_packet[file_index] = 1; no_packet_count++; @@ -2897,6 +3044,9 @@ static int transcode(void) /* dump report by using the output first video and audio streams */ print_report(0, timer_start); } +#if HAVE_THREADS + free_input_threads(); +#endif /* at the end of stream, we must flush the decoder buffers */ for (i = 0; i < nb_input_streams; i++) { @@ -2941,6 +3091,9 @@ static int transcode(void) fail: av_freep(&no_packet); +#if HAVE_THREADS + free_input_threads(); +#endif if (output_streams) { for (i = 0; i < nb_output_streams; i++) { |