From 413c842a697866b62ea5dbb5e5fa54ae990d503e Mon Sep 17 00:00:00 2001 From: Pavel Nikiforov Date: Tue, 8 Mar 2016 23:27:45 +0300 Subject: [PATCH] avformat/udp: Add a delay between packets for streaming to clients with short buffer This commit enables sending UDP packets in a background thread with specified delay. When sending packets without a delay some devices with small RX buffer ( MAG200 STB, for example) will drop tail packets in bursts causing decoding errors. To use it specify "fifo_size" with "packet_gap" . The output url will looks like udp://xxx:yyy?fifo_size=&packet_gap= Signed-off-by: Michael Niedermayer --- doc/protocols.texi | 3 + libavformat/udp.c | 134 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 134 insertions(+), 3 deletions(-) diff --git a/doc/protocols.texi b/doc/protocols.texi index a1084bd8a7..a9c9d0c462 100644 --- a/doc/protocols.texi +++ b/doc/protocols.texi @@ -1285,6 +1285,9 @@ Set the UDP maximum socket buffer size in bytes. This is used to set either the receive or send buffer size, depending on what the socket is used for. Default is 64KB. See also @var{fifo_size}. +@item packet_gap=@var{seconds} +Delay between packets + @item localport=@var{port} Override the local UDP port to bind with. diff --git a/libavformat/udp.c b/libavformat/udp.c index e42b911c42..70dc98e4de 100644 --- a/libavformat/udp.c +++ b/libavformat/udp.c @@ -92,6 +92,7 @@ typedef struct UDPContext { int circular_buffer_size; AVFifoBuffer *fifo; int circular_buffer_error; + int64_t packet_gap; /* delay between transmitted packets */ #if HAVE_PTHREAD_CANCEL pthread_t circular_buffer_thread; pthread_mutex_t mutex; @@ -112,6 +113,7 @@ typedef struct UDPContext { #define E AV_OPT_FLAG_ENCODING_PARAM static const AVOption options[] = { { "buffer_size", "System data size (in bytes)", OFFSET(buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E }, + { "packet_gap", "Delay between packets", OFFSET(packet_gap), AV_OPT_TYPE_DURATION, { .i64 = 0 }, 0, INT_MAX, .flags = E }, { "localport", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, D|E }, { "local_port", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E }, { "localaddr", "Local address", OFFSET(localaddr), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E }, @@ -486,7 +488,7 @@ static int udp_get_file_handle(URLContext *h) } #if HAVE_PTHREAD_CANCEL -static void *circular_buffer_task( void *_URLContext) +static void *circular_buffer_task_rx( void *_URLContext) { URLContext *h = _URLContext; UDPContext *s = h->priv_data; @@ -542,6 +544,81 @@ end: pthread_mutex_unlock(&s->mutex); return NULL; } + +static void do_udp_write(void *arg, void *buf, int size) { + URLContext *h = arg; + UDPContext *s = h->priv_data; + + int ret; + + if (!(h->flags & AVIO_FLAG_NONBLOCK)) { + ret = ff_network_wait_fd(s->udp_fd, 1); + if (ret < 0) { + s->circular_buffer_error = ret; + return; + } + } + + if (!s->is_connected) { + ret = sendto (s->udp_fd, buf, size, 0, + (struct sockaddr *) &s->dest_addr, + s->dest_addr_len); + } else + ret = send(s->udp_fd, buf, size, 0); + + s->circular_buffer_error=ret; +} + +static void *circular_buffer_task_tx( void *_URLContext) +{ + URLContext *h = _URLContext; + UDPContext *s = h->priv_data; + int old_cancelstate; + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); + + for(;;) { + int len; + uint8_t tmp[4]; + + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate); + + av_usleep(s->packet_gap); + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); + + pthread_mutex_lock(&s->mutex); + + len=av_fifo_size(s->fifo); + + while (len<4) { + if (pthread_cond_wait(&s->cond, &s->mutex) < 0) { + goto end; + } + len=av_fifo_size(s->fifo); + } + + av_fifo_generic_peek(s->fifo, tmp, 4, NULL); + len=AV_RL32(tmp); + + if (len>0 && av_fifo_size(s->fifo)>=len+4) { + av_fifo_drain(s->fifo, 4); /* skip packet length */ + av_fifo_generic_read(s->fifo, h, len, do_udp_write); /* use function for write from fifo buffer */ + if (s->circular_buffer_error == len) { + /* all ok - reset error */ + s->circular_buffer_error=0; + } + } + + pthread_mutex_unlock(&s->mutex); + } + +end: + pthread_mutex_unlock(&s->mutex); + return NULL; +} + + #endif static int parse_source_list(char *buf, char **sources, int *num_sources, @@ -650,6 +727,16 @@ static int udp_open(URLContext *h, const char *uri, int flags) "'circular_buffer_size' option was set but it is not supported " "on this build (pthread support is required)\n"); } + if (av_find_info_tag(buf, sizeof(buf), "packet_gap", p)) { + if (av_parse_time(&s->packet_gap, buf, 1)<0) { + av_log(h, AV_LOG_ERROR, "Can't parse 'packet_gap'"); + goto fail; + } + if (!HAVE_PTHREAD_CANCEL) + av_log(h, AV_LOG_WARNING, + "'packet_gap' option was set but it is not supported " + "on this build (pthread support is required)\n"); + } if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) { av_strlcpy(localaddr, buf, sizeof(localaddr)); } @@ -829,7 +916,18 @@ static int udp_open(URLContext *h, const char *uri, int flags) s->udp_fd = udp_fd; #if HAVE_PTHREAD_CANCEL - if (!is_output && s->circular_buffer_size) { + /* + Create thread in case of: + 1. Input and circular_buffer_size is set + 2. Output and packet_gap and circular_buffer_size is set + */ + + if (is_output && s->packet_gap && !s->circular_buffer_size) { + /* Warn user in case of 'circular_buffer_size' is not set */ + av_log(h, AV_LOG_WARNING,"'packet_gap' option was set but 'circular_buffer_size' is not, but required\n"); + } + + if ((!is_output && s->circular_buffer_size) || (is_output && s->packet_gap && s->circular_buffer_size)) { int ret; /* start the task going */ @@ -844,7 +942,7 @@ static int udp_open(URLContext *h, const char *uri, int flags) av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret)); goto cond_fail; } - ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h); + ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h); if (ret != 0) { av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret)); goto thread_fail; @@ -945,6 +1043,36 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size) UDPContext *s = h->priv_data; int ret; +#if HAVE_PTHREAD_CANCEL + if (s->fifo) { + uint8_t tmp[4]; + + pthread_mutex_lock(&s->mutex); + + /* + Return error if last tx failed. + Here we can't know on which packet error was, but it needs to know that error exists. + */ + if (s->circular_buffer_error<0) { + int err=s->circular_buffer_error; + s->circular_buffer_error=0; + pthread_mutex_unlock(&s->mutex); + return err; + } + + if(av_fifo_space(s->fifo) < size + 4) { + /* What about a partial packet tx ? */ + pthread_mutex_unlock(&s->mutex); + return AVERROR(ENOMEM); + } + AV_WL32(tmp, size); + av_fifo_generic_write(s->fifo, tmp, 4, NULL); /* size of packet */ + av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */ + pthread_cond_signal(&s->cond); + pthread_mutex_unlock(&s->mutex); + return size; + } +#endif if (!(h->flags & AVIO_FLAG_NONBLOCK)) { ret = ff_network_wait_fd(s->udp_fd, 1); if (ret < 0)