/*
 * Inter-thread scheduling/synchronization.
 * Copyright (c) 2023 Anton Khirnov
 *
 * This file is part of FFmpeg.
 *
 * FFmpeg is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * FFmpeg is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with FFmpeg; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

#ifndef FFTOOLS_FFMPEG_SCHED_H
#define FFTOOLS_FFMPEG_SCHED_H

#include <stddef.h>
#include <stdint.h>

#include "ffmpeg_utils.h"

/*
 * This file contains the API for the transcode scheduler.
 *
 * Overall architecture of the transcoding process involves instances of the
 * following components:
 * - demuxers, each containing any number of demuxed streams; demuxed packets
 *   belonging to some stream are sent to any number of decoders (transcoding)
 *   and/or muxers (streamcopy);
 * - decoders, which receive encoded packets from some demuxed stream or
 *   encoder, decode them, and send decoded frames to any number of filtergraph
 *   inputs (audio/video) or encoders (subtitles);
 * - filtergraphs, each containing zero or more inputs (0 in case the
 *   filtergraph contains a lavfi source filter), and one or more outputs; the
 *   inputs and outputs need not have matching media types;
 *   each filtergraph input receives decoded frames from some decoder or another
 *   filtergraph output;
 *   filtered frames from each output are sent to some encoder;
 * - encoders, which receive decoded frames from some decoder (subtitles) or
 *   some filtergraph output (audio/video), encode them, and send encoded
 *   packets to any number of muxed streams or decoders;
 * - muxers, each containing any number of muxed streams; each muxed stream
 *   receives encoded packets from some demuxed stream (streamcopy) or some
 *   encoder (transcoding); those packets are interleaved and written out by the
 *   muxer.
 *
 * The structure formed by the above components is a directed acyclic graph
 * (absence of cycles is checked at startup).
 *
 * There must be at least one muxer instance, otherwise the transcode produces
 * no output and is meaningless. Otherwise, in a generic transcoding scenario
 * there may be arbitrary number of instances of any of the above components,
 * interconnected in various ways.
 *
 * The code tries to keep all the output streams across all the muxers in sync
 * (i.e. at the same DTS), which is accomplished by varying the rates at which
 * packets are read from different demuxers and lavfi sources. Note that the
 * degree of control we have over synchronization is fundamentally limited - if
 * some demuxed streams in the same input are interleaved at different rates
 * than that at which they are to be muxed (e.g. because an input file is badly
 * interleaved, or the user changed their speed by mismatching amounts), then
 * there will be increasing amounts of buffering followed by eventual
 * transcoding failure.
 *
 * N.B. 1: there are meaningful transcode scenarios with no demuxers, e.g.
 * - encoding and muxing output from filtergraph(s) that have no inputs;
 * - creating a file that contains nothing but attachments and/or metadata.
 *
 * N.B. 2: a filtergraph output could, in principle, feed multiple encoders, but
 * this is unnecessary because the (a)split filter provides the same
 * functionality.
 *
 * The scheduler, in the above model, is the master object that oversees and
 * facilitates the transcoding process. The basic idea is that all instances
 * of the abovementioned components communicate only with the scheduler and not
 * with each other. The scheduler is then the single place containing the
 * knowledge about the whole transcoding pipeline.
 */

struct AVFrame;
struct AVPacket;

typedef struct Scheduler Scheduler;

enum SchedulerNodeType {
    SCH_NODE_TYPE_NONE = 0,
    SCH_NODE_TYPE_DEMUX,
    SCH_NODE_TYPE_MUX,
    SCH_NODE_TYPE_DEC,
    SCH_NODE_TYPE_ENC,
    SCH_NODE_TYPE_FILTER_IN,
    SCH_NODE_TYPE_FILTER_OUT,
};

typedef struct SchedulerNode {
    enum SchedulerNodeType  type;
    unsigned                idx;
    unsigned                idx_stream;
} SchedulerNode;

typedef int (*SchThreadFunc)(void *arg);

#define SCH_DSTREAM(file, stream)                           \
    (SchedulerNode){ .type = SCH_NODE_TYPE_DEMUX,           \
                     .idx = file, .idx_stream = stream }
#define SCH_MSTREAM(file, stream)                           \
    (SchedulerNode){ .type = SCH_NODE_TYPE_MUX,             \
                     .idx = file, .idx_stream = stream }
#define SCH_DEC(decoder)                                    \
    (SchedulerNode){ .type = SCH_NODE_TYPE_DEC,             \
                    .idx = decoder }
#define SCH_ENC(encoder)                                    \
    (SchedulerNode){ .type = SCH_NODE_TYPE_ENC,             \
                    .idx = encoder }
#define SCH_FILTER_IN(filter, input)                        \
    (SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_IN,       \
                     .idx = filter, .idx_stream = input }
#define SCH_FILTER_OUT(filter, output)                      \
    (SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_OUT,      \
                     .idx = filter, .idx_stream = output }

Scheduler *sch_alloc(void);
void sch_free(Scheduler **sch);

int sch_start(Scheduler *sch);
int sch_stop(Scheduler *sch, int64_t *finish_ts);

/**
 * Wait until transcoding terminates or the specified timeout elapses.
 *
 * @param timeout_us Amount of time in microseconds after which this function
 *                   will timeout.
 * @param transcode_ts Current transcode timestamp in AV_TIME_BASE_Q, for
 *                     informational purposes only.
 *
 * @retval 0 waiting timed out, transcoding is not finished
 * @retval 1 transcoding is finished
 */
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts);

/**
 * Add a demuxer to the scheduler.
 *
 * @param func Function executed as the demuxer task.
 * @param ctx Demuxer state; will be passed to func and used for logging.
 *
 * @retval ">=0" Index of the newly-created demuxer.
 * @retval "<0"  Error code.
 */
int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx);
/**
 * Add a demuxed stream for a previously added demuxer.
 *
 * @param demux_idx index previously returned by sch_add_demux()
 *
 * @retval ">=0" Index of the newly-created demuxed stream.
 * @retval "<0"  Error code.
 */
int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx);

/**
 * Add a decoder to the scheduler.
 *
 * @param func Function executed as the decoder task.
 * @param ctx Decoder state; will be passed to func and used for logging.
 * @param send_end_ts The decoder will return an end timestamp after flush packets
 *                    are delivered to it. See documentation for
 *                    sch_dec_receive() for more details.
 *
 * @retval ">=0" Index of the newly-created decoder.
 * @retval "<0"  Error code.
 */
int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
                int send_end_ts);

/**
 * Add a filtergraph to the scheduler.
 *
 * @param nb_inputs Number of filtergraph inputs.
 * @param nb_outputs number of filtergraph outputs
 * @param func Function executed as the filtering task.
 * @param ctx Filter state; will be passed to func and used for logging.
 *
 * @retval ">=0" Index of the newly-created filtergraph.
 * @retval "<0"  Error code.
 */
int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
                        SchThreadFunc func, void *ctx);

/**
 * Add a muxer to the scheduler.
 *
 * Note that muxer thread startup is more complicated than for other components,
 * because
 * - muxer streams fed by audio/video encoders become initialized dynamically at
 *   runtime, after those encoders receive their first frame and initialize
 *   themselves, followed by calling sch_mux_stream_ready()
 * - the header can be written after all the streams for a muxer are initialized
 * - we may need to write an SDP, which must happen
 *      - AFTER all the headers are written
 *      - BEFORE any packets are written by any muxer
 *      - with all the muxers quiescent
 * To avoid complicated muxer-thread synchronization dances, we postpone
 * starting the muxer threads until after the SDP is written. The sequence of
 * events is then as follows:
 * - After sch_mux_stream_ready() is called for all the streams in a given muxer,
 *   the header for that muxer is written (care is taken that headers for
 *   different muxers are not written concurrently, since they write file
 *   information to stderr). If SDP is not wanted, the muxer thread then starts
 *   and muxing begins.
 * - When SDP _is_ wanted, no muxer threads start until the header for the last
 *   muxer is written. After that, the SDP is written, after which all the muxer
 *   threads are started at once.
 *
 * In order for the above to work, the scheduler needs to be able to invoke
 * just writing the header, which is the reason the init parameter exists.
 *
 * @param func Function executed as the muxing task.
 * @param init Callback that is called to initialize the muxer and write the
 *             header. Called after sch_mux_stream_ready() is called for all the
 *             streams in the muxer.
 * @param ctx Muxer state; will be passed to func/init and used for logging.
 * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename().
 * @param thread_queue_size number of packets that can be buffered before
 *                          sending to the muxer blocks
 *
 * @retval ">=0" Index of the newly-created muxer.
 * @retval "<0"  Error code.
 */
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
                void *ctx, int sdp_auto, unsigned thread_queue_size);

/**
 * Default size of a packet thread queue.  For muxing this can be overridden by
 * the thread_queue_size option as passed to a call to sch_add_mux().
 */
#define DEFAULT_PACKET_THREAD_QUEUE_SIZE 8

/**
 * Default size of a frame thread queue.
 */
#define DEFAULT_FRAME_THREAD_QUEUE_SIZE 8

/**
 * Add a muxed stream for a previously added muxer.
 *
 * @param mux_idx index previously returned by sch_add_mux()
 *
 * @retval ">=0" Index of the newly-created muxed stream.
 * @retval "<0"  Error code.
 */
int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx);

/**
 * Configure limits on packet buffering performed before the muxer task is
 * started.
 *
 * @param mux_idx index previously returned by sch_add_mux()
 * @param stream_idx_idx index previously returned by sch_add_mux_stream()
 * @param data_threshold Total size of the buffered packets' data after which
 *                       max_packets applies.
 * @param max_packets maximum Maximum number of buffered packets after
 *                            data_threshold is reached.
 */
void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
                              size_t data_threshold, int max_packets);

/**
 * Signal to the scheduler that the specified muxed stream is initialized and
 * ready. Muxing is started once all the streams are ready.
 */
int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx);

/**
 * Set the file path for the SDP.
 *
 * The SDP is written when either of the following is true:
 * - this function is called at least once
 * - sdp_auto=1 is passed to EVERY call of sch_add_mux()
 */
int sch_sdp_filename(Scheduler *sch, const char *sdp_filename);

/**
 * Add an encoder to the scheduler.
 *
 * @param func Function executed as the encoding task.
 * @param ctx Encoder state; will be passed to func and used for logging.
 * @param open_cb This callback, if specified, will be called when the first
 *                frame is obtained for this encoder. For audio encoders with a
 *                fixed frame size (which use a sync queue in the scheduler to
 *                rechunk frames), it must return that frame size on success.
 *                Otherwise (non-audio, variable frame size) it should return 0.
 *
 * @retval ">=0" Index of the newly-created encoder.
 * @retval "<0"  Error code.
 */
int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
                int (*open_cb)(void *func_arg, const struct AVFrame *frame));

/**
 * Add an pre-encoding sync queue to the scheduler.
 *
 * @param buf_size_us Sync queue buffering size, passed to sq_alloc().
 * @param logctx Logging context for the sync queue. passed to sq_alloc().
 *
 * @retval ">=0" Index of the newly-created sync queue.
 * @retval "<0"  Error code.
 */
int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx);
int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
                   int limiting, uint64_t max_frames);

int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst);

enum DemuxSendFlags {
    /**
     * Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations
     * send normally to other types.
     */
    DEMUX_SEND_STREAMCOPY_EOF = (1 << 0),
};

/**
 * Called by demuxer tasks to communicate with their downstreams. The following
 * may be sent:
 * - a demuxed packet for the stream identified by pkt->stream_index;
 * - demuxer discontinuity/reset (e.g. after a seek) - this is signalled by an
 *   empty packet with stream_index=-1.
 *
 * @param demux_idx demuxer index
 * @param pkt A demuxed packet to send.
 *            When flushing (i.e. pkt->stream_index=-1 on entry to this
 *            function), on successful return pkt->pts/pkt->time_base will be
 *            set to the maximum end timestamp of any decoded audio stream, or
 *            AV_NOPTS_VALUE if no decoded audio streams are present.
 *
 * @retval "non-negative value" success
 * @retval AVERROR_EOF all consumers for the stream are done
 * @retval AVERROR_EXIT all consumers are done, should terminate demuxing
 * @retval "anoter negative error code" other failure
 */
int sch_demux_send(Scheduler *sch, unsigned demux_idx, struct AVPacket *pkt,
                   unsigned flags);

/**
 * Called by decoder tasks to receive a packet for decoding.
 *
 * @param dec_idx decoder index
 * @param pkt Input packet will be written here on success.
 *
 *            An empty packet signals that the decoder should be flushed, but
 *            more packets will follow (e.g. after seeking). When a decoder
 *            created with send_end_ts=1 receives a flush packet, it must write
 *            the end timestamp of the stream after flushing to
 *            pkt->pts/time_base on the next call to this function (if any).
 *
 * @retval "non-negative value" success
 * @retval AVERROR_EOF no more packets will arrive, should terminate decoding
 * @retval "another negative error code" other failure
 */
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, struct AVPacket *pkt);

/**
 * Called by decoder tasks to send a decoded frame downstream.
 *
 * @param dec_idx Decoder index previously returned by sch_add_dec().
 * @param frame Decoded frame; on success it is consumed and cleared by this
 *              function
 *
 * @retval ">=0" success
 * @retval AVERROR_EOF all consumers are done, should terminate decoding
 * @retval "another negative error code" other failure
 */
int sch_dec_send(Scheduler *sch, unsigned dec_idx, struct AVFrame *frame);

/**
 * Called by filtergraph tasks to obtain frames for filtering. Will wait for a
 * frame to become available and return it in frame.
 *
 * Filtergraphs that contain lavfi sources and do not currently require new
 * input frames should call this function as a means of rate control - then
 * in_idx should be set equal to nb_inputs on entry to this function.
 *
 * @param fg_idx Filtergraph index previously returned by sch_add_filtergraph().
 * @param[in,out] in_idx On input contains the index of the input on which a frame
 *                       is most desired. May be set to nb_inputs to signal that
 *                       the filtergraph does not need more input currently.
 *
 *                       On success, will be replaced with the input index of
 *                       the actually returned frame or EOF timestamp.
 *
 * @retval ">=0" Frame data or EOF timestamp was delivered into frame, in_idx
 *               contains the index of the input it belongs to.
 * @retval AVERROR(EAGAIN) No frame was returned, the filtergraph should
 *                         resume filtering. May only be returned when
 *                         in_idx=nb_inputs on entry to this function.
 * @retval AVERROR_EOF No more frames will arrive, should terminate filtering.
 */
int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
                       unsigned *in_idx, struct AVFrame *frame);
/**
 * Called by filter tasks to signal that a filter input will no longer accept input.
 *
 * @param fg_idx Filtergraph index previously returned from sch_add_filtergraph().
 * @param in_idx Index of the input to finish.
 */
void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx);

/**
 * Called by filtergraph tasks to send a filtered frame or EOF to consumers.
 *
 * @param fg_idx Filtergraph index previously returned by sch_add_filtergraph().
 * @param out_idx Index of the output which produced the frame.
 * @param frame The frame to send to consumers. When NULL, signals that no more
 *              frames will be produced for the specified output. When non-NULL,
 *              the frame is consumed and cleared by this function on success.
 *
 * @retval "non-negative value" success
 * @retval AVERROR_EOF all consumers are done
 * @retval "anoter negative error code" other failure
 */
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx,
                    struct AVFrame *frame);

int sch_filter_command(Scheduler *sch, unsigned fg_idx, struct AVFrame *frame);

/**
 * Called by encoder tasks to obtain frames for encoding. Will wait for a frame
 * to become available and return it in frame.
 *
 * @param enc_idx Encoder index previously returned by sch_add_enc().
 * @param frame   Newly-received frame will be stored here on success. Must be
 *                clean on entrance to this function.
 *
 * @retval 0 A frame was successfully delivered into frame.
 * @retval AVERROR_EOF No more frames will be delivered, the encoder should
 *                     flush everything and terminate.
 *
 */
int sch_enc_receive(Scheduler *sch, unsigned enc_idx, struct AVFrame *frame);

/**
 * Called by encoder tasks to send encoded packets downstream.
 *
 * @param enc_idx Encoder index previously returned by sch_add_enc().
 * @param pkt     An encoded packet; it will be consumed and cleared by this
 *                function on success.
 *
 * @retval 0     success
 * @retval "<0"  Error code.
 */
int sch_enc_send   (Scheduler *sch, unsigned enc_idx, struct AVPacket *pkt);

/**
 * Called by muxer tasks to obtain packets for muxing. Will wait for a packet
 * for any muxed stream to become available and return it in pkt.
 *
 * @param mux_idx Muxer index previously returned by sch_add_mux().
 * @param pkt     Newly-received packet will be stored here on success. Must be
 *                clean on entrance to this function.
 *
 * @retval 0 A packet was successfully delivered into pkt. Its stream_index
 *           corresponds to a stream index previously returned from
 *           sch_add_mux_stream().
 * @retval AVERROR_EOF When pkt->stream_index is non-negative, this signals that
 *                     no more packets will be delivered for this stream index.
 *                     Otherwise this indicates that no more packets will be
 *                     delivered for any stream and the muxer should therefore
 *                     flush everything and terminate.
 */
int sch_mux_receive(Scheduler *sch, unsigned mux_idx, struct AVPacket *pkt);

/**
 * Called by muxer tasks to signal that a stream will no longer accept input.
 *
 * @param stream_idx Stream index previously returned from sch_add_mux_stream().
 */
void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx);

int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
                              unsigned dec_idx);
int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
                          const AVPacket *pkt);

#endif /* FFTOOLS_FFMPEG_SCHED_H */