diff options
author | Anton Khirnov <anton@khirnov.net> | 2022-03-31 17:33:48 +0200 |
---|---|---|
committer | Anton Khirnov <anton@khirnov.net> | 2022-07-23 11:53:19 +0200 |
commit | 2d924b3a630869c65fe0c76568910500f54ed057 (patch) | |
tree | 046e6484309205ca7715ec0a66e5b77732cf536b /fftools | |
parent | 37c764df6730e8299c468dd7636c45da6e158ef3 (diff) | |
download | ffmpeg-2d924b3a630869c65fe0c76568910500f54ed057.tar.gz |
fftools/ffmpeg: move each muxer to a separate thread
Diffstat (limited to 'fftools')
-rw-r--r-- | fftools/ffmpeg.c | 18 | ||||
-rw-r--r-- | fftools/ffmpeg.h | 7 | ||||
-rw-r--r-- | fftools/ffmpeg_mux.c | 299 | ||||
-rw-r--r-- | fftools/ffmpeg_opt.c | 4 |
4 files changed, 229 insertions, 99 deletions
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 4b651f9224..632ac25cb2 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -746,9 +746,6 @@ static void output_packet(OutputFile *of, AVPacket *pkt, goto mux_fail; } - if (eof) - ost->finished |= MUXER_FINISHED; - return; mux_fail: @@ -1532,7 +1529,7 @@ static void print_final_stats(int64_t total_size) enum AVMediaType type = ost->enc_ctx->codec_type; total_size += ost->data_size; - total_packets += ost->packets_written; + total_packets += atomic_load(&ost->packets_written); av_log(NULL, AV_LOG_VERBOSE, " Output stream #%d:%d (%s): ", i, j, av_get_media_type_string(type)); @@ -1545,7 +1542,7 @@ static void print_final_stats(int64_t total_size) } av_log(NULL, AV_LOG_VERBOSE, "%"PRIu64" packets muxed (%"PRIu64" bytes); ", - ost->packets_written, ost->data_size); + atomic_load(&ost->packets_written), ost->data_size); av_log(NULL, AV_LOG_VERBOSE, "\n"); } @@ -1613,7 +1610,7 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti } if (!vid && enc->codec_type == AVMEDIA_TYPE_VIDEO) { float fps; - uint64_t frame_number = ost->packets_written; + uint64_t frame_number = atomic_load(&ost->packets_written); fps = t > 1 ? frame_number / t : 0; av_bprintf(&buf, "frame=%5"PRId64" fps=%3.*f q=%3.1f ", @@ -3491,9 +3488,8 @@ static int need_output(void) for (i = 0; i < nb_output_streams; i++) { OutputStream *ost = output_streams[i]; - OutputFile *of = output_files[ost->file_index]; - if (ost->finished || of_finished(of)) + if (ost->finished) continue; return 1; @@ -4412,9 +4408,11 @@ static int transcode(void) /* close each encoder */ for (i = 0; i < nb_output_streams; i++) { + uint64_t packets_written; ost = output_streams[i]; - total_packets_written += ost->packets_written; - if (!ost->packets_written && (abort_on_flags & ABORT_ON_FLAG_EMPTY_OUTPUT_STREAM)) { + packets_written = atomic_load(&ost->packets_written); + total_packets_written += packets_written; + if (!packets_written && (abort_on_flags & ABORT_ON_FLAG_EMPTY_OUTPUT_STREAM)) { av_log(NULL, AV_LOG_FATAL, "Empty output on stream %d.\n", i); exit_program(1); } diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index d12577e992..0c9498c23e 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -21,6 +21,7 @@ #include "config.h" +#include <stdatomic.h> #include <stdint.h> #include <stdio.h> #include <signal.h> @@ -557,7 +558,7 @@ typedef struct OutputStream { // combined size of all the packets written uint64_t data_size; // number of packets send to the muxer - uint64_t packets_written; + atomic_uint_least64_t packets_written; // number of frames/samples sent to the encoder uint64_t frames_encoded; uint64_t samples_encoded; @@ -699,14 +700,14 @@ int hw_device_setup_for_filter(FilterGraph *fg); int hwaccel_decode_init(AVCodecContext *avctx); int of_muxer_init(OutputFile *of, AVFormatContext *fc, - AVDictionary *opts, int64_t limit_filesize); + AVDictionary *opts, int64_t limit_filesize, + int thread_queue_size); /* open the muxer when all the streams are initialized */ int of_check_init(OutputFile *of); int of_write_trailer(OutputFile *of); void of_close(OutputFile **pof); int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost); -int of_finished(OutputFile *of); int64_t of_filesize(OutputFile *of); AVChapter * const * of_get_chapters(OutputFile *of, unsigned int *nb_chapters); diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c index 2abadd3f9b..df9cb73d0e 100644 --- a/fftools/ffmpeg_mux.c +++ b/fftools/ffmpeg_mux.c @@ -16,17 +16,21 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#include <stdatomic.h> #include <stdio.h> #include <string.h> #include "ffmpeg.h" +#include "objpool.h" #include "sync_queue.h" +#include "thread_queue.h" #include "libavutil/fifo.h" #include "libavutil/intreadwrite.h" #include "libavutil/log.h" #include "libavutil/mem.h" #include "libavutil/timestamp.h" +#include "libavutil/thread.h" #include "libavcodec/packet.h" @@ -51,13 +55,18 @@ typedef struct MuxStream { struct Muxer { AVFormatContext *fc; + pthread_t thread; + ThreadQueue *tq; + MuxStream *streams; AVDictionary *opts; + int thread_queue_size; + /* filesize limit expressed in bytes */ int64_t limit_filesize; - int64_t final_filesize; + atomic_int_least64_t last_filesize; int header_written; AVPacket *sq_pkt; @@ -65,15 +74,6 @@ struct Muxer { static int want_sdp = 1; -static void close_all_output_streams(OutputStream *ost, OSTFinished this_stream, OSTFinished others) -{ - int i; - for (i = 0; i < nb_output_streams; i++) { - OutputStream *ost2 = output_streams[i]; - ost2->finished |= ost == ost2 ? this_stream : others; - } -} - static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) { MuxStream *ms = &of->mux->streams[ost->index]; @@ -116,13 +116,32 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) return 0; } +static int64_t filesize(AVIOContext *pb) +{ + int64_t ret = -1; + + if (pb) { + ret = avio_size(pb); + if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too + ret = avio_tell(pb); + } + + return ret; +} + static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) { MuxStream *ms = &of->mux->streams[ost->index]; AVFormatContext *s = of->mux->fc; AVStream *st = ost->st; + int64_t fs; int ret; + fs = filesize(s->pb); + atomic_store(&of->mux->last_filesize, fs); + if (fs >= of->mux->limit_filesize) + return AVERROR_EOF; + if ((st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && ost->vsync_method == VSYNC_DROP) || (st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO && audio_sync_method < 0)) pkt->pts = pkt->dts = AV_NOPTS_VALUE; @@ -175,7 +194,7 @@ static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) ms->last_mux_dts = pkt->dts; ost->data_size += pkt->size; - ost->packets_written++; + atomic_fetch_add(&ost->packets_written, 1); pkt->stream_index = ost->index; @@ -193,66 +212,81 @@ static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) ret = av_interleaved_write_frame(s, pkt); if (ret < 0) { print_error("av_interleaved_write_frame()", ret); - main_return_code = 1; - close_all_output_streams(ost, MUXER_FINISHED | ENCODER_FINISHED, ENCODER_FINISHED); return ret; } return 0; } -static int submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) +static int sync_queue_process(OutputFile *of, OutputStream *ost, AVPacket *pkt) { if (ost->sq_idx_mux >= 0) { int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt)); - if (ret < 0) { - if (pkt) - av_packet_unref(pkt); - if (ret == AVERROR_EOF) { - ost->finished |= MUXER_FINISHED; - return 0; - } else - return ret; - } + if (ret < 0) + return ret; while (1) { ret = sq_receive(of->sq_mux, -1, SQPKT(of->mux->sq_pkt)); - if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) - return 0; - else if (ret < 0) - return ret; + if (ret < 0) + return (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) ? 0 : ret; ret = write_packet(of, output_streams[of->ost_index + ret], of->mux->sq_pkt); if (ret < 0) return ret; } - } else { - if (pkt) - return write_packet(of, ost, pkt); - - ost->finished |= MUXER_FINISHED; - } + } else if (pkt) + return write_packet(of, ost, pkt); return 0; } -int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost) +static void *muxer_thread(void *arg) { - int ret; + OutputFile *of = arg; + Muxer *mux = of->mux; + AVPacket *pkt = NULL; + int ret = 0; + + pkt = av_packet_alloc(); + if (!pkt) { + ret = AVERROR(ENOMEM); + goto finish; + } - if (of->mux->header_written) { - return submit_packet(of, ost, pkt); - } else { - /* the muxer is not initialized yet, buffer the packet */ - ret = queue_packet(of, ost, pkt); - if (ret < 0) { - av_packet_unref(pkt); - return ret; + while (1) { + OutputStream *ost; + int stream_idx; + + ret = tq_receive(mux->tq, &stream_idx, pkt); + if (stream_idx < 0) { + av_log(NULL, AV_LOG_VERBOSE, + "All streams finished for output file #%d\n", of->index); + ret = 0; + break; + } + + ost = output_streams[of->ost_index + stream_idx]; + ret = sync_queue_process(of, ost, ret < 0 ? NULL : pkt); + av_packet_unref(pkt); + if (ret == AVERROR_EOF) + tq_receive_finish(mux->tq, stream_idx); + else if (ret < 0) { + av_log(NULL, AV_LOG_ERROR, + "Error muxing a packet for output file #%d\n", of->index); + break; } } - return 0; +finish: + av_packet_free(&pkt); + + for (unsigned int i = 0; i < mux->fc->nb_streams; i++) + tq_receive_finish(mux->tq, i); + + av_log(NULL, AV_LOG_VERBOSE, "Terminating muxer thread %d\n", of->index); + + return (void*)(intptr_t)ret; } static int print_sdp(void) @@ -303,11 +337,125 @@ static int print_sdp(void) av_freep(&sdp_filename); } + // SDP successfully written, allow muxer threads to start + ret = 1; + fail: av_freep(&avc); return ret; } +static int submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) +{ + Muxer *mux = of->mux; + int ret = 0; + + if (!pkt || ost->finished & MUXER_FINISHED) + goto finish; + + ret = tq_send(mux->tq, ost->index, pkt); + if (ret < 0) + goto finish; + + return 0; + +finish: + if (pkt) + av_packet_unref(pkt); + + ost->finished |= MUXER_FINISHED; + tq_send_finish(mux->tq, ost->index); + return ret == AVERROR_EOF ? 0 : ret; +} + +int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost) +{ + int ret; + + if (of->mux->tq) { + return submit_packet(of, ost, pkt); + } else { + /* the muxer is not initialized yet, buffer the packet */ + ret = queue_packet(of, ost, pkt); + if (ret < 0) { + av_packet_unref(pkt); + return ret; + } + } + + return 0; +} + +static int thread_stop(OutputFile *of) +{ + Muxer *mux = of->mux; + void *ret; + + if (!mux || !mux->tq) + return 0; + + for (unsigned int i = 0; i < mux->fc->nb_streams; i++) + tq_send_finish(mux->tq, i); + + pthread_join(mux->thread, &ret); + + tq_free(&mux->tq); + + return (int)(intptr_t)ret; +} + +static void pkt_move(void *dst, void *src) +{ + av_packet_move_ref(dst, src); +} + +static int thread_start(OutputFile *of) +{ + Muxer *mux = of->mux; + AVFormatContext *fc = mux->fc; + ObjPool *op; + int ret; + + op = objpool_alloc_packets(); + if (!op) + return AVERROR(ENOMEM); + + mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move); + if (!mux->tq) { + objpool_free(&op); + return AVERROR(ENOMEM); + } + + ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)of); + if (ret) { + tq_free(&mux->tq); + return AVERROR(ret); + } + + /* flush the muxing queues */ + for (int i = 0; i < fc->nb_streams; i++) { + MuxStream *ms = &of->mux->streams[i]; + OutputStream *ost = output_streams[of->ost_index + i]; + AVPacket *pkt; + + /* try to improve muxing time_base (only possible if nothing has been written yet) */ + if (!av_fifo_can_read(ms->muxing_queue)) + ost->mux_timebase = ost->st->time_base; + + while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) { + ret = submit_packet(of, ost, pkt); + if (pkt) { + ms->muxing_queue_data_size -= pkt->size; + av_packet_free(&pkt); + } + if (ret < 0) + return ret; + } + } + + return 0; +} + /* open the muxer when all the streams are initialized */ int of_check_init(OutputFile *of) { @@ -339,28 +487,19 @@ int of_check_init(OutputFile *of) if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n"); return ret; - } - } - - /* flush the muxing queues */ - for (i = 0; i < fc->nb_streams; i++) { - MuxStream *ms = &of->mux->streams[i]; - OutputStream *ost = output_streams[of->ost_index + i]; - AVPacket *pkt; - - /* try to improve muxing time_base (only possible if nothing has been written yet) */ - if (!av_fifo_can_read(ms->muxing_queue)) - ost->mux_timebase = ost->st->time_base; - - while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) { - ret = submit_packet(of, ost, pkt); - if (pkt) { - ms->muxing_queue_data_size -= pkt->size; - av_packet_free(&pkt); + } else if (ret == 1) { + /* SDP is written only after all the muxers are ready, so now we + * start ALL the threads */ + for (i = 0; i < nb_output_files; i++) { + ret = thread_start(output_files[i]); + if (ret < 0) + return ret; } - if (ret < 0) - return ret; } + } else { + ret = thread_start(of); + if (ret < 0) + return ret; } return 0; @@ -371,7 +510,7 @@ int of_write_trailer(OutputFile *of) AVFormatContext *fc = of->mux->fc; int ret; - if (!of->mux->header_written) { + if (!of->mux->tq) { av_log(NULL, AV_LOG_ERROR, "Nothing was written into output file %d (%s), because " "at least one of its streams received no packets.\n", @@ -379,13 +518,17 @@ int of_write_trailer(OutputFile *of) return AVERROR(EINVAL); } + ret = thread_stop(of); + if (ret < 0) + main_return_code = ret; + ret = av_write_trailer(fc); if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error writing trailer of %s: %s\n", fc->url, av_err2str(ret)); return ret; } - of->mux->final_filesize = of_filesize(of); + of->mux->last_filesize = filesize(fc->pb); if (!(of->format->flags & AVFMT_NOFILE)) { ret = avio_closep(&fc->pb); @@ -448,6 +591,8 @@ void of_close(OutputFile **pof) if (!of) return; + thread_stop(of); + sq_free(&of->sq_encode); sq_free(&of->sq_mux); @@ -457,7 +602,8 @@ void of_close(OutputFile **pof) } int of_muxer_init(OutputFile *of, AVFormatContext *fc, - AVDictionary *opts, int64_t limit_filesize) + AVDictionary *opts, int64_t limit_filesize, + int thread_queue_size) { Muxer *mux = av_mallocz(sizeof(*mux)); int ret = 0; @@ -487,6 +633,7 @@ int of_muxer_init(OutputFile *of, AVFormatContext *fc, ms->last_mux_dts = AV_NOPTS_VALUE; } + mux->thread_queue_size = thread_queue_size > 0 ? thread_queue_size : 8; mux->limit_filesize = limit_filesize; mux->opts = opts; @@ -515,25 +662,9 @@ fail: return ret; } -int of_finished(OutputFile *of) -{ - return of_filesize(of) >= of->mux->limit_filesize; -} - int64_t of_filesize(OutputFile *of) { - AVIOContext *pb = of->mux->fc->pb; - int64_t ret = -1; - - if (of->mux->final_filesize) - ret = of->mux->final_filesize; - else if (pb) { - ret = avio_size(pb); - if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too - ret = avio_tell(pb); - } - - return ret; + return atomic_load(&of->mux->last_filesize); } AVChapter * const * diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c index 01eedbeb34..8ac73c0efc 100644 --- a/fftools/ffmpeg_opt.c +++ b/fftools/ffmpeg_opt.c @@ -3116,7 +3116,7 @@ loop_end: of->nb_streams = oc->nb_streams; of->url = filename; - err = of_muxer_init(of, oc, format_opts, o->limit_filesize); + err = of_muxer_init(of, oc, format_opts, o->limit_filesize, o->thread_queue_size); if (err < 0) { av_log(NULL, AV_LOG_FATAL, "Error initializing internal muxing state\n"); exit_program(1); @@ -3907,7 +3907,7 @@ const OptionDef options[] = { { "disposition", OPT_STRING | HAS_ARG | OPT_SPEC | OPT_OUTPUT, { .off = OFFSET(disposition) }, "disposition", "" }, - { "thread_queue_size", HAS_ARG | OPT_INT | OPT_OFFSET | OPT_EXPERT | OPT_INPUT, + { "thread_queue_size", HAS_ARG | OPT_INT | OPT_OFFSET | OPT_EXPERT | OPT_INPUT | OPT_OUTPUT, { .off = OFFSET(thread_queue_size) }, "set the maximum number of queued packets from the demuxer" }, { "find_stream_info", OPT_BOOL | OPT_PERFILE | OPT_INPUT | OPT_EXPERT, { &find_stream_info }, |