/* * FIFO pseudo-muxer * Copyright (c) 2016 Jan Sebechlebsky * * This file is part of FFmpeg. * * FFmpeg is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * FFmpeg is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with FFmpeg; if not, write to the Free Software * Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #include <stdatomic.h> #include "libavutil/avassert.h" #include "libavutil/opt.h" #include "libavutil/time.h" #include "libavutil/thread.h" #include "libavutil/threadmessage.h" #include "avformat.h" #include "internal.h" #include "mux.h" #define FIFO_DEFAULT_QUEUE_SIZE 60 #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 0 #define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds typedef struct FifoContext { const AVClass *class; AVFormatContext *avf; char *format; AVDictionary *format_options; int queue_size; AVThreadMessageQueue *queue; pthread_t writer_thread; /* Return value of last write_trailer_call */ int write_trailer_ret; /* Time to wait before next recovery attempt * This can refer to the time in processed stream, * or real time. */ int64_t recovery_wait_time; /* Maximal number of unsuccessful successive recovery attempts */ int max_recovery_attempts; /* Whether to attempt recovery from failure */ int attempt_recovery; /* If >0 stream time will be used when waiting * for the recovery attempt instead of real time */ int recovery_wait_streamtime; /* If >0 recovery will be attempted regardless of error code * (except AVERROR_EXIT, so exit request is never ignored) */ int recover_any_error; /* Whether to drop packets in case the queue is full. */ int drop_pkts_on_overflow; /* Whether to wait for keyframe when recovering * from failure or queue overflow */ int restart_with_keyframe; pthread_mutex_t overflow_flag_lock; int overflow_flag_lock_initialized; /* Value > 0 signals queue overflow */ volatile uint8_t overflow_flag; atomic_int_least64_t queue_duration; int64_t last_sent_dts; int64_t timeshift; } FifoContext; typedef struct FifoThreadContext { AVFormatContext *avf; /* Timestamp of last failure. * This is either pts in case stream time is used, * or microseconds as returned by av_getttime_relative() */ int64_t last_recovery_ts; /* Number of current recovery process * Value > 0 means we are in recovery process */ int recovery_nr; /* If > 0 all frames will be dropped until keyframe is received */ uint8_t drop_until_keyframe; /* Value > 0 means that the previous write_header call was successful * so finalization by calling write_trailer and ff_io_close must be done * before exiting / reinitialization of underlying muxer */ uint8_t header_written; int64_t last_received_dts; } FifoThreadContext; typedef enum FifoMessageType { FIFO_NOOP, FIFO_WRITE_HEADER, FIFO_WRITE_PACKET, FIFO_FLUSH_OUTPUT } FifoMessageType; typedef struct FifoMessage { FifoMessageType type; AVPacket pkt; } FifoMessage; static int fifo_thread_write_header(FifoThreadContext *ctx) { AVFormatContext *avf = ctx->avf; FifoContext *fifo = avf->priv_data; AVFormatContext *avf2 = fifo->avf; AVDictionary *format_options = NULL; int ret, i; ret = av_dict_copy(&format_options, fifo->format_options, 0); if (ret < 0) goto end; ret = ff_format_output_open(avf2, avf->url, &format_options); if (ret < 0) { av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->url, av_err2str(ret)); goto end; } for (i = 0;i < avf2->nb_streams; i++) ffstream(avf2->streams[i])->cur_dts = 0; ret = avformat_write_header(avf2, &format_options); if (!ret) ctx->header_written = 1; // Check for options unrecognized by underlying muxer if (format_options) { const AVDictionaryEntry *entry = NULL; while ((entry = av_dict_iterate(format_options, entry))) av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key); ret = AVERROR(EINVAL); } end: av_dict_free(&format_options); return ret; } static int fifo_thread_flush_output(FifoThreadContext *ctx) { AVFormatContext *avf = ctx->avf; FifoContext *fifo = avf->priv_data; AVFormatContext *avf2 = fifo->avf; return av_write_frame(avf2, NULL); } static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts) { AVStream *st = avf->streams[pkt->stream_index]; int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q); int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts); *last_dts = dts; return duration; } static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt) { AVFormatContext *avf = ctx->avf; FifoContext *fifo = avf->priv_data; AVFormatContext *avf2 = fifo->avf; AVRational src_tb, dst_tb; int ret, s_idx; int64_t orig_pts, orig_dts, orig_duration; if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE) atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed); if (ctx->drop_until_keyframe) { if (pkt->flags & AV_PKT_FLAG_KEY) { ctx->drop_until_keyframe = 0; av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n"); } else { av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n"); av_packet_unref(pkt); return 0; } } orig_pts = pkt->pts; orig_dts = pkt->dts; orig_duration = pkt->duration; s_idx = pkt->stream_index; src_tb = avf->streams[s_idx]->time_base; dst_tb = avf2->streams[s_idx]->time_base; av_packet_rescale_ts(pkt, src_tb, dst_tb); ret = av_write_frame(avf2, pkt); if (ret >= 0) { av_packet_unref(pkt); } else { // avoid scaling twice pkt->pts = orig_pts; pkt->dts = orig_dts; pkt->duration = orig_duration; } return ret; } static int fifo_thread_write_trailer(FifoThreadContext *ctx) { AVFormatContext *avf = ctx->avf; FifoContext *fifo = avf->priv_data; AVFormatContext *avf2 = fifo->avf; int ret; if (!ctx->header_written) return 0; ret = av_write_trailer(avf2); ff_format_io_close(avf2, &avf2->pb); return ret; } static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg) { int ret = AVERROR(EINVAL); if (msg->type == FIFO_NOOP) return 0; if (!ctx->header_written) { ret = fifo_thread_write_header(ctx); if (ret < 0) return ret; } switch(msg->type) { case FIFO_WRITE_HEADER: av_assert0(ret >= 0); return ret; case FIFO_WRITE_PACKET: return fifo_thread_write_packet(ctx, &msg->pkt); case FIFO_FLUSH_OUTPUT: return fifo_thread_flush_output(ctx); } av_assert0(0); return AVERROR(EINVAL); } static int is_recoverable(const FifoContext *fifo, int err_no) { if (!fifo->attempt_recovery) return 0; if (fifo->recover_any_error) return err_no != AVERROR_EXIT; switch (err_no) { case AVERROR(EINVAL): case AVERROR(ENOSYS): case AVERROR_EOF: case AVERROR_EXIT: case AVERROR_PATCHWELCOME: return 0; default: return 1; } } static void free_message(void *msg) { FifoMessage *fifo_msg = msg; if (fifo_msg->type == FIFO_WRITE_PACKET) av_packet_unref(&fifo_msg->pkt); } static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt, int err_no) { AVFormatContext *avf = ctx->avf; FifoContext *fifo = avf->priv_data; int ret; av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n", av_err2str(err_no)); if (fifo->recovery_wait_streamtime) { if (pkt->pts == AV_NOPTS_VALUE) av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation" " timestamp, recovery will be attempted immediately"); ctx->last_recovery_ts = pkt->pts; } else { ctx->last_recovery_ts = av_gettime_relative(); } if (fifo->max_recovery_attempts && ctx->recovery_nr >= fifo->max_recovery_attempts) { av_log(avf, AV_LOG_ERROR, "Maximal number of %d recovery attempts reached.\n", fifo->max_recovery_attempts); ret = err_no; } else { ret = AVERROR(EAGAIN); } return ret; } static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no) { AVFormatContext *avf = ctx->avf; FifoContext *fifo = avf->priv_data; AVPacket *pkt = &msg->pkt; int64_t time_since_recovery; int ret; if (!is_recoverable(fifo, err_no)) { ret = err_no; goto fail; } if (ctx->header_written) { fifo->write_trailer_ret = fifo_thread_write_trailer(ctx); ctx->header_written = 0; } if (!ctx->recovery_nr) { ctx->last_recovery_ts = fifo->recovery_wait_streamtime ? AV_NOPTS_VALUE : 0; } else { if (fifo->recovery_wait_streamtime) { if (ctx->last_recovery_ts == AV_NOPTS_VALUE) { AVRational tb = avf->streams[pkt->stream_index]->time_base; time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts, tb, AV_TIME_BASE_Q); } else { /* Enforce recovery immediately */ time_since_recovery = fifo->recovery_wait_time; } } else { time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts; } if (time_since_recovery < fifo->recovery_wait_time) return AVERROR(EAGAIN); } ctx->recovery_nr++; if (fifo->max_recovery_attempts) { av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n", ctx->recovery_nr, fifo->max_recovery_attempts); } else { av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n", ctx->recovery_nr); } if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow) ctx->drop_until_keyframe = 1; ret = fifo_thread_dispatch_message(ctx, msg); if (ret < 0) { if (is_recoverable(fifo, ret)) { return fifo_thread_process_recovery_failure(ctx, pkt, ret); } else { goto fail; } } else { av_log(avf, AV_LOG_INFO, "Recovery successful\n"); ctx->recovery_nr = 0; } return 0; fail: free_message(msg); return ret; } static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no) { AVFormatContext *avf = ctx->avf; FifoContext *fifo = avf->priv_data; int ret; do { if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) { int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts; int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery); if (time_to_wait) av_usleep(FFMIN(10000, time_to_wait)); } ret = fifo_thread_attempt_recovery(ctx, msg, err_no); } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow); if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) { if (msg->type == FIFO_WRITE_PACKET) av_packet_unref(&msg->pkt); ret = 0; } return ret; } static void *fifo_consumer_thread(void *data) { AVFormatContext *avf = data; FifoContext *fifo = avf->priv_data; AVThreadMessageQueue *queue = fifo->queue; FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}}; int ret; FifoThreadContext fifo_thread_ctx; memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext)); fifo_thread_ctx.avf = avf; fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE; ff_thread_setname("fifo-consumer"); while (1) { uint8_t just_flushed = 0; if (!fifo_thread_ctx.recovery_nr) ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg); if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) { int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret); if (rec_ret < 0) { av_thread_message_queue_set_err_send(queue, rec_ret); break; } } /* If the queue is full at the moment when fifo_write_packet * attempts to insert new message (packet) to the queue, * it sets the fifo->overflow_flag to 1 and drops packet. * Here in consumer thread, the flag is checked and if it is * set, the queue is flushed and flag cleared. */ pthread_mutex_lock(&fifo->overflow_flag_lock); if (fifo->overflow_flag) { av_thread_message_flush(queue); if (fifo->restart_with_keyframe) fifo_thread_ctx.drop_until_keyframe = 1; fifo->overflow_flag = 0; just_flushed = 1; } pthread_mutex_unlock(&fifo->overflow_flag_lock); if (just_flushed) av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n"); if (fifo->timeshift) while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift) av_usleep(10000); ret = av_thread_message_queue_recv(queue, &msg, 0); if (ret < 0) { av_thread_message_queue_set_err_send(queue, ret); break; } } fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx); return NULL; } static int fifo_mux_init(AVFormatContext *avf, const AVOutputFormat *oformat, const char *filename) { FifoContext *fifo = avf->priv_data; AVFormatContext *avf2; int ret = 0, i; ret = avformat_alloc_output_context2(&avf2, oformat, NULL, filename); if (ret < 0) return ret; fifo->avf = avf2; avf2->interrupt_callback = avf->interrupt_callback; avf2->max_delay = avf->max_delay; ret = av_dict_copy(&avf2->metadata, avf->metadata, 0); if (ret < 0) return ret; avf2->opaque = avf->opaque; #if FF_API_AVFORMAT_IO_CLOSE FF_DISABLE_DEPRECATION_WARNINGS avf2->io_close = avf->io_close; FF_ENABLE_DEPRECATION_WARNINGS #endif avf2->io_close2 = avf->io_close2; avf2->io_open = avf->io_open; avf2->flags = avf->flags; for (i = 0; i < avf->nb_streams; ++i) { AVStream *st = ff_stream_clone(avf2, avf->streams[i]); if (!st) return AVERROR(ENOMEM); } return 0; } static int fifo_init(AVFormatContext *avf) { FifoContext *fifo = avf->priv_data; const AVOutputFormat *oformat; int ret = 0; if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) { av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on" " only when drop_pkts_on_overflow is also turned on\n"); return AVERROR(EINVAL); } atomic_init(&fifo->queue_duration, 0); fifo->last_sent_dts = AV_NOPTS_VALUE; oformat = av_guess_format(fifo->format, avf->url, NULL); if (!oformat) { ret = AVERROR_MUXER_NOT_FOUND; return ret; } ret = fifo_mux_init(avf, oformat, avf->url); if (ret < 0) return ret; ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size, sizeof(FifoMessage)); if (ret < 0) return ret; av_thread_message_queue_set_free_func(fifo->queue, free_message); ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL); if (ret < 0) return AVERROR(ret); fifo->overflow_flag_lock_initialized = 1; return 0; } static int fifo_write_header(AVFormatContext *avf) { FifoContext * fifo = avf->priv_data; int ret; ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf); if (ret) { av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n", av_err2str(AVERROR(ret))); ret = AVERROR(ret); } return ret; } static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt) { FifoContext *fifo = avf->priv_data; FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT}; int ret; if (pkt) { ret = av_packet_ref(&msg.pkt,pkt); if (ret < 0) return ret; } ret = av_thread_message_queue_send(fifo->queue, &msg, fifo->drop_pkts_on_overflow ? AV_THREAD_MESSAGE_NONBLOCK : 0); if (ret == AVERROR(EAGAIN)) { uint8_t overflow_set = 0; /* Queue is full, set fifo->overflow_flag to 1 * to let consumer thread know the queue should * be flushed. */ pthread_mutex_lock(&fifo->overflow_flag_lock); if (!fifo->overflow_flag) fifo->overflow_flag = overflow_set = 1; pthread_mutex_unlock(&fifo->overflow_flag_lock); if (overflow_set) av_log(avf, AV_LOG_WARNING, "FIFO queue full\n"); ret = 0; goto fail; } else if (ret < 0) { goto fail; } if (fifo->timeshift && pkt && pkt->dts != AV_NOPTS_VALUE) atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed); return ret; fail: if (pkt) av_packet_unref(&msg.pkt); return ret; } static int fifo_write_trailer(AVFormatContext *avf) { FifoContext *fifo= avf->priv_data; int ret; av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF); if (fifo->timeshift) { int64_t now = av_gettime_relative(); int64_t elapsed = 0; FifoMessage msg = {FIFO_NOOP}; do { int64_t delay = av_gettime_relative() - now; if (delay < 0) { // Discontinuity? delay = 10000; now = av_gettime_relative(); } else { now += delay; } atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed); elapsed += delay; if (elapsed > fifo->timeshift) break; av_usleep(10000); ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK); } while (ret >= 0 || ret == AVERROR(EAGAIN)); atomic_store(&fifo->queue_duration, INT64_MAX); } ret = pthread_join(fifo->writer_thread, NULL); if (ret < 0) { av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n", av_err2str(AVERROR(ret))); return AVERROR(ret); } ret = fifo->write_trailer_ret; return ret; } static void fifo_deinit(AVFormatContext *avf) { FifoContext *fifo = avf->priv_data; avformat_free_context(fifo->avf); av_thread_message_queue_free(&fifo->queue); if (fifo->overflow_flag_lock_initialized) pthread_mutex_destroy(&fifo->overflow_flag_lock); } #define OFFSET(x) offsetof(FifoContext, x) static const AVOption options[] = { {"fifo_format", "Target muxer", OFFSET(format), AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM}, {"queue_size", "Size of fifo queue", OFFSET(queue_size), AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM}, {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options), AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM}, {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts), AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM}, {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time), AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM}, {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery", OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, {"timeshift", "Delay fifo output", OFFSET(timeshift), AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM}, {NULL}, }; static const AVClass fifo_muxer_class = { .class_name = "Fifo muxer", .item_name = av_default_item_name, .option = options, .version = LIBAVUTIL_VERSION_INT, }; const FFOutputFormat ff_fifo_muxer = { .p.name = "fifo", .p.long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"), .p.priv_class = &fifo_muxer_class, #if FF_API_ALLOW_FLUSH .p.flags = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE, #else .p.flags = AVFMT_NOFILE | AVFMT_TS_NEGATIVE, #endif .priv_data_size = sizeof(FifoContext), .init = fifo_init, .write_header = fifo_write_header, .write_packet = fifo_write_packet, .write_trailer = fifo_write_trailer, .deinit = fifo_deinit, .flags_internal = FF_FMT_ALLOW_FLUSH, };