From 81975cd24b043c853d5eeacbb2692a3a6f966034 Mon Sep 17 00:00:00 2001 From: Marton Balint Date: Thu, 7 May 2020 21:49:47 +0200 Subject: [PATCH] avformat/fifo: add timeshift option to delay output Signed-off-by: Marton Balint --- doc/muxers.texi | 5 ++++ libavformat/fifo.c | 59 ++++++++++++++++++++++++++++++++++++++++++- libavformat/version.h | 2 +- 3 files changed, 64 insertions(+), 2 deletions(-) diff --git a/doc/muxers.texi b/doc/muxers.texi index c598abbe66..d6f9de3702 100644 --- a/doc/muxers.texi +++ b/doc/muxers.texi @@ -2275,6 +2275,11 @@ certain (usually permanent) errors the recovery is not attempted even when Specify whether to wait for the keyframe after recovering from queue overflow or failure. This option is set to 0 (false) by default. +@item timeshift @var{duration} +Buffer the specified amount of packets and delay writing the output. Note that +@var{queue_size} must be big enough to store the packets for timeshift. At the +end of the input the fifo buffer is flushed at realtime speed. + @end table @subsection Examples diff --git a/libavformat/fifo.c b/libavformat/fifo.c index d11dc6626c..17748e94ce 100644 --- a/libavformat/fifo.c +++ b/libavformat/fifo.c @@ -19,6 +19,8 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#include + #include "libavutil/avassert.h" #include "libavutil/opt.h" #include "libavutil/time.h" @@ -77,6 +79,9 @@ typedef struct FifoContext { /* Value > 0 signals queue overflow */ volatile uint8_t overflow_flag; + atomic_int_least64_t queue_duration; + int64_t last_sent_dts; + int64_t timeshift; } FifoContext; typedef struct FifoThreadContext { @@ -98,9 +103,12 @@ typedef struct FifoThreadContext { * so finalization by calling write_trailer and ff_io_close must be done * before exiting / reinitialization of underlying muxer */ uint8_t header_written; + + int64_t last_received_dts; } FifoThreadContext; typedef enum FifoMessageType { + FIFO_NOOP, FIFO_WRITE_HEADER, FIFO_WRITE_PACKET, FIFO_FLUSH_OUTPUT @@ -159,6 +167,15 @@ static int fifo_thread_flush_output(FifoThreadContext *ctx) return av_write_frame(avf2, NULL); } +static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts) +{ + AVStream *st = avf->streams[pkt->stream_index]; + int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q); + int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts); + *last_dts = dts; + return duration; +} + static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt) { AVFormatContext *avf = ctx->avf; @@ -167,6 +184,9 @@ static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt) AVRational src_tb, dst_tb; int ret, s_idx; + if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE) + atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed); + if (ctx->drop_until_keyframe) { if (pkt->flags & AV_PKT_FLAG_KEY) { ctx->drop_until_keyframe = 0; @@ -209,6 +229,9 @@ static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg { int ret = AVERROR(EINVAL); + if (msg->type == FIFO_NOOP) + return 0; + if (!ctx->header_written) { ret = fifo_thread_write_header(ctx); if (ret < 0) @@ -390,12 +413,13 @@ static void *fifo_consumer_thread(void *data) AVFormatContext *avf = data; FifoContext *fifo = avf->priv_data; AVThreadMessageQueue *queue = fifo->queue; - FifoMessage msg = {FIFO_WRITE_HEADER, {0}}; + FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}}; int ret; FifoThreadContext fifo_thread_ctx; memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext)); fifo_thread_ctx.avf = avf; + fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE; while (1) { uint8_t just_flushed = 0; @@ -429,6 +453,10 @@ static void *fifo_consumer_thread(void *data) if (just_flushed) av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n"); + if (fifo->timeshift) + while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift) + av_usleep(10000); + ret = av_thread_message_queue_recv(queue, &msg, 0); if (ret < 0) { av_thread_message_queue_set_err_send(queue, ret); @@ -488,6 +516,8 @@ static int fifo_init(AVFormatContext *avf) " only when drop_pkts_on_overflow is also turned on\n"); return AVERROR(EINVAL); } + atomic_init(&fifo->queue_duration, 0); + fifo->last_sent_dts = AV_NOPTS_VALUE; oformat = av_guess_format(fifo->format, avf->url, NULL); if (!oformat) { @@ -563,6 +593,9 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt) goto fail; } + if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE) + atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed); + return ret; fail: if (pkt) @@ -576,6 +609,27 @@ static int fifo_write_trailer(AVFormatContext *avf) int ret; av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF); + if (fifo->timeshift) { + int64_t now = av_gettime_relative(); + int64_t elapsed = 0; + FifoMessage msg = {FIFO_NOOP}; + do { + int64_t delay = av_gettime_relative() - now; + if (delay < 0) { // Discontinuity? + delay = 10000; + now = av_gettime_relative(); + } else { + now += delay; + } + atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed); + elapsed += delay; + if (elapsed > fifo->timeshift) + break; + av_usleep(10000); + ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK); + } while (ret >= 0 || ret == AVERROR(EAGAIN)); + atomic_store(&fifo->queue_duration, INT64_MAX); + } ret = pthread_join(fifo->writer_thread, NULL); if (ret < 0) { @@ -630,6 +684,9 @@ static const AVOption options[] = { {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, + {"timeshift", "Delay fifo output", OFFSET(timeshift), + AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM}, + {NULL}, }; diff --git a/libavformat/version.h b/libavformat/version.h index e34abac822..59151a71a0 100644 --- a/libavformat/version.h +++ b/libavformat/version.h @@ -33,7 +33,7 @@ // Also please add any ticket numbers that you believe might be affected here #define LIBAVFORMAT_VERSION_MAJOR 58 #define LIBAVFORMAT_VERSION_MINOR 46 -#define LIBAVFORMAT_VERSION_MICRO 100 +#define LIBAVFORMAT_VERSION_MICRO 101 #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \ LIBAVFORMAT_VERSION_MINOR, \