aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Niedermayer <michaelni@gmx.at>2011-12-23 01:17:18 +0100
committerMichael Niedermayer <michaelni@gmx.at>2012-01-03 22:26:20 +0100
commit3710f0b9edd7cf69a83ed00d2e33630b7188187f (patch)
treeb7b875ff35e98fd7e0e172e30d9b829f73c6f89d
parent12d0e44bdbe1421d828db7a84cbedeb6f2eb8f6b (diff)
downloadffmpeg-3710f0b9edd7cf69a83ed00d2e33630b7188187f.tar.gz
udp: Replace double select() by select+mutex+cond.
When no data was available both the buffer thread as well as the main thread would block in select(), when data becomes available both should move forward and as data is read in the buffer thread the main thread would block in select() later the read data was put in the fifo but the main thread still would be blocked in select() until either the timeout or another packet would come in. This is solved in this commit by using a mutex and a condition variable Signed-off-by: Michael Niedermayer <michaelni@gmx.at> (cherry picked from commit bc900501e0e2002e40d2d0c87b5a98b913b2d1a2) Signed-off-by: Michael Niedermayer <michaelni@gmx.at>
-rw-r--r--libavformat/udp.c41
1 files changed, 27 insertions, 14 deletions
diff --git a/libavformat/udp.c b/libavformat/udp.c
index 5eb6cd1f86..040cf90fa3 100644
--- a/libavformat/udp.c
+++ b/libavformat/udp.c
@@ -69,6 +69,8 @@ typedef struct {
int circular_buffer_error;
#if HAVE_PTHREADS
pthread_t circular_buffer_thread;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
#endif
uint8_t tmp[UDP_MAX_PKT_SIZE+4];
int remaining_in_dg;
@@ -317,6 +319,7 @@ static int udp_get_file_handle(URLContext *h)
return s->udp_fd;
}
+#if HAVE_PTHREADS
static void *circular_buffer_task( void *_URLContext)
{
URLContext *h = _URLContext;
@@ -331,7 +334,7 @@ static void *circular_buffer_task( void *_URLContext)
if (ff_check_interrupt(&h->interrupt_callback)) {
s->circular_buffer_error = EINTR;
- return NULL;
+ goto end;
}
FD_ZERO(&rfds);
@@ -343,7 +346,7 @@ static void *circular_buffer_task( void *_URLContext)
if (ff_neterrno() == AVERROR(EINTR))
continue;
s->circular_buffer_error = EIO;
- return NULL;
+ goto end;
}
if (!(ret > 0 && FD_ISSET(s->udp_fd, &rfds)))
@@ -357,23 +360,31 @@ static void *circular_buffer_task( void *_URLContext)
if(left < UDP_MAX_PKT_SIZE + 4) {
av_log(h, AV_LOG_ERROR, "circular_buffer: OVERRUN\n");
s->circular_buffer_error = EIO;
- return NULL;
+ goto end;
}
left = FFMIN(left, s->fifo->end - s->fifo->wptr);
len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
if (len < 0) {
if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
s->circular_buffer_error = EIO;
- return NULL;
+ goto end;
}
continue;
}
AV_WL32(s->tmp, len);
+ pthread_mutex_lock(&s->mutex);
av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
+ pthread_cond_signal(&s->cond);
+ pthread_mutex_unlock(&s->mutex);
}
+end:
+ pthread_mutex_lock(&s->mutex);
+ pthread_cond_signal(&s->cond);
+ pthread_mutex_unlock(&s->mutex);
return NULL;
}
+#endif
/* put it in UDP context */
/* return non zero if error */
@@ -516,6 +527,8 @@ static int udp_open(URLContext *h, const char *uri, int flags)
if (!is_output && s->circular_buffer_size) {
/* start the task going */
s->fifo = av_fifo_alloc(s->circular_buffer_size);
+ pthread_mutex_init(&s->mutex, NULL);
+ pthread_cond_init(&s->cond, NULL);
if (pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h)) {
av_log(h, AV_LOG_ERROR, "pthread_create failed\n");
goto fail;
@@ -536,15 +549,15 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
UDPContext *s = h->priv_data;
int ret;
int avail;
- fd_set rfds;
- struct timeval tv;
+#if HAVE_PTHREADS
if (s->fifo) {
-
+ pthread_mutex_lock(&s->mutex);
do {
avail = av_fifo_size(s->fifo);
if (avail) { // >=size) {
uint8_t tmp[4];
+ pthread_mutex_unlock(&s->mutex);
av_fifo_generic_read(s->fifo, tmp, 4, NULL);
avail= AV_RL32(tmp);
@@ -557,19 +570,15 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
return avail;
} else if(s->circular_buffer_error){
+ pthread_mutex_unlock(&s->mutex);
return s->circular_buffer_error;
}
else {
- FD_ZERO(&rfds);
- FD_SET(s->udp_fd, &rfds);
- tv.tv_sec = 1;
- tv.tv_usec = 0;
- ret = select(s->udp_fd + 1, &rfds, NULL, NULL, &tv);
- if (ret<0)
- return ret;
+ pthread_cond_wait(&s->cond, &s->mutex);
}
} while( 1);
}
+#endif
if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
ret = ff_network_wait_fd(s->udp_fd, 0);
@@ -610,6 +619,10 @@ static int udp_close(URLContext *h)
udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr);
closesocket(s->udp_fd);
av_fifo_free(s->fifo);
+#if HAVE_PTHREADS
+ pthread_mutex_destroy(&s->mutex);
+ pthread_cond_destroy(&s->cond);
+#endif
return 0;
}