diff options
author | Pavel Nikiforov <nikiforov.pavel@gmail.com> | 2016-03-08 23:27:45 +0300 |
---|---|---|
committer | Michael Niedermayer <michael@niedermayer.cc> | 2016-05-25 01:39:22 +0200 |
commit | 413c842a697866b62ea5dbb5e5fa54ae990d503e (patch) | |
tree | 038b49228777bd97151b269c238abe6db37ed8cc /libavformat/udp.c | |
parent | 49640ae315abf2f38df368763fd753383a1381e4 (diff) | |
download | ffmpeg-413c842a697866b62ea5dbb5e5fa54ae990d503e.tar.gz |
avformat/udp: Add a delay between packets for streaming to clients with short buffer
This commit enables sending UDP packets in a background thread with specified delay.
When sending packets without a delay some devices with small RX buffer
( MAG200 STB, for example) will drop tail packets in bursts causing
decoding errors.
To use it specify "fifo_size" with "packet_gap" .
The output url will looks like udp://xxx:yyy?fifo_size=<output fifo
size>&packet_gap=<delay in usecs>
Signed-off-by: Michael Niedermayer <michael@niedermayer.cc>
Diffstat (limited to 'libavformat/udp.c')
-rw-r--r-- | libavformat/udp.c | 134 |
1 files changed, 131 insertions, 3 deletions
diff --git a/libavformat/udp.c b/libavformat/udp.c index e42b911c42..70dc98e4de 100644 --- a/libavformat/udp.c +++ b/libavformat/udp.c @@ -92,6 +92,7 @@ typedef struct UDPContext { int circular_buffer_size; AVFifoBuffer *fifo; int circular_buffer_error; + int64_t packet_gap; /* delay between transmitted packets */ #if HAVE_PTHREAD_CANCEL pthread_t circular_buffer_thread; pthread_mutex_t mutex; @@ -112,6 +113,7 @@ typedef struct UDPContext { #define E AV_OPT_FLAG_ENCODING_PARAM static const AVOption options[] = { { "buffer_size", "System data size (in bytes)", OFFSET(buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E }, + { "packet_gap", "Delay between packets", OFFSET(packet_gap), AV_OPT_TYPE_DURATION, { .i64 = 0 }, 0, INT_MAX, .flags = E }, { "localport", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, D|E }, { "local_port", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E }, { "localaddr", "Local address", OFFSET(localaddr), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E }, @@ -486,7 +488,7 @@ static int udp_get_file_handle(URLContext *h) } #if HAVE_PTHREAD_CANCEL -static void *circular_buffer_task( void *_URLContext) +static void *circular_buffer_task_rx( void *_URLContext) { URLContext *h = _URLContext; UDPContext *s = h->priv_data; @@ -542,6 +544,81 @@ end: pthread_mutex_unlock(&s->mutex); return NULL; } + +static void do_udp_write(void *arg, void *buf, int size) { + URLContext *h = arg; + UDPContext *s = h->priv_data; + + int ret; + + if (!(h->flags & AVIO_FLAG_NONBLOCK)) { + ret = ff_network_wait_fd(s->udp_fd, 1); + if (ret < 0) { + s->circular_buffer_error = ret; + return; + } + } + + if (!s->is_connected) { + ret = sendto (s->udp_fd, buf, size, 0, + (struct sockaddr *) &s->dest_addr, + s->dest_addr_len); + } else + ret = send(s->udp_fd, buf, size, 0); + + s->circular_buffer_error=ret; +} + +static void *circular_buffer_task_tx( void *_URLContext) +{ + URLContext *h = _URLContext; + UDPContext *s = h->priv_data; + int old_cancelstate; + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); + + for(;;) { + int len; + uint8_t tmp[4]; + + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate); + + av_usleep(s->packet_gap); + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); + + pthread_mutex_lock(&s->mutex); + + len=av_fifo_size(s->fifo); + + while (len<4) { + if (pthread_cond_wait(&s->cond, &s->mutex) < 0) { + goto end; + } + len=av_fifo_size(s->fifo); + } + + av_fifo_generic_peek(s->fifo, tmp, 4, NULL); + len=AV_RL32(tmp); + + if (len>0 && av_fifo_size(s->fifo)>=len+4) { + av_fifo_drain(s->fifo, 4); /* skip packet length */ + av_fifo_generic_read(s->fifo, h, len, do_udp_write); /* use function for write from fifo buffer */ + if (s->circular_buffer_error == len) { + /* all ok - reset error */ + s->circular_buffer_error=0; + } + } + + pthread_mutex_unlock(&s->mutex); + } + +end: + pthread_mutex_unlock(&s->mutex); + return NULL; +} + + #endif static int parse_source_list(char *buf, char **sources, int *num_sources, @@ -650,6 +727,16 @@ static int udp_open(URLContext *h, const char *uri, int flags) "'circular_buffer_size' option was set but it is not supported " "on this build (pthread support is required)\n"); } + if (av_find_info_tag(buf, sizeof(buf), "packet_gap", p)) { + if (av_parse_time(&s->packet_gap, buf, 1)<0) { + av_log(h, AV_LOG_ERROR, "Can't parse 'packet_gap'"); + goto fail; + } + if (!HAVE_PTHREAD_CANCEL) + av_log(h, AV_LOG_WARNING, + "'packet_gap' option was set but it is not supported " + "on this build (pthread support is required)\n"); + } if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) { av_strlcpy(localaddr, buf, sizeof(localaddr)); } @@ -829,7 +916,18 @@ static int udp_open(URLContext *h, const char *uri, int flags) s->udp_fd = udp_fd; #if HAVE_PTHREAD_CANCEL - if (!is_output && s->circular_buffer_size) { + /* + Create thread in case of: + 1. Input and circular_buffer_size is set + 2. Output and packet_gap and circular_buffer_size is set + */ + + if (is_output && s->packet_gap && !s->circular_buffer_size) { + /* Warn user in case of 'circular_buffer_size' is not set */ + av_log(h, AV_LOG_WARNING,"'packet_gap' option was set but 'circular_buffer_size' is not, but required\n"); + } + + if ((!is_output && s->circular_buffer_size) || (is_output && s->packet_gap && s->circular_buffer_size)) { int ret; /* start the task going */ @@ -844,7 +942,7 @@ static int udp_open(URLContext *h, const char *uri, int flags) av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret)); goto cond_fail; } - ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h); + ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h); if (ret != 0) { av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret)); goto thread_fail; @@ -945,6 +1043,36 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size) UDPContext *s = h->priv_data; int ret; +#if HAVE_PTHREAD_CANCEL + if (s->fifo) { + uint8_t tmp[4]; + + pthread_mutex_lock(&s->mutex); + + /* + Return error if last tx failed. + Here we can't know on which packet error was, but it needs to know that error exists. + */ + if (s->circular_buffer_error<0) { + int err=s->circular_buffer_error; + s->circular_buffer_error=0; + pthread_mutex_unlock(&s->mutex); + return err; + } + + if(av_fifo_space(s->fifo) < size + 4) { + /* What about a partial packet tx ? */ + pthread_mutex_unlock(&s->mutex); + return AVERROR(ENOMEM); + } + AV_WL32(tmp, size); + av_fifo_generic_write(s->fifo, tmp, 4, NULL); /* size of packet */ + av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */ + pthread_cond_signal(&s->cond); + pthread_mutex_unlock(&s->mutex); + return size; + } +#endif if (!(h->flags & AVIO_FLAG_NONBLOCK)) { ret = ff_network_wait_fd(s->udp_fd, 1); if (ret < 0) |