aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnton Khirnov <anton@khirnov.net>2023-07-18 16:37:52 +0200
committerAnton Khirnov <anton@khirnov.net>2023-12-12 08:24:18 +0100
commitd119ae2fd82a494d9430ff4d4fc262961a68c598 (patch)
tree0a9e5d9e4c852febe72d095d569875445ef05765
parent9b8cc36ce0b2417469d78c68780c8796886c202e (diff)
downloadffmpeg-d119ae2fd82a494d9430ff4d4fc262961a68c598.tar.gz
fftools/ffmpeg: convert to a threaded architecture
Change the main loop and every component (demuxers, decoders, filters, encoders, muxers) to use the previously added transcode scheduler. Every instance of every such component was already running in a separate thread, but now they can actually run in parallel. Changes the results of ffmpeg-fix_sub_duration_heartbeat - tested by JEEB to be more correct and deterministic.
-rw-r--r--Changelog2
-rw-r--r--fftools/ffmpeg.c374
-rw-r--r--fftools/ffmpeg.h97
-rw-r--r--fftools/ffmpeg_dec.c313
-rw-r--r--fftools/ffmpeg_demux.c268
-rw-r--r--fftools/ffmpeg_enc.c377
-rw-r--r--fftools/ffmpeg_filter.c714
-rw-r--r--fftools/ffmpeg_mux.c324
-rw-r--r--fftools/ffmpeg_mux.h24
-rw-r--r--fftools/ffmpeg_mux_init.c88
-rw-r--r--fftools/ffmpeg_opt.c6
-rw-r--r--tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat34
12 files changed, 600 insertions, 2021 deletions
diff --git a/Changelog b/Changelog
index f00bc27ca4..67ef92eb02 100644
--- a/Changelog
+++ b/Changelog
@@ -7,6 +7,8 @@ version <next>:
- EVC encoding using external library libxeve
- QOA decoder and demuxer
- aap filter
+- demuxing, decoding, filtering, encoding, and muxing in the
+ ffmpeg CLI now all run in parallel
version 6.1:
- libaribcaption decoder
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index b8a97258a0..30b594fd97 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -117,7 +117,7 @@ typedef struct BenchmarkTimeStamps {
static BenchmarkTimeStamps get_benchmark_time_stamps(void);
static int64_t getmaxrss(void);
-unsigned nb_output_dumped = 0;
+atomic_uint nb_output_dumped = 0;
static BenchmarkTimeStamps current_time;
AVIOContext *progress_avio = NULL;
@@ -138,30 +138,6 @@ static struct termios oldtty;
static int restore_tty;
#endif
-/* sub2video hack:
- Convert subtitles to video with alpha to insert them in filter graphs.
- This is a temporary solution until libavfilter gets real subtitles support.
- */
-
-static void sub2video_heartbeat(InputFile *infile, int64_t pts, AVRational tb)
-{
- /* When a frame is read from a file, examine all sub2video streams in
- the same file and send the sub2video frame again. Otherwise, decoded
- video frames could be accumulating in the filter graph while a filter
- (possibly overlay) is desperately waiting for a subtitle frame. */
- for (int i = 0; i < infile->nb_streams; i++) {
- InputStream *ist = infile->streams[i];
-
- if (ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
- continue;
-
- for (int j = 0; j < ist->nb_filters; j++)
- ifilter_sub2video_heartbeat(ist->filters[j], pts, tb);
- }
-}
-
-/* end of sub2video hack */
-
static void term_exit_sigsafe(void)
{
#if HAVE_TERMIOS_H
@@ -499,23 +475,13 @@ void update_benchmark(const char *fmt, ...)
}
}
-void close_output_stream(OutputStream *ost)
-{
- OutputFile *of = output_files[ost->file_index];
- ost->finished |= ENCODER_FINISHED;
-
- if (ost->sq_idx_encode >= 0)
- sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
-}
-
-static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time)
+static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time, int64_t pts)
{
AVBPrint buf, buf_script;
int64_t total_size = of_filesize(output_files[0]);
int vid;
double bitrate;
double speed;
- int64_t pts = AV_NOPTS_VALUE;
static int64_t last_time = -1;
static int first_report = 1;
uint64_t nb_frames_dup = 0, nb_frames_drop = 0;
@@ -533,7 +499,7 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
last_time = cur_time;
}
if (((cur_time - last_time) < stats_period && !first_report) ||
- (first_report && nb_output_dumped < nb_output_files))
+ (first_report && atomic_load(&nb_output_dumped) < nb_output_files))
return;
last_time = cur_time;
}
@@ -544,7 +510,7 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
av_bprint_init(&buf, 0, AV_BPRINT_SIZE_AUTOMATIC);
av_bprint_init(&buf_script, 0, AV_BPRINT_SIZE_AUTOMATIC);
for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
- const float q = ost->enc ? ost->quality / (float) FF_QP2LAMBDA : -1;
+ const float q = ost->enc ? atomic_load(&ost->quality) / (float) FF_QP2LAMBDA : -1;
if (vid && ost->type == AVMEDIA_TYPE_VIDEO) {
av_bprintf(&buf, "q=%2.1f ", q);
@@ -565,22 +531,18 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
if (is_last_report)
av_bprintf(&buf, "L");
- nb_frames_dup = ost->filter->nb_frames_dup;
- nb_frames_drop = ost->filter->nb_frames_drop;
+ nb_frames_dup = atomic_load(&ost->filter->nb_frames_dup);
+ nb_frames_drop = atomic_load(&ost->filter->nb_frames_drop);
vid = 1;
}
- /* compute min output value */
- if (ost->last_mux_dts != AV_NOPTS_VALUE) {
- if (pts == AV_NOPTS_VALUE || ost->last_mux_dts > pts)
- pts = ost->last_mux_dts;
- if (copy_ts) {
- if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
- copy_ts_first_pts = pts;
- if (copy_ts_first_pts != AV_NOPTS_VALUE)
- pts -= copy_ts_first_pts;
- }
- }
+ }
+
+ if (copy_ts) {
+ if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
+ copy_ts_first_pts = pts;
+ if (copy_ts_first_pts != AV_NOPTS_VALUE)
+ pts -= copy_ts_first_pts;
}
us = FFABS64U(pts) % AV_TIME_BASE;
@@ -783,81 +745,6 @@ int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy)
return 0;
}
-int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt)
-{
- OutputFile *of = output_files[ost->file_index];
- int64_t signal_pts = av_rescale_q(pkt->pts, pkt->time_base,
- AV_TIME_BASE_Q);
-
- if (!ost->fix_sub_duration_heartbeat || !(pkt->flags & AV_PKT_FLAG_KEY))
- // we are only interested in heartbeats on streams configured, and
- // only on random access points.
- return 0;
-
- for (int i = 0; i < of->nb_streams; i++) {
- OutputStream *iter_ost = of->streams[i];
- InputStream *ist = iter_ost->ist;
- int ret = AVERROR_BUG;
-
- if (iter_ost == ost || !ist || !ist->decoding_needed ||
- ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
- // We wish to skip the stream that causes the heartbeat,
- // output streams without an input stream, streams not decoded
- // (as fix_sub_duration is only done for decoded subtitles) as
- // well as non-subtitle streams.
- continue;
-
- if ((ret = fix_sub_duration_heartbeat(ist, signal_pts)) < 0)
- return ret;
- }
-
- return 0;
-}
-
-/* pkt = NULL means EOF (needed to flush decoder buffers) */
-static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
-{
- InputFile *f = input_files[ist->file_index];
- int64_t dts_est = AV_NOPTS_VALUE;
- int ret = 0;
- int eof_reached = 0;
-
- if (ist->decoding_needed) {
- ret = dec_packet(ist, pkt, no_eof);
- if (ret < 0 && ret != AVERROR_EOF)
- return ret;
- }
- if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed))
- eof_reached = 1;
-
- if (pkt && pkt->opaque_ref) {
- DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data;
- dts_est = pd->dts_est;
- }
-
- if (f->recording_time != INT64_MAX) {
- int64_t start_time = 0;
- if (copy_ts) {
- start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0;
- start_time += start_at_zero ? 0 : f->start_time_effective;
- }
- if (dts_est >= f->recording_time + start_time)
- pkt = NULL;
- }
-
- for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
- OutputStream *ost = ist->outputs[oidx];
- if (ost->enc || (!pkt && no_eof))
- continue;
-
- ret = of_streamcopy(ost, pkt, dts_est);
- if (ret < 0)
- return ret;
- }
-
- return !eof_reached;
-}
-
static void print_stream_maps(void)
{
av_log(NULL, AV_LOG_INFO, "Stream mapping:\n");
@@ -934,43 +821,6 @@ static void print_stream_maps(void)
}
}
-/**
- * Select the output stream to process.
- *
- * @retval 0 an output stream was selected
- * @retval AVERROR(EAGAIN) need to wait until more input is available
- * @retval AVERROR_EOF no more streams need output
- */
-static int choose_output(OutputStream **post)
-{
- int64_t opts_min = INT64_MAX;
- OutputStream *ost_min = NULL;
-
- for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
- int64_t opts;
-
- if (ost->filter && ost->filter->last_pts != AV_NOPTS_VALUE) {
- opts = ost->filter->last_pts;
- } else {
- opts = ost->last_mux_dts == AV_NOPTS_VALUE ?
- INT64_MIN : ost->last_mux_dts;
- }
-
- if (!ost->initialized && !ost->finished) {
- ost_min = ost;
- break;
- }
- if (!ost->finished && opts < opts_min) {
- opts_min = opts;
- ost_min = ost;
- }
- }
- if (!ost_min)
- return AVERROR_EOF;
- *post = ost_min;
- return ost_min->unavailable ? AVERROR(EAGAIN) : 0;
-}
-
static void set_tty_echo(int on)
{
#if HAVE_TERMIOS_H
@@ -1042,149 +892,21 @@ static int check_keyboard_interaction(int64_t cur_time)
return 0;
}
-static void reset_eagain(void)
-{
- for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost))
- ost->unavailable = 0;
-}
-
-static void decode_flush(InputFile *ifile)
-{
- for (int i = 0; i < ifile->nb_streams; i++) {
- InputStream *ist = ifile->streams[i];
-
- if (ist->discard || !ist->decoding_needed)
- continue;
-
- dec_packet(ist, NULL, 1);
- }
-}
-
-/*
- * Return
- * - 0 -- one packet was read and processed
- * - AVERROR(EAGAIN) -- no packets were available for selected file,
- * this function should be called again
- * - AVERROR_EOF -- this function should not be called again
- */
-static int process_input(int file_index, AVPacket *pkt)
-{
- InputFile *ifile = input_files[file_index];
- InputStream *ist;
- int ret, i;
-
- ret = ifile_get_packet(ifile, pkt);
-
- if (ret == 1) {
- /* the input file is looped: flush the decoders */
- decode_flush(ifile);
- return AVERROR(EAGAIN);
- }
- if (ret < 0) {
- if (ret != AVERROR_EOF) {
- av_log(ifile, AV_LOG_ERROR,
- "Error retrieving a packet from demuxer: %s\n", av_err2str(ret));
- if (exit_on_error)
- return ret;
- }
-
- for (i = 0; i < ifile->nb_streams; i++) {
- ist = ifile->streams[i];
- if (!ist->discard) {
- ret = process_input_packet(ist, NULL, 0);
- if (ret>0)
- return 0;
- else if (ret < 0)
- return ret;
- }
-
- /* mark all outputs that don't go through lavfi as finished */
- for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
- OutputStream *ost = ist->outputs[oidx];
- OutputFile *of = output_files[ost->file_index];
-
- ret = of_output_packet(of, ost, NULL);
- if (ret < 0)
- return ret;
- }
- }
-
- ifile->eof_reached = 1;
- return AVERROR(EAGAIN);
- }
-
- reset_eagain();
-
- ist = ifile->streams[pkt->stream_index];
-
- sub2video_heartbeat(ifile, pkt->pts, pkt->time_base);
-
- ret = process_input_packet(ist, pkt, 0);
-
- av_packet_unref(pkt);
-
- return ret < 0 ? ret : 0;
-}
-
-/**
- * Run a single step of transcoding.
- *
- * @return 0 for success, <0 for error
- */
-static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
-{
- InputStream *ist = NULL;
- int ret;
-
- if (ost->filter) {
- if ((ret = fg_transcode_step(ost->filter->graph, &ist)) < 0)
- return ret;
- if (!ist)
- return 0;
- } else {
- ist = ost->ist;
- av_assert0(ist);
- }
-
- ret = process_input(ist->file_index, demux_pkt);
- if (ret == AVERROR(EAGAIN)) {
- return 0;
- }
-
- if (ret < 0)
- return ret == AVERROR_EOF ? 0 : ret;
-
- // process_input() above might have caused output to become available
- // in multiple filtergraphs, so we process all of them
- for (int i = 0; i < nb_filtergraphs; i++) {
- ret = reap_filters(filtergraphs[i], 0);
- if (ret < 0)
- return ret;
- }
-
- return 0;
-}
-
/*
* The following code is the main loop of the file converter
*/
-static int transcode(Scheduler *sch, int *err_rate_exceeded)
+static int transcode(Scheduler *sch)
{
int ret = 0, i;
- InputStream *ist;
- int64_t timer_start;
- AVPacket *demux_pkt = NULL;
+ int64_t timer_start, transcode_ts = 0;
print_stream_maps();
- *err_rate_exceeded = 0;
atomic_store(&transcode_init_done, 1);
- demux_pkt = av_packet_alloc();
- if (!demux_pkt) {
- ret = AVERROR(ENOMEM);
- goto fail;
- }
+ ret = sch_start(sch);
+ if (ret < 0)
+ return ret;
if (stdin_interaction) {
av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n");
@@ -1192,8 +914,7 @@ static int transcode(Scheduler *sch, int *err_rate_exceeded)
timer_start = av_gettime_relative();
- while (!received_sigterm) {
- OutputStream *ost;
+ while (!sch_wait(sch, stats_period, &transcode_ts)) {
int64_t cur_time= av_gettime_relative();
/* if 'q' pressed, exits */
@@ -1201,49 +922,11 @@ static int transcode(Scheduler *sch, int *err_rate_exceeded)
if (check_keyboard_interaction(cur_time) < 0)
break;
- ret = choose_output(&ost);
- if (ret == AVERROR(EAGAIN)) {
- reset_eagain();
- av_usleep(10000);
- ret = 0;
- continue;
- } else if (ret < 0) {
- av_log(NULL, AV_LOG_VERBOSE, "No more output streams to write to, finishing.\n");
- ret = 0;
- break;
- }
-
- ret = transcode_step(ost, demux_pkt);
- if (ret < 0 && ret != AVERROR_EOF) {
- av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
- break;
- }
-
/* dump report by using the output first video and audio streams */
- print_report(0, timer_start, cur_time);
- }
-
- /* at the end of stream, we must flush the decoder buffers */
- for (ist = ist_iter(NULL); ist; ist = ist_iter(ist)) {
- float err_rate;
-
- if (!input_files[ist->file_index]->eof_reached) {
- int err = process_input_packet(ist, NULL, 0);
- ret = err_merge(ret, err);
- }
-
- err_rate = (ist->frames_decoded || ist->decode_errors) ?
- ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f;
- if (err_rate > max_error_rate) {
- av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n",
- err_rate, max_error_rate);
- *err_rate_exceeded = 1;
- } else if (err_rate)
- av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate);
+ print_report(0, timer_start, cur_time, transcode_ts);
}
- ret = err_merge(ret, enc_flush());
- term_exit();
+ ret = sch_stop(sch);
/* write the trailer if needed */
for (i = 0; i < nb_output_files; i++) {
@@ -1251,11 +934,10 @@ static int transcode(Scheduler *sch, int *err_rate_exceeded)
ret = err_merge(ret, err);
}
- /* dump report by using the first video and audio streams */
- print_report(1, timer_start, av_gettime_relative());
+ term_exit();
-fail:
- av_packet_free(&demux_pkt);
+ /* dump report by using the first video and audio streams */
+ print_report(1, timer_start, av_gettime_relative(), transcode_ts);
return ret;
}
@@ -1308,7 +990,7 @@ int main(int argc, char **argv)
{
Scheduler *sch = NULL;
- int ret, err_rate_exceeded;
+ int ret;
BenchmarkTimeStamps ti;
init_dynload();
@@ -1350,7 +1032,7 @@ int main(int argc, char **argv)
}
current_time = ti = get_benchmark_time_stamps();
- ret = transcode(sch, &err_rate_exceeded);
+ ret = transcode(sch);
if (ret >= 0 && do_benchmark) {
int64_t utime, stime, rtime;
current_time = get_benchmark_time_stamps();
@@ -1362,8 +1044,8 @@ int main(int argc, char **argv)
utime / 1000000.0, stime / 1000000.0, rtime / 1000000.0);
}
- ret = received_nb_signals ? 255 :
- err_rate_exceeded ? 69 : ret;
+ ret = received_nb_signals ? 255 :
+ (ret == FFMPEG_ERROR_RATE_EXCEEDED) ? 69 : ret;
finish:
if (ret == AVERROR_EXIT)
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index a89038b765..ba82b7490d 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -61,6 +61,8 @@
#define FFMPEG_OPT_TOP 1
#define FFMPEG_OPT_FORCE_KF_SOURCE_NO_DROP 1
+#define FFMPEG_ERROR_RATE_EXCEEDED FFERRTAG('E', 'R', 'E', 'D')
+
enum VideoSyncMethod {
VSYNC_AUTO = -1,
VSYNC_PASSTHROUGH,
@@ -82,13 +84,16 @@ enum HWAccelID {
};
enum FrameOpaque {
- FRAME_OPAQUE_REAP_FILTERS = 1,
- FRAME_OPAQUE_CHOOSE_INPUT,
- FRAME_OPAQUE_SUB_HEARTBEAT,
+ FRAME_OPAQUE_SUB_HEARTBEAT = 1,
FRAME_OPAQUE_EOF,
FRAME_OPAQUE_SEND_COMMAND,
};
+enum PacketOpaque {
+ PKT_OPAQUE_SUB_HEARTBEAT = 1,
+ PKT_OPAQUE_FIX_SUB_DURATION,
+};
+
typedef struct HWDevice {
const char *name;
enum AVHWDeviceType type;
@@ -309,11 +314,8 @@ typedef struct OutputFilter {
enum AVMediaType type;
- /* pts of the last frame received from this filter, in AV_TIME_BASE_Q */
- int64_t last_pts;
-
- uint64_t nb_frames_dup;
- uint64_t nb_frames_drop;
+ atomic_uint_least64_t nb_frames_dup;
+ atomic_uint_least64_t nb_frames_drop;
} OutputFilter;
typedef struct FilterGraph {
@@ -426,11 +428,6 @@ typedef struct InputFile {
float readrate;
int accurate_seek;
-
- /* when looping the input file, this queue is used by decoders to report
- * the last frame timestamp back to the demuxer thread */
- AVThreadMessageQueue *audio_ts_queue;
- int audio_ts_queue_size;
} InputFile;
enum forced_keyframes_const {
@@ -532,8 +529,6 @@ typedef struct OutputStream {
InputStream *ist;
AVStream *st; /* stream in the output file */
- /* dts of the last packet sent to the muxing queue, in AV_TIME_BASE_Q */
- int64_t last_mux_dts;
AVRational enc_timebase;
@@ -578,13 +573,6 @@ typedef struct OutputStream {
AVDictionary *sws_dict;
AVDictionary *swr_opts;
char *apad;
- OSTFinished finished; /* no more packets should be written for this stream */
- int unavailable; /* true if the steram is unavailable (possibly temporarily) */
-
- // init_output_stream() has been called for this stream
- // The encoder and the bitstream filters have been initialized and the stream
- // parameters are set in the AVStream.
- int initialized;
const char *attachment_filename;
@@ -598,9 +586,8 @@ typedef struct OutputStream {
uint64_t samples_encoded;
/* packet quality factor */
- int quality;
+ atomic_int quality;
- int sq_idx_encode;
int sq_idx_mux;
EncStats enc_stats_pre;
@@ -658,7 +645,6 @@ extern FilterGraph **filtergraphs;
extern int nb_filtergraphs;
extern char *vstats_filename;
-extern char *sdp_filename;
extern float dts_delta_threshold;
extern float dts_error_threshold;
@@ -691,7 +677,7 @@ extern const AVIOInterruptCB int_cb;
extern const OptionDef options[];
extern HWDevice *filter_hw_device;
-extern unsigned nb_output_dumped;
+extern atomic_uint nb_output_dumped;
extern int ignore_unknown_streams;
extern int copy_unknown_streams;
@@ -737,10 +723,6 @@ FrameData *frame_data(AVFrame *frame);
const FrameData *frame_data_c(AVFrame *frame);
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference);
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb);
-void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb);
-
/**
* Set up fallback filtering parameters from a decoder context. They will only
* be used if no frames are ever sent on this input, otherwise the actual
@@ -761,26 +743,9 @@ int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch);
void fg_free(FilterGraph **pfg);
-/**
- * Perform a step of transcoding for the specified filter graph.
- *
- * @param[in] graph filter graph to consider
- * @param[out] best_ist input stream where a frame would allow to continue
- * @return 0 for success, <0 for error
- */
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist);
-
void fg_send_command(FilterGraph *fg, double time, const char *target,
const char *command, const char *arg, int all_filters);
-/**
- * Get and encode new output from specified filtergraph, without causing
- * activity.
- *
- * @return 0 for success, <0 for severe errors
- */
-int reap_filters(FilterGraph *fg, int flush);
-
int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch);
void enc_stats_write(OutputStream *ost, EncStats *es,
@@ -807,25 +772,11 @@ int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input);
int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx);
void dec_free(Decoder **pdec);
-/**
- * Submit a packet for decoding
- *
- * When pkt==NULL and no_eof=0, there will be no more input. Flush decoders and
- * mark all downstreams as finished.
- *
- * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek). Flush
- * decoders and await further input.
- */
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof);
-
int enc_alloc(Encoder **penc, const AVCodec *codec,
Scheduler *sch, unsigned sch_idx);
void enc_free(Encoder **penc);
-int enc_open(OutputStream *ost, const AVFrame *frame);
-int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub);
-int enc_frame(OutputStream *ost, AVFrame *frame);
-int enc_flush(void);
+int enc_open(void *opaque, const AVFrame *frame);
/*
* Initialize muxing state for the given stream, should be called
@@ -840,30 +791,11 @@ void of_free(OutputFile **pof);
void of_enc_stats_close(void);
-int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt);
-
-/**
- * @param dts predicted packet dts in AV_TIME_BASE_Q
- */
-int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts);
-
int64_t of_filesize(OutputFile *of);
int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch);
void ifile_close(InputFile **f);
-/**
- * Get next input packet from the demuxer.
- *
- * @param pkt the packet is written here when this function returns 0
- * @return
- * - 0 when a packet has been read successfully
- * - 1 when stream end was reached, but the stream is looped;
- * caller should flush decoders and read from this demuxer again
- * - a negative error code on failure
- */
-int ifile_get_packet(InputFile *f, AVPacket *pkt);
-
int ist_output_add(InputStream *ist, OutputStream *ost);
int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple);
@@ -880,9 +812,6 @@ InputStream *ist_iter(InputStream *prev);
* pass NULL to start iteration */
OutputStream *ost_iter(OutputStream *prev);
-void close_output_stream(OutputStream *ost);
-int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt);
-int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts);
void update_benchmark(const char *fmt, ...);
#define SPECIFIER_OPT_FMT_str "%s"
diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
index 90ea0d6d93..5dde82a276 100644
--- a/fftools/ffmpeg_dec.c
+++ b/fftools/ffmpeg_dec.c
@@ -54,24 +54,6 @@ struct Decoder {
Scheduler *sch;
unsigned sch_idx;
-
- pthread_t thread;
- /**
- * Queue for sending coded packets from the main thread to
- * the decoder thread.
- *
- * An empty packet is sent to flush the decoder without terminating
- * decoding.
- */
- ThreadQueue *queue_in;
- /**
- * Queue for sending decoded frames from the decoder thread
- * to the main thread.
- *
- * An empty frame is sent to signal that a single packet has been fully
- * processed.
- */
- ThreadQueue *queue_out;
};
// data that is local to the decoder thread and not visible outside of it
@@ -80,24 +62,6 @@ typedef struct DecThreadContext {
AVPacket *pkt;
} DecThreadContext;
-static int dec_thread_stop(Decoder *d)
-{
- void *ret;
-
- if (!d->queue_in)
- return 0;
-
- tq_send_finish(d->queue_in, 0);
- tq_receive_finish(d->queue_out, 0);
-
- pthread_join(d->thread, &ret);
-
- tq_free(&d->queue_in);
- tq_free(&d->queue_out);
-
- return (intptr_t)ret;
-}
-
void dec_free(Decoder **pdec)
{
Decoder *dec = *pdec;
@@ -105,8 +69,6 @@ void dec_free(Decoder **pdec)
if (!dec)
return;
- dec_thread_stop(dec);
-
av_frame_free(&dec->frame);
av_packet_free(&dec->pkt);
@@ -148,25 +110,6 @@ fail:
return AVERROR(ENOMEM);
}
-static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
-{
- int i, ret = 0;
-
- for (i = 0; i < ist->nb_filters; i++) {
- ret = ifilter_send_frame(ist->filters[i], decoded_frame,
- i < ist->nb_filters - 1 ||
- ist->dec->type == AVMEDIA_TYPE_SUBTITLE);
- if (ret == AVERROR_EOF)
- ret = 0; /* ignore */
- if (ret < 0) {
- av_log(NULL, AV_LOG_ERROR,
- "Failed to inject frame into filter network: %s\n", av_err2str(ret));
- break;
- }
- }
- return ret;
-}
-
static AVRational audio_samplerate_update(void *logctx, Decoder *d,
const AVFrame *frame)
{
@@ -421,28 +364,14 @@ static int process_subtitle(InputStream *ist, AVFrame *frame)
if (!subtitle)
return 0;
- ret = send_frame_to_filters(ist, frame);
+ ret = sch_dec_send(d->sch, d->sch_idx, frame);
if (ret < 0)
- return ret;
-
- subtitle = (AVSubtitle*)frame->buf[0]->data;
- if (!subtitle->num_rects)
- return 0;
-
- for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
- OutputStream *ost = ist->outputs[oidx];
- if (!ost->enc || ost->type != AVMEDIA_TYPE_SUBTITLE)
- continue;
-
- ret = enc_subtitle(output_files[ost->file_index], ost, subtitle);
- if (ret < 0)
- return ret;
- }
+ av_frame_unref(frame);
- return 0;
+ return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
}
-int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
+static int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
{
Decoder *d = ist->decoder;
int ret = AVERROR_BUG;
@@ -468,12 +397,24 @@ int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
AVFrame *frame)
{
- Decoder *d = ist->decoder;
+ Decoder *d = ist->decoder;
AVPacket *flush_pkt = NULL;
AVSubtitle subtitle;
int got_output;
int ret;
+ if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT) {
+ frame->pts = pkt->pts;
+ frame->time_base = pkt->time_base;
+ frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SUB_HEARTBEAT;
+
+ ret = sch_dec_send(d->sch, d->sch_idx, frame);
+ return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
+ } else if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION) {
+ return fix_sub_duration_heartbeat(ist, av_rescale_q(pkt->pts, pkt->time_base,
+ AV_TIME_BASE_Q));
+ }
+
if (!pkt) {
flush_pkt = av_packet_alloc();
if (!flush_pkt)
@@ -496,7 +437,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
ist->frames_decoded++;
- // XXX the queue for transferring data back to the main thread runs
+ // XXX the queue for transferring data to consumers runs
// on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that
// inside the frame
// eventually, subtitles should be switched to use AVFrames natively
@@ -509,26 +450,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
frame->width = ist->dec_ctx->width;
frame->height = ist->dec_ctx->height;
- ret = tq_send(d->queue_out, 0, frame);
- if (ret < 0)
- av_frame_unref(frame);
-
- return ret;
-}
-
-static int send_filter_eof(InputStream *ist)
-{
- Decoder *d = ist->decoder;
- int i, ret;
-
- for (i = 0; i < ist->nb_filters; i++) {
- int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
- d->last_frame_pts + d->last_frame_duration_est;
- ret = ifilter_send_eof(ist->filters[i], end_pts, d->last_frame_tb);
- if (ret < 0)
- return ret;
- }
- return 0;
+ return process_subtitle(ist, frame);
}
static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
@@ -635,9 +557,11 @@ static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
ist->frames_decoded++;
- ret = tq_send(d->queue_out, 0, frame);
- if (ret < 0)
- return ret;
+ ret = sch_dec_send(d->sch, d->sch_idx, frame);
+ if (ret < 0) {
+ av_frame_unref(frame);
+ return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
+ }
}
}
@@ -679,7 +603,6 @@ fail:
void *decoder_thread(void *arg)
{
InputStream *ist = arg;
- InputFile *ifile = input_files[ist->file_index];
Decoder *d = ist->decoder;
DecThreadContext dt;
int ret = 0, input_status = 0;
@@ -691,19 +614,31 @@ void *decoder_thread(void *arg)
dec_thread_set_name(ist);
while (!input_status) {
- int dummy, flush_buffers;
-
- input_status = tq_receive(d->queue_in, &dummy, dt.pkt);
- flush_buffers = input_status >= 0 && !dt.pkt->buf;
- if (!dt.pkt->buf)
+ int flush_buffers, have_data;
+
+ input_status = sch_dec_receive(d->sch, d->sch_idx, dt.pkt);
+ have_data = input_status >= 0 &&
+ (dt.pkt->buf || dt.pkt->side_data_elems ||
+ (intptr_t)dt.pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT ||
+ (intptr_t)dt.pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION);
+ flush_buffers = input_status >= 0 && !have_data;
+ if (!have_data)
av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n",
flush_buffers ? "flush" : "EOF");
- ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame);
+ ret = packet_decode(ist, have_data ? dt.pkt : NULL, dt.frame);
av_packet_unref(dt.pkt);
av_frame_unref(dt.frame);
+ // AVERROR_EOF - EOF from the decoder
+ // AVERROR_EXIT - EOF from the scheduler
+ // we treat them differently when flushing
+ if (ret == AVERROR_EXIT) {
+ ret = AVERROR_EOF;
+ flush_buffers = 0;
+ }
+
if (ret == AVERROR_EOF) {
av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n",
flush_buffers ? "resetting" : "finishing");
@@ -711,11 +646,10 @@ void *decoder_thread(void *arg)
if (!flush_buffers)
break;
- /* report last frame duration to the demuxer thread */
+ /* report last frame duration to the scheduler */
if (ist->dec->type == AVMEDIA_TYPE_AUDIO) {
- Timestamp ts = { .ts = d->last_frame_pts + d->last_frame_duration_est,
- .tb = d->last_frame_tb };
- av_thread_message_queue_send(ifile->audio_ts_queue, &ts, 0);
+ dt.pkt->pts = d->last_frame_pts + d->last_frame_duration_est;
+ dt.pkt->time_base = d->last_frame_tb;
}
avcodec_flush_buffers(ist->dec_ctx);
@@ -724,147 +658,45 @@ void *decoder_thread(void *arg)
av_err2str(ret));
break;
}
-
- // signal to the consumer thread that the entire packet was processed
- ret = tq_send(d->queue_out, 0, dt.frame);
- if (ret < 0) {
- if (ret != AVERROR_EOF)
- av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n");
- break;
- }
}
// EOF is normal thread termination
if (ret == AVERROR_EOF)
ret = 0;
-finish:
- tq_receive_finish(d->queue_in, 0);
- tq_send_finish (d->queue_out, 0);
-
- // make sure the demuxer does not get stuck waiting for audio durations
- // that will never arrive
- if (ifile->audio_ts_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO)
- av_thread_message_queue_set_err_recv(ifile->audio_ts_queue, AVERROR_EOF);
-
- dec_thread_uninit(&dt);
-
- av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n");
-
- return (void*)(intptr_t)ret;
-}
-
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
-{
- Decoder *d = ist->decoder;
- int ret = 0, thread_ret;
-
- // thread already joined
- if (!d->queue_in)
- return AVERROR_EOF;
+ // on success send EOF timestamp to our downstreams
+ if (ret >= 0) {
+ float err_rate;
- // send the packet/flush request/EOF to the decoder thread
- if (pkt || no_eof) {
- av_packet_unref(d->pkt);
-
- if (pkt) {
- ret = av_packet_ref(d->pkt, pkt);
- if (ret < 0)
- goto finish;
- }
-
- ret = tq_send(d->queue_in, 0, d->pkt);
- if (ret < 0)
- goto finish;
- } else
- tq_send_finish(d->queue_in, 0);
+ av_frame_unref(dt.frame);
- // retrieve all decoded data for the packet
- while (1) {
- int dummy;
+ dt.frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_EOF;
+ dt.frame->pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
+ d->last_frame_pts + d->last_frame_duration_est;
+ dt.frame->time_base = d->last_frame_tb;
- ret = tq_receive(d->queue_out, &dummy, d->frame);
- if (ret < 0)
+ ret = sch_dec_send(d->sch, d->sch_idx, dt.frame);
+ if (ret < 0 && ret != AVERROR_EOF) {
+ av_log(NULL, AV_LOG_FATAL,
+ "Error signalling EOF timestamp: %s\n", av_err2str(ret));
goto finish;
-
- // packet fully processed
- if (!d->frame->buf[0])
- return 0;
-
- // process the decoded frame
- if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) {
- ret = process_subtitle(ist, d->frame);
- } else {
- ret = send_frame_to_filters(ist, d->frame);
}
- av_frame_unref(d->frame);
- if (ret < 0)
- goto finish;
- }
-
-finish:
- thread_ret = dec_thread_stop(d);
- if (thread_ret < 0) {
- av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n",
- av_err2str(thread_ret));
- ret = err_merge(ret, thread_ret);
- }
- // non-EOF errors here are all fatal
- if (ret < 0 && ret != AVERROR_EOF)
- return ret;
-
- // signal EOF to our downstreams
- ret = send_filter_eof(ist);
- if (ret < 0) {
- av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
- return ret;
- }
-
- return AVERROR_EOF;
-}
-
-static int dec_thread_start(InputStream *ist)
-{
- Decoder *d = ist->decoder;
- ObjPool *op;
- int ret = 0;
-
- op = objpool_alloc_packets();
- if (!op)
- return AVERROR(ENOMEM);
-
- d->queue_in = tq_alloc(1, 1, op, pkt_move);
- if (!d->queue_in) {
- objpool_free(&op);
- return AVERROR(ENOMEM);
- }
-
- op = objpool_alloc_frames();
- if (!op)
- goto fail;
-
- d->queue_out = tq_alloc(1, 4, op, frame_move);
- if (!d->queue_out) {
- objpool_free(&op);
- goto fail;
- }
+ ret = 0;
- ret = pthread_create(&d->thread, NULL, decoder_thread, ist);
- if (ret) {
- ret = AVERROR(ret);
- av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n",
- av_err2str(ret));
- goto fail;
+ err_rate = (ist->frames_decoded || ist->decode_errors) ?
+ ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f;
+ if (err_rate > max_error_rate) {
+ av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n",
+ err_rate, max_error_rate);
+ ret = FFMPEG_ERROR_RATE_EXCEEDED;
+ } else if (err_rate)
+ av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate);
}
- return 0;
-fail:
- if (ret >= 0)
- ret = AVERROR(ENOMEM);
+finish:
+ dec_thread_uninit(&dt);
- tq_free(&d->queue_in);
- tq_free(&d->queue_out);
- return ret;
+ return (void*)(intptr_t)ret;
}
static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts)
@@ -1118,12 +950,5 @@ int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx)
if (ret < 0)
return ret;
- ret = dec_thread_start(ist);
- if (ret < 0) {
- av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n",
- av_err2str(ret));
- return ret;
- }
-
return 0;
}
diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c
index 2234dbe076..91cd7a1125 100644
--- a/fftools/ffmpeg_demux.c
+++ b/fftools/ffmpeg_demux.c
@@ -22,8 +22,6 @@
#include "ffmpeg.h"
#include "ffmpeg_sched.h"
#include "ffmpeg_utils.h"
-#include "objpool.h"
-#include "thread_queue.h"
#include "libavutil/avassert.h"
#include "libavutil/avstring.h"
@@ -35,7 +33,6 @@
#include "libavutil/pixdesc.h"
#include "libavutil/time.h"
#include "libavutil/timestamp.h"
-#include "libavutil/thread.h"
#include "libavcodec/packet.h"
@@ -66,7 +63,11 @@ typedef struct DemuxStream {
double ts_scale;
+ // scheduler returned EOF for this stream
+ int finished;
+
int streamcopy_needed;
+ int have_sub2video;
int wrap_correction_done;
int saw_first_ts;
@@ -101,6 +102,7 @@ typedef struct Demuxer {
/* number of times input stream should be looped */
int loop;
+ int have_audio_dec;
/* duration of the looped segment of the input file */
Timestamp duration;
/* pts with the smallest/largest values ever seen */
@@ -113,11 +115,12 @@ typedef struct Demuxer {
double readrate_initial_burst;
Scheduler *sch;
- ThreadQueue *thread_queue;
- int thread_queue_size;
- pthread_t thread;
+
+ AVPacket *pkt_heartbeat;
int read_started;
+ int nb_streams_used;
+ int nb_streams_finished;
} Demuxer;
static DemuxStream *ds_from_ist(InputStream *ist)
@@ -153,7 +156,7 @@ static void report_new_stream(Demuxer *d, const AVPacket *pkt)
d->nb_streams_warn = pkt->stream_index + 1;
}
-static int seek_to_start(Demuxer *d)
+static int seek_to_start(Demuxer *d, Timestamp end_pts)
{
InputFile *ifile = &d->f;
AVFormatContext *is = ifile->ctx;
@@ -163,21 +166,10 @@ static int seek_to_start(Demuxer *d)
if (ret < 0)
return ret;
- if (ifile->audio_ts_queue_size) {
- int got_ts = 0;
-
- while (got_ts < ifile->audio_ts_queue_size) {
- Timestamp ts;
- ret = av_thread_message_queue_recv(ifile->audio_ts_queue, &ts, 0);
- if (ret < 0)
- return ret;
- got_ts++;
-
- if (d->max_pts.ts == AV_NOPTS_VALUE ||
- av_compare_ts(d->max_pts.ts, d->max_pts.tb, ts.ts, ts.tb) < 0)
- d->max_pts = ts;
- }
- }
+ if (end_pts.ts != AV_NOPTS_VALUE &&
+ (d->max_pts.ts == AV_NOPTS_VALUE ||
+ av_compare_ts(d->max_pts.ts, d->max_pts.tb, end_pts.ts, end_pts.tb) < 0))
+ d->max_pts = end_pts;
if (d->max_pts.ts != AV_NOPTS_VALUE) {
int64_t min_pts = d->min_pts.ts == AV_NOPTS_VALUE ? 0 : d->min_pts.ts;
@@ -404,7 +396,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt)
duration = av_rescale_q(d->duration.ts, d->duration.tb, pkt->time_base);
if (pkt->pts != AV_NOPTS_VALUE) {
// audio decoders take precedence for estimating total file duration
- int64_t pkt_duration = ifile->audio_ts_queue_size ? 0 : pkt->duration;
+ int64_t pkt_duration = d->have_audio_dec ? 0 : pkt->duration;
pkt->pts += duration;
@@ -440,7 +432,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt)
return 0;
}
-static int input_packet_process(Demuxer *d, AVPacket *pkt)
+static int input_packet_process(Demuxer *d, AVPacket *pkt, unsigned *send_flags)
{
InputFile *f = &d->f;
InputStream *ist = f->streams[pkt->stream_index];
@@ -451,6 +443,16 @@ static int input_packet_process(Demuxer *d, AVPacket *pkt)
if (ret < 0)
return ret;
+ if (f->recording_time != INT64_MAX) {
+ int64_t start_time = 0;
+ if (copy_ts) {
+ start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0;
+ start_time += start_at_zero ? 0 : f->start_time_effective;
+ }
+ if (ds->dts >= f->recording_time + start_time)
+ *send_flags |= DEMUX_SEND_STREAMCOPY_EOF;
+ }
+
ds->data_size += pkt->size;
ds->nb_packets++;
@@ -465,6 +467,8 @@ static int input_packet_process(Demuxer *d, AVPacket *pkt)
av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q));
}
+ pkt->stream_index = ds->sch_idx_stream;
+
return 0;
}
@@ -488,6 +492,65 @@ static void readrate_sleep(Demuxer *d)
}
}
+static int do_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned flags,
+ const char *pkt_desc)
+{
+ int ret;
+
+ ret = sch_demux_send(d->sch, d->f.index, pkt, flags);
+ if (ret == AVERROR_EOF) {
+ av_packet_unref(pkt);
+
+ av_log(ds, AV_LOG_VERBOSE, "All consumers of this stream are done\n");
+ ds->finished = 1;
+
+ if (++d->nb_streams_finished == d->nb_streams_used) {
+ av_log(d, AV_LOG_VERBOSE, "All consumers are done\n");
+ return AVERROR_EOF;
+ }
+ } else if (ret < 0) {
+ if (ret != AVERROR_EXIT)
+ av_log(d, AV_LOG_ERROR,
+ "Unable to send %s packet to consumers: %s\n",
+ pkt_desc, av_err2str(ret));
+ return ret;
+ }
+
+ return 0;
+}
+
+static int demux_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned flags)
+{
+ InputFile *f = &d->f;
+ int ret;
+
+ // send heartbeat for sub2video streams
+ if (d->pkt_heartbeat && pkt->pts != AV_NOPTS_VALUE) {
+ for (int i = 0; i < f->nb_streams; i++) {
+ DemuxStream *ds1 = ds_from_ist(f->streams[i]);
+
+ if (ds1->finished || !ds1->have_sub2video)
+ continue;
+
+ d->pkt_heartbeat->pts = pkt->pts;
+ d->pkt_heartbeat->time_base = pkt->time_base;
+ d->pkt_heartbeat->stream_index = ds1->sch_idx_stream;
+ d->pkt_heartbeat->opaque = (void*)(intptr_t)PKT_OPAQUE_SUB_HEARTBEAT;
+
+ ret = do_send(d, ds1, d->pkt_heartbeat, 0, "heartbeat");
+ if (ret < 0)
+ return ret;
+ }
+ }
+
+ ret = do_send(d, ds, pkt, flags, "demuxed");
+ if (ret < 0)
+ return ret;
+
+
+ return 0;
+}
+
static void discard_unused_programs(InputFile *ifile)
{
for (int j = 0; j < ifile->ctx->nb_programs; j++) {
@@ -527,9 +590,13 @@ static void *input_thread(void *arg)
discard_unused_programs(f);
+ d->read_started = 1;
d->wallclock_start = av_gettime_relative();
while (1) {
+ DemuxStream *ds;
+ unsigned send_flags = 0;
+
ret = av_read_frame(f->ctx, pkt);
if (ret == AVERROR(EAGAIN)) {
@@ -538,11 +605,13 @@ static void *input_thread(void *arg)
}
if (ret < 0) {
if (d->loop) {
- /* signal looping to the consumer thread */
+ /* signal looping to our consumers */
pkt->stream_index = -1;
- ret = tq_send(d->thread_queue, 0, pkt);
+
+ ret = sch_demux_send(d->sch, f->index, pkt, 0);
if (ret >= 0)
- ret = seek_to_start(d);
+ ret = seek_to_start(d, (Timestamp){ .ts = pkt->pts,
+ .tb = pkt->time_base });
if (ret >= 0)
continue;
@@ -551,9 +620,11 @@ static void *input_thread(void *arg)
if (ret == AVERROR_EOF)
av_log(d, AV_LOG_VERBOSE, "EOF while reading input\n");
- else
+ else {
av_log(d, AV_LOG_ERROR, "Error during demuxing: %s\n",
av_err2str(ret));
+ ret = exit_on_error ? ret : 0;
+ }
break;
}
@@ -565,8 +636,9 @@ static void *input_thread(void *arg)
/* the following test is needed in case new streams appear
dynamically in stream : we ignore them */
- if (pkt->stream_index >= f->nb_streams ||
- f->streams[pkt->stream_index]->discard) {
+ ds = pkt->stream_index < f->nb_streams ?
+ ds_from_ist(f->streams[pkt->stream_index]) : NULL;
+ if (!ds || ds->ist.discard || ds->finished) {
report_new_stream(d, pkt);
av_packet_unref(pkt);
continue;
@@ -583,122 +655,26 @@ static void *input_thread(void *arg)
}
}
- ret = input_packet_process(d, pkt);
+ ret = input_packet_process(d, pkt, &send_flags);
if (ret < 0)
break;
if (f->readrate)
readrate_sleep(d);
- ret = tq_send(d->thread_queue, 0, pkt);
- if (ret < 0) {
- if (ret != AVERROR_EOF)
- av_log(f, AV_LOG_ERROR,
- "Unable to send packet to main thread: %s\n",
- av_err2str(ret));
+ ret = demux_send(d, ds, pkt, send_flags);
+ if (ret < 0)
break;
- }
}
-finish:
- av_assert0(ret < 0);
- tq_send_finish(d->thread_queue, 0);
+ // EOF/EXIT is normal termination
+ if (ret == AVERROR_EOF || ret == AVERROR_EXIT)
+ ret = 0;
+finish:
av_packet_free(&pkt);
- av_log(d, AV_LOG_VERBOSE, "Terminating demuxer thread\n");
-
- return NULL;
-}
-
-static void thread_stop(Demuxer *d)
-{
- InputFile *f = &d->f;
-
- if (!d->thread_queue)
- return;
-
- tq_receive_finish(d->thread_queue, 0);
-
- pthread_join(d->thread, NULL);
-
- tq_free(&d->thread_queue);
-
- av_thread_message_queue_free(&f->audio_ts_queue);
-}
-
-static int thread_start(Demuxer *d)
-{
- int ret;
- InputFile *f = &d->f;
- ObjPool *op;
-
- if (d->thread_queue_size <= 0)
- d->thread_queue_size = (nb_input_files > 1 ? 8 : 1);
-
- op = objpool_alloc_packets();
- if (!op)
- return AVERROR(ENOMEM);
-
- d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move);
- if (!d->thread_queue) {
- objpool_free(&op);
- return AVERROR(ENOMEM);
- }
-
- if (d->loop) {
- int nb_audio_dec = 0;
-
- for (int i = 0; i < f->nb_streams; i++) {
- InputStream *ist = f->streams[i];
- nb_audio_dec += !!(ist->decoding_needed &&
- ist->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO);
- }
-
- if (nb_audio_dec) {
- ret = av_thread_message_queue_alloc(&f->audio_ts_queue,
- nb_audio_dec, sizeof(Timestamp));
- if (ret < 0)
- goto fail;
- f->audio_ts_queue_size = nb_audio_dec;
- }
- }
-
- if ((ret = pthread_create(&d->thread, NULL, input_thread, d))) {
- av_log(d, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret));
- ret = AVERROR(ret);
- goto fail;
- }
-
- d->read_started = 1;
-
- return 0;
-fail:
- tq_free(&d->thread_queue);
- return ret;
-}
-
-int ifile_get_packet(InputFile *f, AVPacket *pkt)
-{
- Demuxer *d = demuxer_from_ifile(f);
- int ret, dummy;
-
- if (!d->thread_queue) {
- ret = thread_start(d);
- if (ret < 0)
- return ret;
- }
-
- ret = tq_receive(d->thread_queue, &dummy, pkt);
- if (ret < 0)
- return ret;
-
- if (pkt->stream_index == -1) {
- av_assert0(!pkt->data && !pkt->side_data_elems);
- return 1;
- }
-
- return 0;
+ return (void*)(intptr_t)ret;
}
static void demux_final_stats(Demuxer *d)
@@ -769,8 +745,6 @@ void ifile_close(InputFile **pf)
if (!f)
return;
- thread_stop(d);
-
if (d->read_started)
demux_final_stats(d);
@@ -780,6 +754,8 @@ void ifile_close(InputFile **pf)
avformat_close_input(&f->ctx);
+ av_packet_free(&d->pkt_heartbeat);
+
av_freep(pf);
}
@@ -802,7 +778,11 @@ static int ist_use(InputStream *ist, int decoding_needed)
ds->sch_idx_stream = ret;
}
- ist->discard = 0;
+ if (ist->discard) {
+ ist->discard = 0;
+ d->nb_streams_used++;
+ }
+
ist->st->discard = ist->user_set_discard;
ist->decoding_needed |= decoding_needed;
ds->streamcopy_needed |= !decoding_needed;
@@ -823,6 +803,8 @@ static int ist_use(InputStream *ist, int decoding_needed)
ret = dec_open(ist, d->sch, ds->sch_idx_dec);
if (ret < 0)
return ret;
+
+ d->have_audio_dec |= is_audio;
}
return 0;
@@ -848,6 +830,7 @@ int ist_output_add(InputStream *ist, OutputStream *ost)
int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
{
+ Demuxer *d = demuxer_from_ifile(input_files[ist->file_index]);
DemuxStream *ds = ds_from_ist(ist);
int ret;
@@ -866,6 +849,15 @@ int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
if (ret < 0)
return ret;
+ if (ist->dec_ctx->codec_type == AVMEDIA_TYPE_SUBTITLE) {
+ if (!d->pkt_heartbeat) {
+ d->pkt_heartbeat = av_packet_alloc();
+ if (!d->pkt_heartbeat)
+ return AVERROR(ENOMEM);
+ }
+ ds->have_sub2video = 1;
+ }
+
return ds->sch_idx_dec;
}
@@ -1607,8 +1599,6 @@ int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch)
"since neither -readrate nor -re were given\n");
}
- d->thread_queue_size = o->thread_queue_size;
-
/* Add all the streams from the given input file to the demuxer */
for (int i = 0; i < ic->nb_streams; i++) {
ret = ist_add(o, d, ic->streams[i]);
diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c
index 9871381c0e..320df2dee7 100644
--- a/fftools/ffmpeg_enc.c
+++ b/fftools/ffmpeg_enc.c
@@ -41,12 +41,6 @@
#include "libavformat/avformat.h"
struct Encoder {
- AVFrame *sq_frame;
-
- // packet for receiving encoded output
- AVPacket *pkt;
- AVFrame *sub_frame;
-
// combined size of all the packets received from the encoder
uint64_t data_size;
@@ -54,25 +48,9 @@ struct Encoder {
uint64_t packets_encoded;
int opened;
- int finished;
Scheduler *sch;
unsigned sch_idx;
-
- pthread_t thread;
- /**
- * Queue for sending frames from the main thread to
- * the encoder thread.
- */
- ThreadQueue *queue_in;
- /**
- * Queue for sending encoded packets from the encoder thread
- * to the main thread.
- *
- * An empty packet is sent to signal that a previously sent
- * frame has been fully processed.
- */
- ThreadQueue *queue_out;
};
// data that is local to the decoder thread and not visible outside of it
@@ -81,24 +59,6 @@ typedef struct EncoderThread {
AVPacket *pkt;
} EncoderThread;
-static int enc_thread_stop(Encoder *e)
-{
- void *ret;
-
- if (!e->queue_in)
- return 0;
-
- tq_send_finish(e->queue_in, 0);
- tq_receive_finish(e->queue_out, 0);
-
- pthread_join(e->thread, &ret);
-
- tq_free(&e->queue_in);
- tq_free(&e->queue_out);
-
- return (int)(intptr_t)ret;
-}
-
void enc_free(Encoder **penc)
{
Encoder *enc = *penc;
@@ -106,13 +66,6 @@ void enc_free(Encoder **penc)
if (!enc)
return;
- enc_thread_stop(enc);
-
- av_frame_free(&enc->sq_frame);
- av_frame_free(&enc->sub_frame);
-
- av_packet_free(&enc->pkt);
-
av_freep(penc);
}
@@ -127,25 +80,12 @@ int enc_alloc(Encoder **penc, const AVCodec *codec,
if (!enc)
return AVERROR(ENOMEM);
- if (codec->type == AVMEDIA_TYPE_SUBTITLE) {
- enc->sub_frame = av_frame_alloc();
- if (!enc->sub_frame)
- goto fail;
- }
-
- enc->pkt = av_packet_alloc();
- if (!enc->pkt)
- goto fail;
-
enc->sch = sch;
enc->sch_idx = sch_idx;
*penc = enc;
return 0;
-fail:
- enc_free(&enc);
- return AVERROR(ENOMEM);
}
static int hw_device_setup_for_encode(OutputStream *ost, AVBufferRef *frames_ref)
@@ -224,52 +164,9 @@ static int set_encoder_id(OutputFile *of, OutputStream *ost)
return 0;
}
-static int enc_thread_start(OutputStream *ost)
-{
- Encoder *e = ost->enc;
- ObjPool *op;
- int ret = 0;
-
- op = objpool_alloc_frames();
- if (!op)
- return AVERROR(ENOMEM);
-
- e->queue_in = tq_alloc(1, 1, op, frame_move);
- if (!e->queue_in) {
- objpool_free(&op);
- return AVERROR(ENOMEM);
- }
-
- op = objpool_alloc_packets();
- if (!op)
- goto fail;
-
- e->queue_out = tq_alloc(1, 4, op, pkt_move);
- if (!e->queue_out) {
- objpool_free(&op);
- goto fail;
- }
-
- ret = pthread_create(&e->thread, NULL, encoder_thread, ost);
- if (ret) {
- ret = AVERROR(ret);
- av_log(ost, AV_LOG_ERROR, "pthread_create() failed: %s\n",
- av_err2str(ret));
- goto fail;
- }
-
- return 0;
-fail:
- if (ret >= 0)
- ret = AVERROR(ENOMEM);
-
- tq_free(&e->queue_in);
- tq_free(&e->queue_out);
- return ret;
-}
-
-int enc_open(OutputStream *ost, const AVFrame *frame)
+int enc_open(void *opaque, const AVFrame *frame)
{
+ OutputStream *ost = opaque;
InputStream *ist = ost->ist;
Encoder *e = ost->enc;
AVCodecContext *enc_ctx = ost->enc_ctx;
@@ -277,6 +174,7 @@ int enc_open(OutputStream *ost, const AVFrame *frame)
const AVCodec *enc = enc_ctx->codec;
OutputFile *of = output_files[ost->file_index];
FrameData *fd;
+ int frame_samples = 0;
int ret;
if (e->opened)
@@ -420,17 +318,8 @@ int enc_open(OutputStream *ost, const AVFrame *frame)
e->opened = 1;
- if (ost->sq_idx_encode >= 0) {
- e->sq_frame = av_frame_alloc();
- if (!e->sq_frame)
- return AVERROR(ENOMEM);
- }
-
- if (ost->enc_ctx->frame_size) {
- av_assert0(ost->sq_idx_encode >= 0);
- sq_frame_samples(output_files[ost->file_index]->sq_encode,
- ost->sq_idx_encode, ost->enc_ctx->frame_size);
- }
+ if (ost->enc_ctx->frame_size)
+ frame_samples = ost->enc_ctx->frame_size;
ret = check_avoptions(ost->encoder_opts);
if (ret < 0)
@@ -476,18 +365,11 @@ int enc_open(OutputStream *ost, const AVFrame *frame)
if (ost->st->time_base.num <= 0 || ost->st->time_base.den <= 0)
ost->st->time_base = av_add_q(ost->enc_ctx->time_base, (AVRational){0, 1});
- ret = enc_thread_start(ost);
- if (ret < 0) {
- av_log(ost, AV_LOG_ERROR, "Error starting encoder thread: %s\n",
- av_err2str(ret));
- return ret;
- }
-
ret = of_stream_init(of, ost);
if (ret < 0)
return ret;
- return 0;
+ return frame_samples;
}
static int check_recording_time(OutputStream *ost, int64_t ts, AVRational tb)
@@ -514,8 +396,7 @@ static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle *
av_log(ost, AV_LOG_ERROR, "Subtitle packets must have a pts\n");
return exit_on_error ? AVERROR(EINVAL) : 0;
}
- if (ost->finished ||
- (of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time))
+ if ((of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time))
return 0;
enc = ost->enc_ctx;
@@ -579,7 +460,7 @@ static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle *
}
pkt->dts = pkt->pts;
- ret = tq_send(e->queue_out, 0, pkt);
+ ret = sch_enc_send(e->sch, e->sch_idx, pkt);
if (ret < 0) {
av_packet_unref(pkt);
return ret;
@@ -671,10 +552,13 @@ static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_
int64_t frame_number;
double ti1, bitrate, avg_bitrate;
double psnr_val = -1;
+ int quality;
- ost->quality = sd ? AV_RL32(sd) : -1;
+ quality = sd ? AV_RL32(sd) : -1;
pict_type = sd ? sd[4] : AV_PICTURE_TYPE_NONE;
+ atomic_store(&ost->quality, quality);
+
if ((enc->flags & AV_CODEC_FLAG_PSNR) && sd && sd[5]) {
// FIXME the scaling assumes 8bit
double error = AV_RL64(sd + 8) / (enc->width * enc->height * 255.0 * 255.0);
@@ -697,10 +581,10 @@ static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_
frame_number = e->packets_encoded;
if (vstats_version <= 1) {
fprintf(vstats_file, "frame= %5"PRId64" q= %2.1f ", frame_number,
- ost->quality / (float)FF_QP2LAMBDA);
+ quality / (float)FF_QP2LAMBDA);
} else {
fprintf(vstats_file, "out= %2d st= %2d frame= %5"PRId64" q= %2.1f ", ost->file_index, ost->index, frame_number,
- ost->quality / (float)FF_QP2LAMBDA);
+ quality / (float)FF_QP2LAMBDA);
}
if (psnr_val >= 0)
@@ -801,18 +685,11 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame,
av_ts2str(pkt->duration), av_ts2timestr(pkt->duration, &enc->time_base));
}
- if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) {
- av_log(NULL, AV_LOG_ERROR,
- "Subtitle heartbeat logic failed in %s! (%s)\n",
- __func__, av_err2str(ret));
- return ret;
- }
-
e->data_size += pkt->size;
e->packets_encoded++;
- ret = tq_send(e->queue_out, 0, pkt);
+ ret = sch_enc_send(e->sch, e->sch_idx, pkt);
if (ret < 0) {
av_packet_unref(pkt);
return ret;
@@ -822,50 +699,6 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame,
av_assert0(0);
}
-static int submit_encode_frame(OutputFile *of, OutputStream *ost,
- AVFrame *frame, AVPacket *pkt)
-{
- Encoder *e = ost->enc;
- int ret;
-
- if (ost->sq_idx_encode < 0)
- return encode_frame(of, ost, frame, pkt);
-
- if (frame) {
- ret = av_frame_ref(e->sq_frame, frame);
- if (ret < 0)
- return ret;
- frame = e->sq_frame;
- }
-
- ret = sq_send(of->sq_encode, ost->sq_idx_encode,
- SQFRAME(frame));
- if (ret < 0) {
- if (frame)
- av_frame_unref(frame);
- if (ret != AVERROR_EOF)
- return ret;
- }
-
- while (1) {
- AVFrame *enc_frame = e->sq_frame;
-
- ret = sq_receive(of->sq_encode, ost->sq_idx_encode,
- SQFRAME(enc_frame));
- if (ret == AVERROR_EOF) {
- enc_frame = NULL;
- } else if (ret < 0) {
- return (ret == AVERROR(EAGAIN)) ? 0 : ret;
- }
-
- ret = encode_frame(of, ost, enc_frame, pkt);
- if (enc_frame)
- av_frame_unref(enc_frame);
- if (ret < 0)
- return ret;
- }
-}
-
static int do_audio_out(OutputFile *of, OutputStream *ost,
AVFrame *frame, AVPacket *pkt)
{
@@ -881,7 +714,7 @@ static int do_audio_out(OutputFile *of, OutputStream *ost,
if (!check_recording_time(ost, frame->pts, frame->time_base))
return AVERROR_EOF;
- return submit_encode_frame(of, ost, frame, pkt);
+ return encode_frame(of, ost, frame, pkt);
}
static enum AVPictureType forced_kf_apply(void *logctx, KeyframeForceCtx *kf,
@@ -949,7 +782,7 @@ static int do_video_out(OutputFile *of, OutputStream *ost,
}
#endif
- return submit_encode_frame(of, ost, in_picture, pkt);
+ return encode_frame(of, ost, in_picture, pkt);
}
static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
@@ -958,9 +791,12 @@ static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
enum AVMediaType type = ost->type;
if (type == AVMEDIA_TYPE_SUBTITLE) {
+ const AVSubtitle *subtitle = frame && frame->buf[0] ?
+ (AVSubtitle*)frame->buf[0]->data : NULL;
+
// no flushing for subtitles
- return frame ?
- do_subtitle_out(of, ost, (AVSubtitle*)frame->buf[0]->data, pkt) : 0;
+ return subtitle && subtitle->num_rects ?
+ do_subtitle_out(of, ost, subtitle, pkt) : 0;
}
if (frame) {
@@ -968,7 +804,7 @@ static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
do_audio_out(of, ost, frame, pkt);
}
- return submit_encode_frame(of, ost, NULL, pkt);
+ return encode_frame(of, ost, NULL, pkt);
}
static void enc_thread_set_name(const OutputStream *ost)
@@ -1009,25 +845,52 @@ fail:
void *encoder_thread(void *arg)
{
OutputStream *ost = arg;
- OutputFile *of = output_files[ost->file_index];
Encoder *e = ost->enc;
EncoderThread et;
int ret = 0, input_status = 0;
+ int name_set = 0;
ret = enc_thread_init(&et);
if (ret < 0)
goto finish;
- enc_thread_set_name(ost);
+ /* Open the subtitle encoders immediately. AVFrame-based encoders
+ * are opened through a callback from the scheduler once they get
+ * their first frame
+ *
+ * N.B.: because the callback is called from a different thread,
+ * enc_ctx MUST NOT be accessed before sch_enc_receive() returns
+ * for the first time for audio/video. */
+ if (ost->type != AVMEDIA_TYPE_VIDEO && ost->type != AVMEDIA_TYPE_AUDIO) {
+ ret = enc_open(ost, NULL);
+ if (ret < 0)
+ goto finish;
+ }
while (!input_status) {
- int dummy;
+ input_status = sch_enc_receive(e->sch, e->sch_idx, et.frame);
+ if (input_status < 0) {
+ if (input_status == AVERROR_EOF) {
+ av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n");
+ if (e->opened)
+ break;
+
+ av_log(ost, AV_LOG_ERROR, "Could not open encoder before EOF\n");
+ ret = AVERROR(EINVAL);
+ } else {
+ av_log(ost, AV_LOG_ERROR, "Error receiving a frame for encoding: %s\n",
+ av_err2str(ret));
+ ret = input_status;
+ }
+ goto finish;
+ }
- input_status = tq_receive(e->queue_in, &dummy, et.frame);
- if (input_status < 0)
- av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n");
+ if (!name_set) {
+ enc_thread_set_name(ost);
+ name_set = 1;
+ }
- ret = frame_encode(ost, input_status >= 0 ? et.frame : NULL, et.pkt);
+ ret = frame_encode(ost, et.frame, et.pkt);
av_packet_unref(et.pkt);
av_frame_unref(et.frame);
@@ -1040,15 +903,14 @@ void *encoder_thread(void *arg)
av_err2str(ret));
break;
}
+ }
- // signal to the consumer thread that the frame was encoded
- ret = tq_send(e->queue_out, 0, et.pkt);
- if (ret < 0) {
- if (ret != AVERROR_EOF)
- av_log(ost, AV_LOG_ERROR,
- "Error communicating with the main thread\n");
- break;
- }
+ // flush the encoder
+ if (ret == 0 || ret == AVERROR_EOF) {
+ ret = frame_encode(ost, NULL, et.pkt);
+ if (ret < 0 && ret != AVERROR_EOF)
+ av_log(ost, AV_LOG_ERROR, "Error flushing encoder: %s\n",
+ av_err2str(ret));
}
// EOF is normal thread termination
@@ -1056,118 +918,7 @@ void *encoder_thread(void *arg)
ret = 0;
finish:
- if (ost->sq_idx_encode >= 0)
- sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
-
- tq_receive_finish(e->queue_in, 0);
- tq_send_finish (e->queue_out, 0);
-
enc_thread_uninit(&et);
- av_log(ost, AV_LOG_VERBOSE, "Terminating encoder thread\n");
-
return (void*)(intptr_t)ret;
}
-
-int enc_frame(OutputStream *ost, AVFrame *frame)
-{
- OutputFile *of = output_files[ost->file_index];
- Encoder *e = ost->enc;
- int ret, thread_ret;
-
- ret = enc_open(ost, frame);
- if (ret < 0)
- return ret;
-
- if (!e->queue_in)
- return AVERROR_EOF;
-
- // send the frame/EOF to the encoder thread
- if (frame) {
- ret = tq_send(e->queue_in, 0, frame);
- if (ret < 0)
- goto finish;
- } else
- tq_send_finish(e->queue_in, 0);
-
- // retrieve all encoded data for the frame
- while (1) {
- int dummy;
-
- ret = tq_receive(e->queue_out, &dummy, e->pkt);
- if (ret < 0)
- break;
-
- // frame fully encoded
- if (!e->pkt->data && !e->pkt->side_data_elems)
- return 0;
-
- // process the encoded packet
- ret = of_output_packet(of, ost, e->pkt);
- if (ret < 0)
- goto finish;
- }
-
-finish:
- thread_ret = enc_thread_stop(e);
- if (thread_ret < 0) {
- av_log(ost, AV_LOG_ERROR, "Encoder thread returned error: %s\n",
- av_err2str(thread_ret));
- ret = err_merge(ret, thread_ret);
- }
-
- if (ret < 0 && ret != AVERROR_EOF)
- return ret;
-
- // signal EOF to the muxer
- return of_output_packet(of, ost, NULL);
-}
-
-int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub)
-{
- Encoder *e = ost->enc;
- AVFrame *f = e->sub_frame;
- int ret;
-
- // XXX the queue for transferring data to the encoder thread runs
- // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put
- // that inside the frame
- // eventually, subtitles should be switched to use AVFrames natively
- ret = subtitle_wrap_frame(f, sub, 1);
- if (ret < 0)
- return ret;
-
- ret = enc_frame(ost, f);
- av_frame_unref(f);
-
- return ret;
-}
-
-int enc_flush(void)
-{
- int ret = 0;
-
- for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
- OutputFile *of = output_files[ost->file_index];
- if (ost->sq_idx_encode >= 0)
- sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
- }
-
- for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
- Encoder *e = ost->enc;
- AVCodecContext *enc = ost->enc_ctx;
- int err;
-
- if (!enc || !e->opened ||
- (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO))
- continue;
-
- err = enc_frame(ost, NULL);
- if (err != AVERROR_EOF && ret < 0)
- ret = err_merge(ret, err);
-
- av_assert0(!e->queue_in);
- }
-
- return ret;
-}
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index 635b1b0b6e..ada235b084 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -21,8 +21,6 @@
#include <stdint.h>
#include "ffmpeg.h"
-#include "ffmpeg_utils.h"
-#include "thread_queue.h"
#include "libavfilter/avfilter.h"
#include "libavfilter/buffersink.h"
@@ -53,10 +51,11 @@ typedef struct FilterGraphPriv {
// true when the filtergraph contains only meta filters
// that do not modify the frame data
int is_meta;
+ // source filters are present in the graph
+ int have_sources;
int disable_conversions;
- int nb_inputs_bound;
- int nb_outputs_bound;
+ unsigned nb_outputs_done;
const char *graph_desc;
@@ -67,41 +66,6 @@ typedef struct FilterGraphPriv {
Scheduler *sch;
unsigned sch_idx;
-
- pthread_t thread;
- /**
- * Queue for sending frames from the main thread to the filtergraph. Has
- * nb_inputs+1 streams - the first nb_inputs stream correspond to
- * filtergraph inputs. Frames on those streams may have their opaque set to
- * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the
- * EOF event for the correspondint stream. Will be immediately followed by
- * this stream being send-closed.
- * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but pts+timebase of
- * a subtitle heartbeat event. Will only be sent for sub2video streams.
- *
- * The last stream is "control" - the main thread sends empty AVFrames with
- * opaque set to
- * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame available
- * from filtergraph outputs. These frames are sent to corresponding
- * streams in queue_out. Finally an empty frame is sent to the control
- * stream in queue_out.
- * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames are
- * available the terminating empty frame's opaque will contain the index+1
- * of the filtergraph input to which more input frames should be supplied.
- */
- ThreadQueue *queue_in;
- /**
- * Queue for sending frames from the filtergraph back to the main thread.
- * Has nb_outputs+1 streams - the first nb_outputs stream correspond to
- * filtergraph outputs.
- *
- * The last stream is "control" - see documentation for queue_in for more
- * details.
- */
- ThreadQueue *queue_out;
- // submitting frames to filter thread returned EOF
- // this only happens on thread exit, so is not per-input
- int eof_in;
} FilterGraphPriv;
static FilterGraphPriv *fgp_from_fg(FilterGraph *fg)
@@ -123,6 +87,9 @@ typedef struct FilterGraphThread {
// The output index is stored in frame opaque.
AVFifo *frame_queue_out;
+ // index of the next input to request from the scheduler
+ unsigned next_in;
+ // set to 1 after at least one frame passed through this output
int got_frame;
// EOF status of each input/output, as received by the thread
@@ -253,9 +220,6 @@ typedef struct OutputFilterPriv {
int64_t ts_offset;
int64_t next_pts;
FPSConvContext fps;
-
- // set to 1 after at least one frame passed through this output
- int got_frame;
} OutputFilterPriv;
static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter)
@@ -653,57 +617,6 @@ static int ifilter_has_all_input_formats(FilterGraph *fg)
static void *filter_thread(void *arg);
-// start the filtering thread once all inputs and outputs are bound
-static int fg_thread_try_start(FilterGraphPriv *fgp)
-{
- FilterGraph *fg = &fgp->fg;
- ObjPool *op;
- int ret = 0;
-
- if (fgp->nb_inputs_bound < fg->nb_inputs ||
- fgp->nb_outputs_bound < fg->nb_outputs)
- return 0;
-
- op = objpool_alloc_frames();
- if (!op)
- return AVERROR(ENOMEM);
-
- fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move);
- if (!fgp->queue_in) {
- objpool_free(&op);
- return AVERROR(ENOMEM);
- }
-
- // at least one output is mandatory
- op = objpool_alloc_frames();
- if (!op)
- goto fail;
-
- fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move);
- if (!fgp->queue_out) {
- objpool_free(&op);
- goto fail;
- }
-
- ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp);
- if (ret) {
- ret = AVERROR(ret);
- av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d failed: %s\n",
- fg->index, av_err2str(ret));
- goto fail;
- }
-
- return 0;
-fail:
- if (ret >= 0)
- ret = AVERROR(ENOMEM);
-
- tq_free(&fgp->queue_in);
- tq_free(&fgp->queue_out);
-
- return ret;
-}
-
static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, int in)
{
AVFilterContext *ctx = inout->filter_ctx;
@@ -729,7 +642,6 @@ static OutputFilter *ofilter_alloc(FilterGraph *fg)
ofilter->graph = fg;
ofp->format = -1;
ofp->index = fg->nb_outputs - 1;
- ofilter->last_pts = AV_NOPTS_VALUE;
return ofilter;
}
@@ -760,10 +672,7 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
return AVERROR(ENOMEM);
}
- fgp->nb_inputs_bound++;
- av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs);
-
- return fg_thread_try_start(fgp);
+ return 0;
}
static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost)
@@ -902,10 +811,7 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost,
if (ret < 0)
return ret;
- fgp->nb_outputs_bound++;
- av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs);
-
- return fg_thread_try_start(fgp);
+ return 0;
}
static InputFilter *ifilter_alloc(FilterGraph *fg)
@@ -935,34 +841,6 @@ static InputFilter *ifilter_alloc(FilterGraph *fg)
return ifilter;
}
-static int fg_thread_stop(FilterGraphPriv *fgp)
-{
- void *ret;
-
- if (!fgp->queue_in)
- return 0;
-
- for (int i = 0; i <= fgp->fg.nb_inputs; i++) {
- InputFilterPriv *ifp = i < fgp->fg.nb_inputs ?
- ifp_from_ifilter(fgp->fg.inputs[i]) : NULL;
-
- if (ifp)
- ifp->eof = 1;
-
- tq_send_finish(fgp->queue_in, i);
- }
-
- for (int i = 0; i <= fgp->fg.nb_outputs; i++)
- tq_receive_finish(fgp->queue_out, i);
-
- pthread_join(fgp->thread, &ret);
-
- tq_free(&fgp->queue_in);
- tq_free(&fgp->queue_out);
-
- return (int)(intptr_t)ret;
-}
-
void fg_free(FilterGraph **pfg)
{
FilterGraph *fg = *pfg;
@@ -972,8 +850,6 @@ void fg_free(FilterGraph **pfg)
return;
fgp = fgp_from_fg(fg);
- fg_thread_stop(fgp);
-
avfilter_graph_free(&fg->graph);
for (int j = 0; j < fg->nb_inputs; j++) {
InputFilter *ifilter = fg->inputs[j];
@@ -1072,6 +948,15 @@ int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch)
if (ret < 0)
goto fail;
+ for (unsigned i = 0; i < graph->nb_filters; i++) {
+ const AVFilter *f = graph->filters[i]->filter;
+ if (!avfilter_filter_pad_count(f, 0) &&
+ !(f->flags & AVFILTER_FLAG_DYNAMIC_INPUTS)) {
+ fgp->have_sources = 1;
+ break;
+ }
+ }
+
for (AVFilterInOut *cur = inputs; cur; cur = cur->next) {
InputFilter *const ifilter = ifilter_alloc(fg);
InputFilterPriv *ifp;
@@ -1800,6 +1685,7 @@ static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt)
AVBufferRef *hw_device;
AVFilterInOut *inputs, *outputs, *cur;
int ret, i, simple = filtergraph_is_simple(fg);
+ int have_input_eof = 0;
const char *graph_desc = fgp->graph_desc;
cleanup_filtergraph(fg);
@@ -1922,11 +1808,18 @@ static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt)
ret = av_buffersrc_add_frame(ifp->filter, NULL);
if (ret < 0)
goto fail;
+ have_input_eof = 1;
}
}
- return 0;
+ if (have_input_eof) {
+ // make sure the EOF propagates to the end of the graph
+ ret = avfilter_graph_request_oldest(fg->graph);
+ if (ret < 0 && ret != AVERROR(EAGAIN) && ret != AVERROR_EOF)
+ goto fail;
+ }
+ return 0;
fail:
cleanup_filtergraph(fg);
return ret;
@@ -2182,7 +2075,7 @@ static void video_sync_process(OutputFilterPriv *ofp, AVFrame *frame,
fps->frames_prev_hist[2]);
if (!*nb_frames && fps->last_dropped) {
- ofilter->nb_frames_drop++;
+ atomic_fetch_add(&ofilter->nb_frames_drop, 1);
fps->last_dropped++;
}
@@ -2260,21 +2153,23 @@ finish:
fps->frames_prev_hist[0] = *nb_frames_prev;
if (*nb_frames_prev == 0 && fps->last_dropped) {
- ofilter->nb_frames_drop++;
+ atomic_fetch_add(&ofilter->nb_frames_drop, 1);
av_log(ost, AV_LOG_VERBOSE,
"*** dropping frame %"PRId64" at ts %"PRId64"\n",
fps->frame_number, fps->last_frame->pts);
}
if (*nb_frames > (*nb_frames_prev && fps->last_dropped) + (*nb_frames > *nb_frames_prev)) {
+ uint64_t nb_frames_dup;
if (*nb_frames > dts_error_threshold * 30) {
av_log(ost, AV_LOG_ERROR, "%"PRId64" frame duplication too large, skipping\n", *nb_frames - 1);
- ofilter->nb_frames_drop++;
+ atomic_fetch_add(&ofilter->nb_frames_drop, 1);
*nb_frames = 0;
return;
}
- ofilter->nb_frames_dup += *nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev);
+ nb_frames_dup = atomic_fetch_add(&ofilter->nb_frames_dup,
+ *nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev));
av_log(ost, AV_LOG_VERBOSE, "*** %"PRId64" dup!\n", *nb_frames - 1);
- if (ofilter->nb_frames_dup > fps->dup_warning) {
+ if (nb_frames_dup > fps->dup_warning) {
av_log(ost, AV_LOG_WARNING, "More than %"PRIu64" frames duplicated\n", fps->dup_warning);
fps->dup_warning *= 10;
}
@@ -2284,8 +2179,57 @@ finish:
fps->dropped_keyframe |= fps->last_dropped && (frame->flags & AV_FRAME_FLAG_KEY);
}
+static int close_output(OutputFilterPriv *ofp, FilterGraphThread *fgt)
+{
+ FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
+ int ret;
+
+ // we are finished and no frames were ever seen at this output,
+ // at least initialize the encoder with a dummy frame
+ if (!fgt->got_frame) {
+ AVFrame *frame = fgt->frame;
+ FrameData *fd;
+
+ frame->time_base = ofp->tb_out;
+ frame->format = ofp->format;
+
+ frame->width = ofp->width;
+ frame->height = ofp->height;
+ frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
+
+ frame->sample_rate = ofp->sample_rate;
+ if (ofp->ch_layout.nb_channels) {
+ ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
+ if (ret < 0)
+ return ret;
+ }
+
+ fd = frame_data(frame);
+ if (!fd)
+ return AVERROR(ENOMEM);
+
+ fd->frame_rate_filter = ofp->fps.framerate;
+
+ av_assert0(!frame->buf[0]);
+
+ av_log(ofp->ofilter.ost, AV_LOG_WARNING,
+ "No filtered frames for output stream, trying to "
+ "initialize anyway.\n");
+
+ ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame);
+ if (ret < 0) {
+ av_frame_unref(frame);
+ return ret;
+ }
+ }
+
+ fgt->eof_out[ofp->index] = 1;
+
+ return sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, NULL);
+}
+
static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
- AVFrame *frame, int buffer)
+ AVFrame *frame)
{
FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
AVFrame *frame_prev = ofp->fps.last_frame;
@@ -2332,28 +2276,17 @@ static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
frame_out = frame;
}
- if (buffer) {
- AVFrame *f = av_frame_alloc();
-
- if (!f) {
+ {
+ // send the frame to consumers
+ ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame_out);
+ if (ret < 0) {
av_frame_unref(frame_out);
- return AVERROR(ENOMEM);
- }
- av_frame_move_ref(f, frame_out);
- f->opaque = (void*)(intptr_t)ofp->index;
+ if (!fgt->eof_out[ofp->index]) {
+ fgt->eof_out[ofp->index] = 1;
+ fgp->nb_outputs_done++;
+ }
- ret = av_fifo_write(fgt->frame_queue_out, &f, 1);
- if (ret < 0) {
- av_frame_free(&f);
- return AVERROR(ENOMEM);
- }
- } else {
- // return the frame to the main thread
- ret = tq_send(fgp->queue_out, ofp->index, frame_out);
- if (ret < 0) {
- av_frame_unref(frame_out);
- fgt->eof_out[ofp->index] = 1;
return ret == AVERROR_EOF ? 0 : ret;
}
}
@@ -2374,16 +2307,14 @@ static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
av_frame_move_ref(frame_prev, frame);
}
- if (!frame) {
- tq_send_finish(fgp->queue_out, ofp->index);
- fgt->eof_out[ofp->index] = 1;
- }
+ if (!frame)
+ return close_output(ofp, fgt);
return 0;
}
static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
- AVFrame *frame, int buffer)
+ AVFrame *frame)
{
FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
OutputStream *ost = ofp->ofilter.ost;
@@ -2393,8 +2324,8 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
ret = av_buffersink_get_frame_flags(filter, frame,
AV_BUFFERSINK_FLAG_NO_REQUEST);
- if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) {
- ret = fg_output_frame(ofp, fgt, NULL, buffer);
+ if (ret == AVERROR_EOF && !fgt->eof_out[ofp->index]) {
+ ret = fg_output_frame(ofp, fgt, NULL);
return (ret < 0) ? ret : 1;
} else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
return 1;
@@ -2448,7 +2379,7 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
fd->frame_rate_filter = ofp->fps.framerate;
}
- ret = fg_output_frame(ofp, fgt, frame, buffer);
+ ret = fg_output_frame(ofp, fgt, frame);
av_frame_unref(frame);
if (ret < 0)
return ret;
@@ -2456,44 +2387,68 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
return 0;
}
-/* retrieve all frames available at filtergraph outputs and either send them to
- * the main thread (buffer=0) or buffer them for later (buffer=1) */
+/* retrieve all frames available at filtergraph outputs
+ * and send them to consumers */
static int read_frames(FilterGraph *fg, FilterGraphThread *fgt,
- AVFrame *frame, int buffer)
+ AVFrame *frame)
{
FilterGraphPriv *fgp = fgp_from_fg(fg);
- int ret = 0;
+ int did_step = 0;
- if (!fg->graph)
- return 0;
+ // graph not configured, just select the input to request
+ if (!fg->graph) {
+ for (int i = 0; i < fg->nb_inputs; i++) {
+ InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]);
+ if (ifp->format < 0 && !fgt->eof_in[i]) {
+ fgt->next_in = i;
+ return 0;
+ }
+ }
- // process buffered frames
- if (!buffer) {
- AVFrame *f;
+ // This state - graph is not configured, but all inputs are either
+ // initialized or EOF - should be unreachable because sending EOF to a
+ // filter without even a fallback format should fail
+ av_assert0(0);
+ return AVERROR_BUG;
+ }
- while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) {
- int out_idx = (intptr_t)f->opaque;
- f->opaque = NULL;
- ret = tq_send(fgp->queue_out, out_idx, f);
- av_frame_free(&f);
- if (ret < 0 && ret != AVERROR_EOF)
- return ret;
+ while (fgp->nb_outputs_done < fg->nb_outputs) {
+ int ret;
+
+ ret = avfilter_graph_request_oldest(fg->graph);
+ if (ret == AVERROR(EAGAIN)) {
+ fgt->next_in = choose_input(fg, fgt);
+ break;
+ } else if (ret < 0) {
+ if (ret == AVERROR_EOF)
+ av_log(fg, AV_LOG_VERBOSE, "Filtergraph returned EOF, finishing\n");
+ else
+ av_log(fg, AV_LOG_ERROR,
+ "Error requesting a frame from the filtergraph: %s\n",
+ av_err2str(ret));
+ return ret;
}
- }
+ fgt->next_in = fg->nb_inputs;
- /* Reap all buffers present in the buffer sinks */
- for (int i = 0; i < fg->nb_outputs; i++) {
- OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
- int ret = 0;
+ // return after one iteration, so that scheduler can rate-control us
+ if (did_step && fgp->have_sources)
+ return 0;
- while (!ret) {
- ret = fg_output_step(ofp, fgt, frame, buffer);
- if (ret < 0)
- return ret;
+ /* Reap all buffers present in the buffer sinks */
+ for (int i = 0; i < fg->nb_outputs; i++) {
+ OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
+
+ ret = 0;
+ while (!ret) {
+ ret = fg_output_step(ofp, fgt, frame);
+ if (ret < 0)
+ return ret;
+ }
}
- }
+ did_step = 1;
+ };
- return 0;
+ return (fgp->nb_outputs_done == fg->nb_outputs) ? AVERROR_EOF : 0;
}
static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
@@ -2571,6 +2526,9 @@ static int send_eof(FilterGraphThread *fgt, InputFilter *ifilter,
InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
int ret;
+ if (fgt->eof_in[ifp->index])
+ return 0;
+
fgt->eof_in[ifp->index] = 1;
if (ifp->filter) {
@@ -2672,7 +2630,7 @@ static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
return ret;
}
- ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0;
+ ret = fg->graph ? read_frames(fg, fgt, tmp) : 0;
av_frame_free(&tmp);
if (ret < 0)
return ret;
@@ -2705,82 +2663,6 @@ static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
return 0;
}
-static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt,
- AVFrame *frame)
-{
- const enum FrameOpaque msg = (intptr_t)frame->opaque;
- FilterGraph *fg = &fgp->fg;
- int graph_eof = 0;
- int ret;
-
- frame->opaque = NULL;
- av_assert0(msg > 0);
- av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]);
-
- if (!fg->graph) {
- // graph not configured yet, ignore all messages other than choosing
- // the input to read from
- if (msg != FRAME_OPAQUE_CHOOSE_INPUT) {
- av_frame_unref(frame);
- goto done;
- }
-
- for (int i = 0; i < fg->nb_inputs; i++) {
- InputFilter *ifilter = fg->inputs[i];
- InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
- if (ifp->format < 0 && !fgt->eof_in[i]) {
- frame->opaque = (void*)(intptr_t)(i + 1);
- goto done;
- }
- }
-
- // This state - graph is not configured, but all inputs are either
- // initialized or EOF - should be unreachable because sending EOF to a
- // filter without even a fallback format should fail
- av_assert0(0);
- return AVERROR_BUG;
- }
-
- if (msg == FRAME_OPAQUE_SEND_COMMAND) {
- FilterCommand *fc = (FilterCommand*)frame->buf[0]->data;
- send_command(fg, fc->time, fc->target, fc->command, fc->arg, fc->all_filters);
- av_frame_unref(frame);
- goto done;
- }
-
- if (msg == FRAME_OPAQUE_CHOOSE_INPUT) {
- ret = avfilter_graph_request_oldest(fg->graph);
-
- graph_eof = ret == AVERROR_EOF;
-
- if (ret == AVERROR(EAGAIN)) {
- frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1);
- goto done;
- } else if (ret < 0 && !graph_eof)
- return ret;
- }
-
- ret = read_frames(fg, fgt, frame, 0);
- if (ret < 0) {
- av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for encoding\n");
- return ret;
- }
-
- if (graph_eof)
- return AVERROR_EOF;
-
- // signal to the main thread that we are done processing the message
-done:
- ret = tq_send(fgp->queue_out, fg->nb_outputs, frame);
- if (ret < 0) {
- if (ret != AVERROR_EOF)
- av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
- return ret;
- }
-
- return 0;
-}
-
static void fg_thread_set_name(const FilterGraph *fg)
{
char name[16];
@@ -2867,282 +2749,86 @@ static void *filter_thread(void *arg)
InputFilter *ifilter;
InputFilterPriv *ifp;
enum FrameOpaque o;
- int input_idx, eof_frame;
+ unsigned input_idx = fgt.next_in;
- input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
- if (input_idx < 0 ||
- (input_idx == fg->nb_inputs && input_status < 0)) {
+ input_status = sch_filter_receive(fgp->sch, fgp->sch_idx,
+ &input_idx, fgt.frame);
+ if (input_status == AVERROR_EOF) {
av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n");
break;
+ } else if (input_status == AVERROR(EAGAIN)) {
+ // should only happen when we didn't request any input
+ av_assert0(input_idx == fg->nb_inputs);
+ goto read_frames;
}
+ av_assert0(input_status >= 0);
+
+ o = (intptr_t)fgt.frame->opaque;
o = (intptr_t)fgt.frame->opaque;
// message on the control stream
if (input_idx == fg->nb_inputs) {
- ret = msg_process(fgp, &fgt, fgt.frame);
- if (ret < 0)
- goto finish;
+ FilterCommand *fc;
+
+ av_assert0(o == FRAME_OPAQUE_SEND_COMMAND && fgt.frame->buf[0]);
+ fc = (FilterCommand*)fgt.frame->buf[0]->data;
+ send_command(fg, fc->time, fc->target, fc->command, fc->arg,
+ fc->all_filters);
+ av_frame_unref(fgt.frame);
continue;
}
// we received an input frame or EOF
ifilter = fg->inputs[input_idx];
ifp = ifp_from_ifilter(ifilter);
- eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF;
+
if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) {
int hb_frame = input_status >= 0 && o == FRAME_OPAQUE_SUB_HEARTBEAT;
ret = sub2video_frame(ifilter, (fgt.frame->buf[0] || hb_frame) ? fgt.frame : NULL);
- } else if (input_status >= 0 && fgt.frame->buf[0]) {
+ } else if (fgt.frame->buf[0]) {
ret = send_frame(fg, &fgt, ifilter, fgt.frame);
} else {
- int64_t pts = input_status >= 0 ? fgt.frame->pts : AV_NOPTS_VALUE;
- AVRational tb = input_status >= 0 ? fgt.frame->time_base : (AVRational){ 1, 1 };
- ret = send_eof(&fgt, ifilter, pts, tb);
+ av_assert1(o == FRAME_OPAQUE_EOF);
+ ret = send_eof(&fgt, ifilter, fgt.frame->pts, fgt.frame->time_base);
}
av_frame_unref(fgt.frame);
if (ret < 0)
- break;
-
- if (eof_frame) {
- // an EOF frame is immediately followed by sender closing
- // the corresponding stream, so retrieve that event
- input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
- av_assert0(input_status == AVERROR_EOF && input_idx == ifp->index);
- }
-
- // signal to the main thread that we are done
- ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame);
- if (ret < 0) {
- if (ret == AVERROR_EOF)
- break;
-
- av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
goto finish;
- }
- }
-
-finish:
- // EOF is normal termination
- if (ret == AVERROR_EOF)
- ret = 0;
-
- for (int i = 0; i <= fg->nb_inputs; i++)
- tq_receive_finish(fgp->queue_in, i);
- for (int i = 0; i <= fg->nb_outputs; i++)
- tq_send_finish(fgp->queue_out, i);
-
- fg_thread_uninit(&fgt);
-
- av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n");
-
- return (void*)(intptr_t)ret;
-}
-
-static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter,
- AVFrame *frame, enum FrameOpaque type)
-{
- InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
- int output_idx, ret;
-
- if (ifp->eof) {
- av_frame_unref(frame);
- return AVERROR_EOF;
- }
-
- frame->opaque = (void*)(intptr_t)type;
-
- ret = tq_send(fgp->queue_in, ifp->index, frame);
- if (ret < 0) {
- ifp->eof = 1;
- av_frame_unref(frame);
- return ret;
- }
-
- if (type == FRAME_OPAQUE_EOF)
- tq_send_finish(fgp->queue_in, ifp->index);
-
- // wait for the frame to be processed
- ret = tq_receive(fgp->queue_out, &output_idx, frame);
- av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
-
- return ret;
-}
-
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
-{
- FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
- int ret;
- if (keep_reference) {
- ret = av_frame_ref(fgp->frame, frame);
- if (ret < 0)
- return ret;
- } else
- av_frame_move_ref(fgp->frame, frame);
-
- return thread_send_frame(fgp, ifilter, fgp->frame, 0);
-}
-
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
-{
- FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
- int ret;
-
- fgp->frame->pts = pts;
- fgp->frame->time_base = tb;
-
- ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF);
-
- return ret == AVERROR_EOF ? 0 : ret;
-}
-
-void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
-{
- FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
-
- fgp->frame->pts = pts;
- fgp->frame->time_base = tb;
-
- thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_SUB_HEARTBEAT);
-}
-
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
-{
- FilterGraphPriv *fgp = fgp_from_fg(graph);
- int ret, got_frames = 0;
-
- if (fgp->eof_in)
- return AVERROR_EOF;
-
- // signal to the filtering thread to return all frames it can
- av_assert0(!fgp->frame->buf[0]);
- fgp->frame->opaque = (void*)(intptr_t)(best_ist ?
- FRAME_OPAQUE_CHOOSE_INPUT :
- FRAME_OPAQUE_REAP_FILTERS);
-
- ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame);
- if (ret < 0) {
- fgp->eof_in = 1;
- goto finish;
- }
-
- while (1) {
- OutputFilter *ofilter;
- OutputFilterPriv *ofp;
- OutputStream *ost;
- int output_idx;
-
- ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
-
- // EOF on the whole queue or the control stream
- if (output_idx < 0 ||
- (ret < 0 && output_idx == graph->nb_outputs))
- goto finish;
-
- // EOF for a specific stream
- if (ret < 0) {
- ofilter = graph->outputs[output_idx];
- ofp = ofp_from_ofilter(ofilter);
-
- // we are finished and no frames were ever seen at this output,
- // at least initialize the encoder with a dummy frame
- if (!ofp->got_frame) {
- AVFrame *frame = fgp->frame;
- FrameData *fd;
-
- frame->time_base = ofp->tb_out;
- frame->format = ofp->format;
-
- frame->width = ofp->width;
- frame->height = ofp->height;
- frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
-
- frame->sample_rate = ofp->sample_rate;
- if (ofp->ch_layout.nb_channels) {
- ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
- if (ret < 0)
- return ret;
- }
-
- fd = frame_data(frame);
- if (!fd)
- return AVERROR(ENOMEM);
-
- fd->frame_rate_filter = ofp->fps.framerate;
-
- av_assert0(!frame->buf[0]);
-
- av_log(ofilter->ost, AV_LOG_WARNING,
- "No filtered frames for output stream, trying to "
- "initialize anyway.\n");
-
- enc_open(ofilter->ost, frame);
- av_frame_unref(frame);
- }
-
- close_output_stream(graph->outputs[output_idx]->ost);
- continue;
- }
-
- // request was fully processed by the filtering thread,
- // return the input stream to read from, if needed
- if (output_idx == graph->nb_outputs) {
- int input_idx = (intptr_t)fgp->frame->opaque - 1;
- av_assert0(input_idx <= graph->nb_inputs);
-
- if (best_ist) {
- *best_ist = (input_idx >= 0 && input_idx < graph->nb_inputs) ?
- ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL;
-
- if (input_idx < 0 && !got_frames) {
- for (int i = 0; i < graph->nb_outputs; i++)
- graph->outputs[i]->ost->unavailable = 1;
- }
- }
+read_frames:
+ // retrieve all newly avalable frames
+ ret = read_frames(fg, &fgt, fgt.frame);
+ if (ret == AVERROR_EOF) {
+ av_log(fg, AV_LOG_VERBOSE, "All consumers returned EOF\n");
break;
+ } else if (ret < 0) {
+ av_log(fg, AV_LOG_ERROR, "Error sending frames to consumers: %s\n",
+ av_err2str(ret));
+ goto finish;
}
+ }
- // got a frame from the filtering thread, send it for encoding
- ofilter = graph->outputs[output_idx];
- ost = ofilter->ost;
- ofp = ofp_from_ofilter(ofilter);
+ for (unsigned i = 0; i < fg->nb_outputs; i++) {
+ OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
- if (ost->finished) {
- av_frame_unref(fgp->frame);
- tq_receive_finish(fgp->queue_out, output_idx);
+ if (fgt.eof_out[i])
continue;
- }
-
- if (fgp->frame->pts != AV_NOPTS_VALUE) {
- ofilter->last_pts = av_rescale_q(fgp->frame->pts,
- fgp->frame->time_base,
- AV_TIME_BASE_Q);
- }
- ret = enc_frame(ost, fgp->frame);
- av_frame_unref(fgp->frame);
+ ret = fg_output_frame(ofp, &fgt, NULL);
if (ret < 0)
goto finish;
-
- ofp->got_frame = 1;
- got_frames = 1;
}
finish:
- if (ret < 0) {
- fgp->eof_in = 1;
- for (int i = 0; i < graph->nb_outputs; i++)
- close_output_stream(graph->outputs[i]->ost);
- }
+ // EOF is normal termination
+ if (ret == AVERROR_EOF)
+ ret = 0;
- return ret;
-}
+ fg_thread_uninit(&fgt);
-int reap_filters(FilterGraph *fg, int flush)
-{
- return fg_transcode_step(fg, NULL);
+ return (void*)(intptr_t)ret;
}
void fg_send_command(FilterGraph *fg, double time, const char *target,
@@ -3151,10 +2837,6 @@ void fg_send_command(FilterGraph *fg, double time, const char *target,
FilterGraphPriv *fgp = fgp_from_fg(fg);
AVBufferRef *buf;
FilterCommand *fc;
- int output_idx, ret;
-
- if (!fgp->queue_in)
- return;
fc = av_mallocz(sizeof(*fc));
if (!fc)
@@ -3180,13 +2862,5 @@ void fg_send_command(FilterGraph *fg, double time, const char *target,
fgp->frame->buf[0] = buf;
fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND;
- ret = tq_send(fgp->queue_in, fg->nb_inputs, fgp->frame);
- if (ret < 0) {
- av_frame_unref(fgp->frame);
- return;
- }
-
- // wait for the frame to be processed
- ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
- av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
+ sch_filter_command(fgp->sch, fgp->sch_idx, fgp->frame);
}
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index ef5c2f60e0..067dc65d4e 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -23,16 +23,13 @@
#include "ffmpeg.h"
#include "ffmpeg_mux.h"
#include "ffmpeg_utils.h"
-#include "objpool.h"
#include "sync_queue.h"
-#include "thread_queue.h"
#include "libavutil/fifo.h"
#include "libavutil/intreadwrite.h"
#include "libavutil/log.h"
#include "libavutil/mem.h"
#include "libavutil/timestamp.h"
-#include "libavutil/thread.h"
#include "libavcodec/packet.h"
@@ -41,10 +38,9 @@
typedef struct MuxThreadContext {
AVPacket *pkt;
+ AVPacket *fix_sub_duration_pkt;
} MuxThreadContext;
-int want_sdp = 1;
-
static Muxer *mux_from_of(OutputFile *of)
{
return (Muxer*)of;
@@ -207,14 +203,41 @@ static int sync_queue_process(Muxer *mux, OutputStream *ost, AVPacket *pkt, int
return 0;
}
+static int of_streamcopy(OutputStream *ost, AVPacket *pkt);
+
/* apply the output bitstream filters */
-static int mux_packet_filter(Muxer *mux, OutputStream *ost,
- AVPacket *pkt, int *stream_eof)
+static int mux_packet_filter(Muxer *mux, MuxThreadContext *mt,
+ OutputStream *ost, AVPacket *pkt, int *stream_eof)
{
MuxStream *ms = ms_from_ost(ost);
const char *err_msg;
int ret = 0;
+ if (pkt && !ost->enc) {
+ ret = of_streamcopy(ost, pkt);
+ if (ret == AVERROR(EAGAIN))
+ return 0;
+ else if (ret == AVERROR_EOF) {
+ av_packet_unref(pkt);
+ pkt = NULL;
+ ret = 0;
+ } else if (ret < 0)
+ goto fail;
+ }
+
+ // emit heartbeat for -fix_sub_duration;
+ // we are only interested in heartbeats on on random access points.
+ if (pkt && (pkt->flags & AV_PKT_FLAG_KEY)) {
+ mt->fix_sub_duration_pkt->opaque = (void*)(intptr_t)PKT_OPAQUE_FIX_SUB_DURATION;
+ mt->fix_sub_duration_pkt->pts = pkt->pts;
+ mt->fix_sub_duration_pkt->time_base = pkt->time_base;
+
+ ret = sch_mux_sub_heartbeat(mux->sch, mux->sch_idx, ms->sch_idx,
+ mt->fix_sub_duration_pkt);
+ if (ret < 0)
+ goto fail;
+ }
+
if (ms->bsf_ctx) {
int bsf_eof = 0;
@@ -278,6 +301,7 @@ static void thread_set_name(OutputFile *of)
static void mux_thread_uninit(MuxThreadContext *mt)
{
av_packet_free(&mt->pkt);
+ av_packet_free(&mt->fix_sub_duration_pkt);
memset(mt, 0, sizeof(*mt));
}
@@ -290,6 +314,10 @@ static int mux_thread_init(MuxThreadContext *mt)
if (!mt->pkt)
goto fail;
+ mt->fix_sub_duration_pkt = av_packet_alloc();
+ if (!mt->fix_sub_duration_pkt)
+ goto fail;
+
return 0;
fail:
@@ -316,19 +344,22 @@ void *muxer_thread(void *arg)
OutputStream *ost;
int stream_idx, stream_eof = 0;
- ret = tq_receive(mux->tq, &stream_idx, mt.pkt);
+ ret = sch_mux_receive(mux->sch, of->index, mt.pkt);
+ stream_idx = mt.pkt->stream_index;
if (stream_idx < 0) {
av_log(mux, AV_LOG_VERBOSE, "All streams finished\n");
ret = 0;
break;
}
- ost = of->streams[stream_idx];
- ret = mux_packet_filter(mux, ost, ret < 0 ? NULL : mt.pkt, &stream_eof);
+ ost = of->streams[mux->sch_stream_idx[stream_idx]];
+ mt.pkt->stream_index = ost->index;
+
+ ret = mux_packet_filter(mux, &mt, ost, ret < 0 ? NULL : mt.pkt, &stream_eof);
av_packet_unref(mt.pkt);
if (ret == AVERROR_EOF) {
if (stream_eof) {
- tq_receive_finish(mux->tq, stream_idx);
+ sch_mux_receive_finish(mux->sch, of->index, stream_idx);
} else {
av_log(mux, AV_LOG_VERBOSE, "Muxer returned EOF\n");
ret = 0;
@@ -343,243 +374,55 @@ void *muxer_thread(void *arg)
finish:
mux_thread_uninit(&mt);
- for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
- tq_receive_finish(mux->tq, i);
-
- av_log(mux, AV_LOG_VERBOSE, "Terminating muxer thread\n");
-
return (void*)(intptr_t)ret;
}
-static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket *pkt)
-{
- int ret = 0;
-
- if (!pkt || ost->finished & MUXER_FINISHED)
- goto finish;
-
- ret = tq_send(mux->tq, ost->index, pkt);
- if (ret < 0)
- goto finish;
-
- return 0;
-
-finish:
- if (pkt)
- av_packet_unref(pkt);
-
- ost->finished |= MUXER_FINISHED;
- tq_send_finish(mux->tq, ost->index);
- return ret == AVERROR_EOF ? 0 : ret;
-}
-
-static int queue_packet(OutputStream *ost, AVPacket *pkt)
-{
- MuxStream *ms = ms_from_ost(ost);
- AVPacket *tmp_pkt = NULL;
- int ret;
-
- if (!av_fifo_can_write(ms->muxing_queue)) {
- size_t cur_size = av_fifo_can_read(ms->muxing_queue);
- size_t pkt_size = pkt ? pkt->size : 0;
- unsigned int are_we_over_size =
- (ms->muxing_queue_data_size + pkt_size) > ms->muxing_queue_data_threshold;
- size_t limit = are_we_over_size ? ms->max_muxing_queue_size : SIZE_MAX;
- size_t new_size = FFMIN(2 * cur_size, limit);
-
- if (new_size <= cur_size) {
- av_log(ost, AV_LOG_ERROR,
- "Too many packets buffered for output stream %d:%d.\n",
- ost->file_index, ost->st->index);
- return AVERROR(ENOSPC);
- }
- ret = av_fifo_grow2(ms->muxing_queue, new_size - cur_size);
- if (ret < 0)
- return ret;
- }
-
- if (pkt) {
- ret = av_packet_make_refcounted(pkt);
- if (ret < 0)
- return ret;
-
- tmp_pkt = av_packet_alloc();
- if (!tmp_pkt)
- return AVERROR(ENOMEM);
-
- av_packet_move_ref(tmp_pkt, pkt);
- ms->muxing_queue_data_size += tmp_pkt->size;
- }
- av_fifo_write(ms->muxing_queue, &tmp_pkt, 1);
-
- return 0;
-}
-
-static int submit_packet(Muxer *mux, AVPacket *pkt, OutputStream *ost)
-{
- int ret;
-
- if (mux->tq) {
- return thread_submit_packet(mux, ost, pkt);
- } else {
- /* the muxer is not initialized yet, buffer the packet */
- ret = queue_packet(ost, pkt);
- if (ret < 0) {
- if (pkt)
- av_packet_unref(pkt);
- return ret;
- }
- }
-
- return 0;
-}
-
-int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
-{
- Muxer *mux = mux_from_of(of);
- int ret = 0;
-
- if (pkt && pkt->dts != AV_NOPTS_VALUE)
- ost->last_mux_dts = av_rescale_q(pkt->dts, pkt->time_base, AV_TIME_BASE_Q);
-
- ret = submit_packet(mux, pkt, ost);
- if (ret < 0) {
- av_log(ost, AV_LOG_ERROR, "Error submitting a packet to the muxer: %s",
- av_err2str(ret));
- return ret;
- }
-
- return 0;
-}
-
-int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts)
+static int of_streamcopy(OutputStream *ost, AVPacket *pkt)
{
OutputFile *of = output_files[ost->file_index];
MuxStream *ms = ms_from_ost(ost);
+ DemuxPktData *pd = pkt->opaque_ref ? (DemuxPktData*)pkt->opaque_ref->data : NULL;
+ int64_t dts = pd ? pd->dts_est : AV_NOPTS_VALUE;
int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 : of->start_time;
int64_t ts_offset;
- AVPacket *opkt = ms->pkt;
- int ret;
-
- av_packet_unref(opkt);
if (of->recording_time != INT64_MAX &&
dts >= of->recording_time + start_time)
- pkt = NULL;
-
- // EOF: flush output bitstream filters.
- if (!pkt)
- return of_output_packet(of, ost, NULL);
+ return AVERROR_EOF;
if (!ms->streamcopy_started && !(pkt->flags & AV_PKT_FLAG_KEY) &&
!ms->copy_initial_nonkeyframes)
- return 0;
+ return AVERROR(EAGAIN);
if (!ms->streamcopy_started) {
if (!ms->copy_prior_start &&
(pkt->pts == AV_NOPTS_VALUE ?
dts < ms->ts_copy_start :
pkt->pts < av_rescale_q(ms->ts_copy_start, AV_TIME_BASE_Q, pkt->time_base)))
- return 0;
+ return AVERROR(EAGAIN);
if (of->start_time != AV_NOPTS_VALUE && dts < of->start_time)
- return 0;
+ return AVERROR(EAGAIN);
}
- ret = av_packet_ref(opkt, pkt);
- if (ret < 0)
- return ret;
-
- ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, opkt->time_base);
+ ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, pkt->time_base);
if (pkt->pts != AV_NOPTS_VALUE)
- opkt->pts -= ts_offset;
+ pkt->pts -= ts_offset;
if (pkt->dts == AV_NOPTS_VALUE) {
- opkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, opkt->time_base);
+ pkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, pkt->time_base);
} else if (ost->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
- opkt->pts = opkt->dts - ts_offset;
+ pkt->pts = pkt->dts - ts_offset;
}
- opkt->dts -= ts_offset;
- {
- int ret = trigger_fix_sub_duration_heartbeat(ost, pkt);
- if (ret < 0) {
- av_log(NULL, AV_LOG_ERROR,
- "Subtitle heartbeat logic failed in %s! (%s)\n",
- __func__, av_err2str(ret));
- return ret;
- }
- }
-
- ret = of_output_packet(of, ost, opkt);
- if (ret < 0)
- return ret;
+ pkt->dts -= ts_offset;
ms->streamcopy_started = 1;
return 0;
}
-static int thread_stop(Muxer *mux)
-{
- void *ret;
-
- if (!mux || !mux->tq)
- return 0;
-
- for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
- tq_send_finish(mux->tq, i);
-
- pthread_join(mux->thread, &ret);
-
- tq_free(&mux->tq);
-
- return (int)(intptr_t)ret;
-}
-
-static int thread_start(Muxer *mux)
-{
- AVFormatContext *fc = mux->fc;
- ObjPool *op;
- int ret;
-
- op = objpool_alloc_packets();
- if (!op)
- return AVERROR(ENOMEM);
-
- mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move);
- if (!mux->tq) {
- objpool_free(&op);
- return AVERROR(ENOMEM);
- }
-
- ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)mux);
- if (ret) {
- tq_free(&mux->tq);
- return AVERROR(ret);
- }
-
- /* flush the muxing queues */
- for (int i = 0; i < fc->nb_streams; i++) {
- OutputStream *ost = mux->of.streams[i];
- MuxStream *ms = ms_from_ost(ost);
- AVPacket *pkt;
-
- while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
- ret = thread_submit_packet(mux, ost, pkt);
- if (pkt) {
- ms->muxing_queue_data_size -= pkt->size;
- av_packet_free(&pkt);
- }
- if (ret < 0)
- return ret;
- }
- }
-
- return 0;
-}
-
int print_sdp(const char *filename);
int print_sdp(const char *filename)
@@ -590,11 +433,6 @@ int print_sdp(const char *filename)
AVIOContext *sdp_pb;
AVFormatContext **avc;
- for (i = 0; i < nb_output_files; i++) {
- if (!mux_from_of(output_files[i])->header_written)
- return 0;
- }
-
avc = av_malloc_array(nb_output_files, sizeof(*avc));
if (!avc)
return AVERROR(ENOMEM);
@@ -629,25 +467,17 @@ int print_sdp(const char *filename)
avio_closep(&sdp_pb);
}
- // SDP successfully written, allow muxer threads to start
- ret = 1;
-
fail:
av_freep(&avc);
return ret;
}
-int mux_check_init(Muxer *mux)
+int mux_check_init(void *arg)
{
+ Muxer *mux = arg;
OutputFile *of = &mux->of;
AVFormatContext *fc = mux->fc;
- int ret, i;
-
- for (i = 0; i < fc->nb_streams; i++) {
- OutputStream *ost = of->streams[i];
- if (!ost->initialized)
- return 0;
- }
+ int ret;
ret = avformat_write_header(fc, &mux->opts);
if (ret < 0) {
@@ -659,27 +489,7 @@ int mux_check_init(Muxer *mux)
mux->header_written = 1;
av_dump_format(fc, of->index, fc->url, 1);
- nb_output_dumped++;
-
- if (sdp_filename || want_sdp) {
- ret = print_sdp(sdp_filename);
- if (ret < 0) {
- av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
- return ret;
- } else if (ret == 1) {
- /* SDP is written only after all the muxers are ready, so now we
- * start ALL the threads */
- for (i = 0; i < nb_output_files; i++) {
- ret = thread_start(mux_from_of(output_files[i]));
- if (ret < 0)
- return ret;
- }
- }
- } else {
- ret = thread_start(mux_from_of(of));
- if (ret < 0)
- return ret;
- }
+ atomic_fetch_add(&nb_output_dumped, 1);
return 0;
}
@@ -736,9 +546,10 @@ int of_stream_init(OutputFile *of, OutputStream *ost)
ost->st->time_base);
}
- ost->initialized = 1;
+ if (ms->sch_idx >= 0)
+ return sch_mux_stream_ready(mux->sch, of->index, ms->sch_idx);
- return mux_check_init(mux);
+ return 0;
}
static int check_written(OutputFile *of)
@@ -852,15 +663,13 @@ int of_write_trailer(OutputFile *of)
AVFormatContext *fc = mux->fc;
int ret, mux_result = 0;
- if (!mux->tq) {
+ if (!mux->header_written) {
av_log(mux, AV_LOG_ERROR,
"Nothing was written into output file, because "
"at least one of its streams received no packets.\n");
return AVERROR(EINVAL);
}
- mux_result = thread_stop(mux);
-
ret = av_write_trailer(fc);
if (ret < 0) {
av_log(mux, AV_LOG_ERROR, "Error writing trailer: %s\n", av_err2str(ret));
@@ -905,13 +714,6 @@ static void ost_free(OutputStream **post)
ost->logfile = NULL;
}
- if (ms->muxing_queue) {
- AVPacket *pkt;
- while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0)
- av_packet_free(&pkt);
- av_fifo_freep2(&ms->muxing_queue);
- }
-
avcodec_parameters_free(&ost->par_in);
av_bsf_free(&ms->bsf_ctx);
@@ -976,8 +778,6 @@ void of_free(OutputFile **pof)
return;
mux = mux_from_of(of);
- thread_stop(mux);
-
sq_free(&of->sq_encode);
sq_free(&mux->sq_mux);
diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
index eee2b2cb07..5d7cf3fa76 100644
--- a/fftools/ffmpeg_mux.h
+++ b/fftools/ffmpeg_mux.h
@@ -25,7 +25,6 @@
#include <stdint.h>
#include "ffmpeg_sched.h"
-#include "thread_queue.h"
#include "libavformat/avformat.h"
@@ -33,7 +32,6 @@
#include "libavutil/dict.h"
#include "libavutil/fifo.h"
-#include "libavutil/thread.h"
typedef struct MuxStream {
OutputStream ost;
@@ -41,9 +39,6 @@ typedef struct MuxStream {
// name used for logging
char log_name[32];
- /* the packets are buffered here until the muxer is ready to be initialized */
- AVFifo *muxing_queue;
-
AVBSFContext *bsf_ctx;
AVPacket *bsf_pkt;
@@ -57,17 +52,6 @@ typedef struct MuxStream {
int64_t max_frames;
- /*
- * The size of the AVPackets' buffers in queue.
- * Updated when a packet is either pushed or pulled from the queue.
- */
- size_t muxing_queue_data_size;
-
- int max_muxing_queue_size;
-
- /* Threshold after which max_muxing_queue_size will be in effect */
- size_t muxing_queue_data_threshold;
-
// timestamp from which the streamcopied streams should start,
// in AV_TIME_BASE_Q;
// everything before it should be discarded
@@ -106,9 +90,6 @@ typedef struct Muxer {
int *sch_stream_idx;
int nb_sch_stream_idx;
- pthread_t thread;
- ThreadQueue *tq;
-
AVDictionary *opts;
int thread_queue_size;
@@ -122,10 +103,7 @@ typedef struct Muxer {
AVPacket *sq_pkt;
} Muxer;
-/* whether we want to print an SDP, set in of_open() */
-extern int want_sdp;
-
-int mux_check_init(Muxer *mux);
+int mux_check_init(void *arg);
static MuxStream *ms_from_ost(OutputStream *ost)
{
diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
index 534b4379c7..6459296ab0 100644
--- a/fftools/ffmpeg_mux_init.c
+++ b/fftools/ffmpeg_mux_init.c
@@ -924,13 +924,6 @@ static int new_stream_audio(Muxer *mux, const OptionsContext *o,
return 0;
}
-static int new_stream_attachment(Muxer *mux, const OptionsContext *o,
- OutputStream *ost)
-{
- ost->finished = 1;
- return 0;
-}
-
static int new_stream_subtitle(Muxer *mux, const OptionsContext *o,
OutputStream *ost)
{
@@ -1168,9 +1161,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
if (!ost->par_in)
return AVERROR(ENOMEM);
- ms->muxing_queue = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
- if (!ms->muxing_queue)
- return AVERROR(ENOMEM);
ms->last_mux_dts = AV_NOPTS_VALUE;
ost->st = st;
@@ -1190,7 +1180,8 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
if (!ost->enc_ctx)
return AVERROR(ENOMEM);
- ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL);
+ ret = sch_add_enc(mux->sch, encoder_thread, ost,
+ ost->type == AVMEDIA_TYPE_SUBTITLE ? NULL : enc_open);
if (ret < 0)
return ret;
ms->sch_idx_enc = ret;
@@ -1414,9 +1405,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
sch_mux_stream_buffering(mux->sch, mux->sch_idx, ms->sch_idx,
max_muxing_queue_size, muxing_queue_data_threshold);
-
- ms->max_muxing_queue_size = max_muxing_queue_size;
- ms->muxing_queue_data_threshold = muxing_queue_data_threshold;
}
MATCH_PER_STREAM_OPT(bits_per_raw_sample, i, ost->bits_per_raw_sample,
@@ -1434,8 +1422,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
if (ost->enc_ctx && av_get_exact_bits_per_sample(ost->enc_ctx->codec_id) == 24)
av_dict_set(&ost->swr_opts, "output_sample_bits", "24", 0);
- ost->last_mux_dts = AV_NOPTS_VALUE;
-
MATCH_PER_STREAM_OPT(copy_initial_nonkeyframes, i,
ms->copy_initial_nonkeyframes, oc, st);
@@ -1443,7 +1429,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
case AVMEDIA_TYPE_VIDEO: ret = new_stream_video (mux, o, ost); break;
case AVMEDIA_TYPE_AUDIO: ret = new_stream_audio (mux, o, ost); break;
case AVMEDIA_TYPE_SUBTITLE: ret = new_stream_subtitle (mux, o, ost); break;
- case AVMEDIA_TYPE_ATTACHMENT: ret = new_stream_attachment(mux, o, ost); break;
}
if (ret < 0)
return ret;
@@ -1938,7 +1923,6 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
MuxStream *ms = ms_from_ost(ost);
enum AVMediaType type = ost->type;
- ost->sq_idx_encode = -1;
ost->sq_idx_mux = -1;
nb_interleaved += IS_INTERLEAVED(type);
@@ -1961,11 +1945,17 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
* - at least one encoded audio/video stream is frame-limited, since
* that has similar semantics to 'shortest'
* - at least one audio encoder requires constant frame sizes
+ *
+ * Note that encoding sync queues are handled in the scheduler, because
+ * different encoders run in different threads and need external
+ * synchronization, while muxer sync queues can be handled inside the muxer
*/
if ((of->shortest && nb_av_enc > 1) || limit_frames_av_enc || nb_audio_fs) {
- of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, mux);
- if (!of->sq_encode)
- return AVERROR(ENOMEM);
+ int sq_idx, ret;
+
+ sq_idx = sch_add_sq_enc(mux->sch, buf_size_us, mux);
+ if (sq_idx < 0)
+ return sq_idx;
for (int i = 0; i < oc->nb_streams; i++) {
OutputStream *ost = of->streams[i];
@@ -1975,13 +1965,11 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
if (!IS_AV_ENC(ost, type))
continue;
- ost->sq_idx_encode = sq_add_stream(of->sq_encode,
- of->shortest || ms->max_frames < INT64_MAX);
- if (ost->sq_idx_encode < 0)
- return ost->sq_idx_encode;
-
- if (ms->max_frames != INT64_MAX)
- sq_limit_frames(of->sq_encode, ost->sq_idx_encode, ms->max_frames);
+ ret = sch_sq_add_enc(mux->sch, sq_idx, ms->sch_idx_enc,
+ of->shortest || ms->max_frames < INT64_MAX,
+ ms->max_frames);
+ if (ret < 0)
+ return ret;
}
}
@@ -2652,23 +2640,6 @@ static int validate_enc_avopt(Muxer *mux, const AVDictionary *codec_avopt)
return 0;
}
-static int init_output_stream_nofilter(OutputStream *ost)
-{
- int ret = 0;
-
- if (ost->enc_ctx) {
- ret = enc_open(ost, NULL);
- if (ret < 0)
- return ret;
- } else {
- ret = of_stream_init(output_files[ost->file_index], ost);
- if (ret < 0)
- return ret;
- }
-
- return ret;
-}
-
static const char *output_file_item_name(void *obj)
{
const Muxer *mux = obj;
@@ -2751,8 +2722,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
av_strlcat(mux->log_name, "/", sizeof(mux->log_name));
av_strlcat(mux->log_name, oc->oformat->name, sizeof(mux->log_name));
- if (strcmp(oc->oformat->name, "rtp"))
- want_sdp = 0;
of->format = oc->oformat;
if (recording_time != INT64_MAX)
@@ -2768,7 +2737,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
AVFMT_FLAG_BITEXACT);
}
- err = sch_add_mux(sch, muxer_thread, NULL, mux,
+ err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
!strcmp(oc->oformat->name, "rtp"));
if (err < 0)
return err;
@@ -2854,26 +2823,15 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
of->url = filename;
- /* initialize stream copy and subtitle/data streams.
- * Encoded AVFrame based streams will get initialized when the first AVFrame
- * is received in do_video_out
- */
+ /* initialize streamcopy streams. */
for (int i = 0; i < of->nb_streams; i++) {
OutputStream *ost = of->streams[i];
- if (ost->filter)
- continue;
-
- err = init_output_stream_nofilter(ost);
- if (err < 0)
- return err;
- }
-
- /* write the header for files with no streams */
- if (of->format->flags & AVFMT_NOSTREAMS && oc->nb_streams == 0) {
- int ret = mux_check_init(mux);
- if (ret < 0)
- return ret;
+ if (!ost->enc) {
+ err = of_stream_init(of, ost);
+ if (err < 0)
+ return err;
+ }
}
return 0;
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index d463306546..6177a96a4e 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -64,7 +64,6 @@ const char *const opt_name_top_field_first[] = {"top", NULL};
HWDevice *filter_hw_device;
char *vstats_filename;
-char *sdp_filename;
float audio_drift_threshold = 0.1;
float dts_delta_threshold = 10;
@@ -580,9 +579,8 @@ fail:
static int opt_sdp_file(void *optctx, const char *opt, const char *arg)
{
- av_free(sdp_filename);
- sdp_filename = av_strdup(arg);
- return 0;
+ Scheduler *sch = optctx;
+ return sch_sdp_filename(sch, arg);
}
#if CONFIG_VAAPI
diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
index 957a410921..bc9b833799 100644
--- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
+++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
@@ -1,48 +1,40 @@
1
-00:00:00,968 --> 00:00:01,001
+00:00:00,968 --> 00:00:01,168
<font face="Monospace">{\an7}(</font>
2
-00:00:01,001 --> 00:00:01,168
-<font face="Monospace">{\an7}(</font>
-
-3
00:00:01,168 --> 00:00:01,368
<font face="Monospace">{\an7}(<i> inaudibl</i></font>
-4
+3
00:00:01,368 --> 00:00:01,568
<font face="Monospace">{\an7}(<i> inaudible radio chat</i></font>
-5
+4
00:00:01,568 --> 00:00:02,002
<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
-6
-00:00:02,002 --> 00:00:03,003
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
-
-7
-00:00:03,003 --> 00:00:03,103
+5
+00:00:02,002 --> 00:00:03,103
<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
-8
+6
00:00:03,103 --> 00:00:03,303
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>></font>
-9
+7
00:00:03,303 --> 00:00:03,503
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>> Safety rema</font>
-10
+8
00:00:03,504 --> 00:00:03,704
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>> Safety remains our numb</font>
-11
+9
00:00:03,704 --> 00:00:04,004
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>> Safety remains our number one</font>