diff options
author | Martin Storsjö <martin@martin.st> | 2010-10-01 17:50:24 +0000 |
---|---|---|
committer | Martin Storsjö <martin@martin.st> | 2010-10-01 17:50:24 +0000 |
commit | 58ee09911e0dab6432b18e9a2c5a876fdf8ef078 (patch) | |
tree | 1180f080d8185600d0b538f2a54d3e43b392b4bd /libavformat | |
parent | 0260741876b535c11cef43a959d1a9a970a308e9 (diff) | |
download | ffmpeg-58ee09911e0dab6432b18e9a2c5a876fdf8ef078.tar.gz |
rtpdec: Reorder received RTP packets according to the seq number
Reordering is enabled only when receiving over UDP.
Originally committed as revision 25294 to svn://svn.ffmpeg.org/ffmpeg/trunk
Diffstat (limited to 'libavformat')
-rw-r--r-- | libavformat/rtpdec.c | 120 | ||||
-rw-r--r-- | libavformat/rtpdec.h | 21 | ||||
-rw-r--r-- | libavformat/rtsp.c | 5 |
3 files changed, 139 insertions, 7 deletions
diff --git a/libavformat/rtpdec.c b/libavformat/rtpdec.c index 33497e648f..d349ce79b2 100644 --- a/libavformat/rtpdec.c +++ b/libavformat/rtpdec.c @@ -334,7 +334,7 @@ void rtp_send_punch_packets(URLContext* rtp_handle) * MPEG2TS streams to indicate that they should be demuxed inside the * rtp demux (otherwise CODEC_ID_MPEG2TS packets are returned) */ -RTPDemuxContext *rtp_parse_open(AVFormatContext *s1, AVStream *st, URLContext *rtpc, int payload_type) +RTPDemuxContext *rtp_parse_open(AVFormatContext *s1, AVStream *st, URLContext *rtpc, int payload_type, int queue_size) { RTPDemuxContext *s; @@ -346,6 +346,7 @@ RTPDemuxContext *rtp_parse_open(AVFormatContext *s1, AVStream *st, URLContext *r s->first_rtcp_ntp_time = AV_NOPTS_VALUE; s->ic = s1; s->st = st; + s->queue_size = queue_size; rtp_init_statistics(&s->statistics, 0); // do we know the initial sequence from sdp? if (!strcmp(ff_rtp_enc_name(payload_type), "MP2T")) { s->ts = ff_mpegts_parse_open(s->ic); @@ -504,9 +505,84 @@ static int rtp_parse_packet_internal(RTPDemuxContext *s, AVPacket *pkt, // now perform timestamp things.... finalize_packet(s, pkt, timestamp); + s->prev_ret = rv; return rv; } +void ff_rtp_reset_packet_queue(RTPDemuxContext *s) +{ + while (s->queue) { + RTPPacket *next = s->queue->next; + av_free(s->queue->buf); + av_free(s->queue); + s->queue = next; + } + s->seq = 0; + s->queue_len = 0; + s->prev_ret = 0; +} + +static void enqueue_packet(RTPDemuxContext *s, uint8_t *buf, int len) +{ + uint16_t seq = AV_RB16(buf + 2); + RTPPacket *cur = s->queue, *prev = NULL, *packet; + + /* Find the correct place in the queue to insert the packet */ + while (cur) { + int16_t diff = seq - cur->seq; + if (diff < 0) + break; + prev = cur; + cur = cur->next; + } + + packet = av_mallocz(sizeof(*packet)); + if (!packet) + return; + packet->recvtime = av_gettime(); + packet->seq = seq; + packet->len = len; + packet->buf = buf; + packet->next = cur; + if (prev) + prev->next = packet; + else + s->queue = packet; + s->queue_len++; +} + +static int has_next_packet(RTPDemuxContext *s) +{ + return s->queue && s->queue->seq == s->seq + 1; +} + +int64_t ff_rtp_queued_packet_time(RTPDemuxContext *s) +{ + return s->queue ? s->queue->recvtime : 0; +} + +static int rtp_parse_queued_packet(RTPDemuxContext *s, AVPacket *pkt) +{ + int rv; + RTPPacket *next; + + if (s->queue_len <= 0) + return -1; + + if (!has_next_packet(s)) + av_log(s->st ? s->st->codec : NULL, AV_LOG_WARNING, + "RTP: missed %d packets\n", s->queue->seq - s->seq - 1); + + /* Parse the first packet in the queue, and dequeue it */ + rv = rtp_parse_packet_internal(s, pkt, s->queue->buf, s->queue->len); + next = s->queue->next; + av_free(s->queue->buf); + av_free(s->queue); + s->queue = next; + s->queue_len--; + return rv ? rv : has_next_packet(s); +} + /** * Parse an RTP or RTCP packet directly sent as a buffer. * @param s RTP parse context. @@ -525,6 +601,11 @@ int rtp_parse_packet(RTPDemuxContext *s, AVPacket *pkt, int rv= 0; if (!buf) { + /* If parsing of the previous packet actually returned 0, there's + * nothing more to be parsed from that packet, but we may have + * indicated that we can return the next enqueued packet. */ + if (!s->prev_ret) + return rtp_parse_queued_packet(s, pkt); /* return the next packets, if any */ if(s->st && s->parse_packet) { /* timestamp should be overwritten by parse_packet, if not, @@ -533,7 +614,8 @@ int rtp_parse_packet(RTPDemuxContext *s, AVPacket *pkt, rv= s->parse_packet(s->ic, s->dynamic_protocol_context, s->st, pkt, ×tamp, NULL, 0, flags); finalize_packet(s, pkt, timestamp); - return rv; + s->prev_ret = rv; + return rv ? rv : has_next_packet(s); } else { // TODO: Move to a dynamic packet handler (like above) if (s->read_buf_index >= s->read_buf_size) @@ -545,8 +627,10 @@ int rtp_parse_packet(RTPDemuxContext *s, AVPacket *pkt, s->read_buf_index += ret; if (s->read_buf_index < s->read_buf_size) return 1; - else - return 0; + else { + s->prev_ret = 0; + return has_next_packet(s); + } } } @@ -559,11 +643,37 @@ int rtp_parse_packet(RTPDemuxContext *s, AVPacket *pkt, return rtcp_parse_packet(s, buf, len); } - return rtp_parse_packet_internal(s, pkt, buf, len); + if (s->seq == 0 || s->queue_size <= 1) { + /* First packet, or no reordering */ + return rtp_parse_packet_internal(s, pkt, buf, len); + } else { + uint16_t seq = AV_RB16(buf + 2); + int16_t diff = seq - s->seq; + if (diff < 0) { + /* Packet older than the previously emitted one, drop */ + av_log(s->st ? s->st->codec : NULL, AV_LOG_WARNING, + "RTP: dropping old packet received too late\n"); + return -1; + } else if (diff <= 1) { + /* Correct packet */ + rv = rtp_parse_packet_internal(s, pkt, buf, len); + return rv ? rv : has_next_packet(s); + } else { + /* Still missing some packet, enqueue this one. */ + enqueue_packet(s, buf, len); + *bufptr = NULL; + /* Return the first enqueued packet if the queue is full, + * even if we're missing something */ + if (s->queue_len >= s->queue_size) + return rtp_parse_queued_packet(s, pkt); + return -1; + } + } } void rtp_parse_close(RTPDemuxContext *s) { + ff_rtp_reset_packet_queue(s); if (!strcmp(ff_rtp_enc_name(s->payload_type), "MP2T")) { ff_mpegts_parse_close(s->ts); } diff --git a/libavformat/rtpdec.h b/libavformat/rtpdec.h index df2ec77680..84deefec68 100644 --- a/libavformat/rtpdec.h +++ b/libavformat/rtpdec.h @@ -32,15 +32,19 @@ typedef struct RTPDynamicProtocolHandler_s RTPDynamicProtocolHandler; #define RTP_MIN_PACKET_LENGTH 12 #define RTP_MAX_PACKET_LENGTH 1500 /* XXX: suppress this define */ +#define RTP_REORDER_QUEUE_DEFAULT_SIZE 10 + #define RTP_NOTS_VALUE ((uint32_t)-1) typedef struct RTPDemuxContext RTPDemuxContext; -RTPDemuxContext *rtp_parse_open(AVFormatContext *s1, AVStream *st, URLContext *rtpc, int payload_type); +RTPDemuxContext *rtp_parse_open(AVFormatContext *s1, AVStream *st, URLContext *rtpc, int payload_type, int queue_size); void rtp_parse_set_dynamic_protocol(RTPDemuxContext *s, PayloadContext *ctx, RTPDynamicProtocolHandler *handler); int rtp_parse_packet(RTPDemuxContext *s, AVPacket *pkt, uint8_t **buf, int len); void rtp_parse_close(RTPDemuxContext *s); +int64_t ff_rtp_queued_packet_time(RTPDemuxContext *s); +void ff_rtp_reset_packet_queue(RTPDemuxContext *s); #if (LIBAVFORMAT_VERSION_MAJOR <= 53) int rtp_get_local_port(URLContext *h); #endif @@ -131,6 +135,14 @@ struct RTPDynamicProtocolHandler_s { struct RTPDynamicProtocolHandler_s *next; }; +typedef struct RTPPacket { + uint16_t seq; + uint8_t *buf; + int len; + int64_t recvtime; + struct RTPPacket *next; +} RTPPacket; + // moved out of rtp.c, because the h264 decoder needs to know about this structure.. struct RTPDemuxContext { AVFormatContext *ic; @@ -152,6 +164,13 @@ struct RTPDemuxContext { RTPStatistics statistics; ///< Statistics for this stream (used by RTCP receiver reports) + /** Fields for packet reordering @{ */ + int prev_ret; ///< The return value of the actual parsing of the previous packet + RTPPacket* queue; ///< A sorted queue of buffered packets not yet returned + int queue_len; ///< The number of packets in queue + int queue_size; ///< The size of queue, or 0 if reordering is disabled + /*@}*/ + /* rtcp sender statistics receive */ int64_t last_rtcp_ntp_time; // TODO: move into statistics int64_t first_rtcp_ntp_time; // TODO: move into statistics diff --git a/libavformat/rtsp.c b/libavformat/rtsp.c index 92aa352703..1e4b9c1db7 100644 --- a/libavformat/rtsp.c +++ b/libavformat/rtsp.c @@ -582,7 +582,9 @@ static int rtsp_open_transport_ctx(AVFormatContext *s, RTSPStream *rtsp_st) rtsp_st->dynamic_handler); else rtsp_st->transport_priv = rtp_parse_open(s, st, rtsp_st->rtp_handle, - rtsp_st->sdp_payload_type); + rtsp_st->sdp_payload_type, + (rt->lower_transport == RTSP_LOWER_TRANSPORT_TCP || !s->max_delay) + ? 0 : RTP_REORDER_QUEUE_DEFAULT_SIZE); if (!rtsp_st->transport_priv) { return AVERROR(ENOMEM); @@ -1270,6 +1272,7 @@ static int rtsp_read_play(AVFormatContext *s) continue; if (rtsp_st->stream_index >= 0) st = s->streams[rtsp_st->stream_index]; + ff_rtp_reset_packet_queue(rtpctx); if (reply->range_start != AV_NOPTS_VALUE) { rtpctx->last_rtcp_ntp_time = AV_NOPTS_VALUE; rtpctx->first_rtcp_ntp_time = AV_NOPTS_VALUE; |