diff options
author | Nicolas George <george@nsup.org> | 2014-02-20 14:14:53 +0100 |
---|---|---|
committer | Nicolas George <george@nsup.org> | 2014-05-26 11:40:21 +0200 |
commit | fc9c857c2d1b61dbbd104750ca1533c4a4658655 (patch) | |
tree | 643b1d75a132d076f11321a26d93d5c6b30be288 | |
parent | 55cc60cd6d9429270f5b3479c45a1abada72c05f (diff) | |
download | ffmpeg-fc9c857c2d1b61dbbd104750ca1533c4a4658655.tar.gz |
ffmpeg: use thread message API.
-rw-r--r-- | ffmpeg.c | 98 | ||||
-rw-r--r-- | ffmpeg.h | 6 |
2 files changed, 29 insertions, 75 deletions
@@ -59,6 +59,7 @@ #include "libavutil/timestamp.h" #include "libavutil/bprint.h" #include "libavutil/time.h" +#include "libavutil/threadmessage.h" #include "libavformat/os_support.h" #include "libavformat/ffm.h" // not public API @@ -132,11 +133,6 @@ AVIOContext *progress_avio = NULL; static uint8_t *subtitle_out; -#if HAVE_PTHREADS -/* signal to input threads that they should exit; set by the main thread */ -static int transcoding_finished; -#endif - #define DEFAULT_PASS_LOGFILENAME_PREFIX "ffmpeg2pass" InputStream **input_streams = NULL; @@ -3105,32 +3101,31 @@ static void *input_thread(void *arg) InputFile *f = arg; int ret = 0; - while (!transcoding_finished && ret >= 0) { + while (1) { AVPacket pkt; ret = av_read_frame(f->ctx, &pkt); if (ret == AVERROR(EAGAIN)) { av_usleep(10000); - ret = 0; continue; - } else if (ret < 0) + } + if (ret < 0) { + av_thread_message_queue_set_err_recv(f->in_thread_queue, ret); break; - - pthread_mutex_lock(&f->fifo_lock); - while (!av_fifo_space(f->fifo)) - pthread_cond_wait(&f->fifo_cond, &f->fifo_lock); - + } av_dup_packet(&pkt); - av_fifo_generic_write(f->fifo, &pkt, sizeof(pkt), NULL); - pthread_cond_signal(&f->fifo_cond); - - pthread_mutex_unlock(&f->fifo_lock); + ret = av_thread_message_queue_send(f->in_thread_queue, &pkt, 0); + if (ret < 0) { + if (ret != AVERROR_EOF) + av_log(f->ctx, AV_LOG_ERROR, + "Unable to send packet to main thread: %s\n", + av_err2str(ret)); + av_free_packet(&pkt); + av_thread_message_queue_set_err_recv(f->in_thread_queue, ret); + break; + } } - pthread_mutex_lock(&f->fifo_lock); - f->finished = 1; - pthread_cond_signal(&f->fifo_cond); - pthread_mutex_unlock(&f->fifo_lock); return NULL; } @@ -3138,34 +3133,19 @@ static void free_input_threads(void) { int i; - if (nb_input_files == 1) - return; - - transcoding_finished = 1; - for (i = 0; i < nb_input_files; i++) { InputFile *f = input_files[i]; AVPacket pkt; - if (!f->fifo || f->joined) + if (!f->in_thread_queue) continue; - - pthread_mutex_lock(&f->fifo_lock); - while (av_fifo_size(f->fifo)) { - av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL); + av_thread_message_queue_set_err_send(f->in_thread_queue, AVERROR_EOF); + while (av_thread_message_queue_recv(f->in_thread_queue, &pkt, 0) >= 0) av_free_packet(&pkt); - } - pthread_cond_signal(&f->fifo_cond); - pthread_mutex_unlock(&f->fifo_lock); pthread_join(f->thread, NULL); f->joined = 1; - - while (av_fifo_size(f->fifo)) { - av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL); - av_free_packet(&pkt); - } - av_fifo_freep(&f->fifo); + av_thread_message_queue_free(&f->in_thread_queue); } } @@ -3179,15 +3159,13 @@ static int init_input_threads(void) for (i = 0; i < nb_input_files; i++) { InputFile *f = input_files[i]; - if (!(f->fifo = av_fifo_alloc_array(8, sizeof(AVPacket)))) - return AVERROR(ENOMEM); - if (f->ctx->pb ? !f->ctx->pb->seekable : strcmp(f->ctx->iformat->name, "lavfi")) f->non_blocking = 1; - - pthread_mutex_init(&f->fifo_lock, NULL); - pthread_cond_init (&f->fifo_cond, NULL); + ret = av_thread_message_queue_alloc(&f->in_thread_queue, + 8, sizeof(AVPacket)); + if (ret < 0) + return ret; if ((ret = pthread_create(&f->thread, NULL, input_thread, f))) return AVERROR(ret); @@ -3197,31 +3175,9 @@ static int init_input_threads(void) static int get_input_packet_mt(InputFile *f, AVPacket *pkt) { - int ret = 0; - - pthread_mutex_lock(&f->fifo_lock); - - while (1) { - if (av_fifo_size(f->fifo)) { - av_fifo_generic_read(f->fifo, pkt, sizeof(*pkt), NULL); - pthread_cond_signal(&f->fifo_cond); - break; - } else { - if (f->finished) { - ret = AVERROR_EOF; - break; - } - if (f->non_blocking) { - ret = AVERROR(EAGAIN); - break; - } - pthread_cond_wait(&f->fifo_cond, &f->fifo_lock); - } - } - - pthread_mutex_unlock(&f->fifo_lock); - - return ret; + return av_thread_message_queue_recv(f->in_thread_queue, pkt, + f->non_blocking ? + AV_THREAD_MESSAGE_NONBLOCK : 0); } #endif @@ -44,6 +44,7 @@ #include "libavutil/fifo.h" #include "libavutil/pixfmt.h" #include "libavutil/rational.h" +#include "libavutil/threadmessage.h" #include "libswresample/swresample.h" @@ -336,13 +337,10 @@ typedef struct InputFile { int accurate_seek; #if HAVE_PTHREADS + AVThreadMessageQueue *in_thread_queue; pthread_t thread; /* thread reading from this file */ int non_blocking; /* reading packets from the thread should not block */ - int finished; /* the thread has exited */ int joined; /* the thread has been joined */ - pthread_mutex_t fifo_lock; /* lock for access to fifo */ - pthread_cond_t fifo_cond; /* the main thread will signal on this cond after reading from fifo */ - AVFifoBuffer *fifo; /* demuxed packets are stored here; freed by the main thread */ #endif } InputFile; |