fftools/ffmpeg: rework -shortest implementation

The -shortest option (which finishes the output file at the time the
shortest stream ends) is currently implemented by faking the -t option
when an output stream ends. This approach is fragile, since it depends
on the frames/packets being processed in a specific order. E.g. there
are currently some situations in which the output file length will
depend unpredictably on unrelated factors like encoder delay. More
importantly, the present work aiming at splitting various ffmpeg
components into different threads will make this approach completely
unworkable, since the frames/packets will arrive in effectively random
order.

This commit introduces a "sync queue", which is essentially a collection
of FIFOs, one per stream. Frames/packets are submitted to these FIFOs
and are then released for further processing (encoding or muxing) when
it is ensured that the frame in question will not cause its stream to
get ahead of the other streams (the logic is similar to libavformat's
interleaving queue).

These sync queues are then used for encoding and/or muxing when the
-shortest option is specified.

A new option – -shortest_buf_duration – controls the maximum duration of
buffered frames/packets, to avoid runaway memory usage.
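(For orientation only, not part of the patch: a minimal sketch of the resulting
call pattern, loosely following the new submit_encode_frame() below. The helper
name and its arguments are placeholders.)

    #include "libavutil/error.h"
    #include "libavutil/frame.h"
    #include "sync_queue.h"

    /* Illustrative helper: push one frame (NULL contents = end of stream)
     * and drain whatever the queue is willing to release. */
    static int sync_then_process(SyncQueue *sq, int idx, AVFrame *in, AVFrame *tmp)
    {
        int ret = sq_send(sq, idx, SQFRAME(in));  /* on success the queue takes the frame's contents */
        if (ret < 0 && ret != AVERROR_EOF)
            return ret;

        /* frames come back only once they can no longer make this stream
         * get ahead of the other streams in the queue */
        while ((ret = sq_receive(sq, idx, SQFRAME(tmp))) >= 0) {
            /* encode or mux tmp here */
            av_frame_unref(tmp);
        }
        return ret == AVERROR(EAGAIN) ? 0 : ret;  /* EAGAIN just means "feed more input" */
    }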

This commit changes the results of the following tests:
- copy-shortest[12]: the last audio frame is now gone. This is
  correct, since it actually outlasts the last video frame.
- shortest-sub: the video packets following the last subtitle packet are
  now gone. This is also correct.
Author: Anton Khirnov
parent 9ac78fb347
commit 4740fea7dd
Changed files (lines changed):
  Changelog (1)
  doc/ffmpeg.texi (16)
  fftools/Makefile (2)
  fftools/ffmpeg.c (109)
  fftools/ffmpeg.h (9)
  fftools/ffmpeg_mux.c (67)
  fftools/ffmpeg_opt.c (82)
  fftools/sync_queue.c (425)
  fftools/sync_queue.h (100)
  tests/fate/ffmpeg.mak (2)
  tests/ref/fate/copy-shortest1 (1)
  tests/ref/fate/copy-shortest2 (1)
  tests/ref/fate/shortest-sub (4)

Changelog
@@ -4,6 +4,7 @@ releases are sorted from youngest to oldest.
version <next>:
- Radiance HDR image support
- ddagrab (Desktop Duplication) video capture filter
- ffmpeg -shortest_buf_duration option
version 5.1:

doc/ffmpeg.texi
@@ -1765,6 +1765,22 @@ Default value is 0.
Enable bitexact mode for (de)muxer and (de/en)coder
@item -shortest (@emph{output})
Finish encoding when the shortest output stream ends.
Note that this option may require buffering frames, which introduces extra
latency. The maximum amount of this latency may be controlled with the
@code{-shortest_buf_duration} option.
@item -shortest_buf_duration @var{duration} (@emph{output})
The @code{-shortest} option may require buffering potentially large amounts
of data when at least one of the streams is "sparse" (i.e. has large gaps
between frames – this is typically the case for subtitles).
This option controls the maximum duration of buffered frames in seconds.
Larger values may allow the @code{-shortest} option to produce more accurate
results, but increase memory use and latency.
The default value is 10 seconds.
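For example, an illustrative command line (file names are placeholders; the
value 40 matches the updated FATE test below) that raises the limit for a
sparse subtitle stream:

    ffmpeg -i input.mkv -map 0 -c copy -shortest -shortest_buf_duration 40 output.mkv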
@item -dts_delta_threshold
Timestamp discontinuity delta threshold.
@item -dts_error_threshold @var{seconds}

fftools/Makefile
@@ -14,6 +14,8 @@ OBJS-ffmpeg += \
fftools/ffmpeg_hw.o \
fftools/ffmpeg_mux.o \
fftools/ffmpeg_opt.o \
fftools/objpool.o \
fftools/sync_queue.o \
define DOFFTOOL
OBJS-$(1) += fftools/cmdutils.o fftools/opt_common.o fftools/$(1).o $(OBJS-$(1)-yes)

fftools/ffmpeg.c
@@ -104,6 +104,7 @@
#include "ffmpeg.h"
#include "cmdutils.h"
#include "sync_queue.h"
#include "libavutil/avassert.h"
@@ -569,6 +570,7 @@ static void ffmpeg_cleanup(int ret)
av_bsf_free(&ost->bsf_ctx);
av_frame_free(&ost->filtered_frame);
av_frame_free(&ost->sq_frame);
av_frame_free(&ost->last_frame);
av_packet_free(&ost->pkt);
av_dict_free(&ost->encoder_opts);
@@ -691,13 +693,10 @@ static void update_benchmark(const char *fmt, ...)
static void close_output_stream(OutputStream *ost)
{
OutputFile *of = output_files[ost->file_index];
AVRational time_base = ost->stream_copy ? ost->mux_timebase : ost->enc_ctx->time_base;
ost->finished |= ENCODER_FINISHED;
if (of->shortest) {
int64_t end = av_rescale_q(ost->sync_opts - ost->first_pts, time_base, AV_TIME_BASE_Q);
of->recording_time = FFMIN(of->recording_time, end);
}
if (ost->sq_idx_encode >= 0)
sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
}
/*
@@ -726,10 +725,15 @@ static void output_packet(OutputFile *of, AVPacket *pkt,
goto finish;
while ((ret = av_bsf_receive_packet(ost->bsf_ctx, pkt)) >= 0)
of_submit_packet(of, pkt, ost);
if (ret == AVERROR_EOF)
of_submit_packet(of, NULL, ost);
if (ret == AVERROR(EAGAIN))
ret = 0;
} else if (!eof)
of_submit_packet(of, pkt, ost);
} else
of_submit_packet(of, eof ? NULL : pkt, ost);
if (eof)
ost->finished |= MUXER_FINISHED;
finish:
if (ret < 0 && ret != AVERROR_EOF) {
@@ -899,6 +903,7 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame)
if (frame) {
ost->frames_encoded++;
ost->samples_encoded += frame->nb_samples;
if (debug_ts) {
av_log(NULL, AV_LOG_INFO, "encoder <- type:%s "
@@ -971,6 +976,52 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame)
av_assert0(0);
}
static int submit_encode_frame(OutputFile *of, OutputStream *ost,
AVFrame *frame)
{
int ret;
if (ost->sq_idx_encode < 0)
return encode_frame(of, ost, frame);
if (frame) {
ret = av_frame_ref(ost->sq_frame, frame);
if (ret < 0)
return ret;
frame = ost->sq_frame;
}
ret = sq_send(of->sq_encode, ost->sq_idx_encode,
SQFRAME(frame));
if (ret < 0) {
if (frame)
av_frame_unref(frame);
if (ret != AVERROR_EOF)
return ret;
}
while (1) {
AVFrame *enc_frame = ost->sq_frame;
ret = sq_receive(of->sq_encode, ost->sq_idx_encode,
SQFRAME(enc_frame));
if (ret == AVERROR_EOF) {
enc_frame = NULL;
} else if (ret < 0) {
return (ret == AVERROR(EAGAIN)) ? 0 : ret;
}
ret = encode_frame(of, ost, enc_frame);
if (enc_frame)
av_frame_unref(enc_frame);
if (ret < 0) {
if (ret == AVERROR_EOF)
close_output_stream(ost);
return ret;
}
}
}
static void do_audio_out(OutputFile *of, OutputStream *ost,
AVFrame *frame)
{
@@ -984,10 +1035,9 @@ static void do_audio_out(OutputFile *of, OutputStream *ost,
if (frame->pts == AV_NOPTS_VALUE || audio_sync_method < 0)
frame->pts = ost->sync_opts;
ost->sync_opts = frame->pts + frame->nb_samples;
ost->samples_encoded += frame->nb_samples;
ret = encode_frame(of, ost, frame);
if (ret < 0)
ret = submit_encode_frame(of, ost, frame);
if (ret < 0 && ret != AVERROR_EOF)
exit_program(1);
}
@@ -1151,15 +1201,18 @@ static void do_video_out(OutputFile *of,
if (delta0 > 1.1)
nb0_frames = llrintf(delta0 - 0.6);
}
next_picture->pkt_duration = 1;
break;
case VSYNC_VFR:
if (delta <= -0.6)
nb_frames = 0;
else if (delta > 0.6)
ost->sync_opts = llrint(sync_ipts);
next_picture->pkt_duration = duration;
break;
case VSYNC_DROP:
case VSYNC_PASSTHROUGH:
next_picture->pkt_duration = duration;
ost->sync_opts = llrint(sync_ipts);
break;
default:
@@ -1273,8 +1326,8 @@ static void do_video_out(OutputFile *of,
av_log(NULL, AV_LOG_DEBUG, "Forced keyframe at time %f\n", pts_time);
}
ret = encode_frame(of, ost, in_picture);
if (ret < 0)
ret = submit_encode_frame(of, ost, in_picture);
if (ret < 0 && ret != AVERROR_EOF)
exit_program(1);
ost->sync_opts++;
@@ -1286,19 +1339,6 @@ static void do_video_out(OutputFile *of,
av_frame_move_ref(ost->last_frame, next_picture);
}
static void finish_output_stream(OutputStream *ost)
{
OutputFile *of = output_files[ost->file_index];
AVRational time_base = ost->stream_copy ? ost->mux_timebase : ost->enc_ctx->time_base;
ost->finished = ENCODER_FINISHED | MUXER_FINISHED;
if (of->shortest) {
int64_t end = av_rescale_q(ost->sync_opts - ost->first_pts, time_base, AV_TIME_BASE_Q);
of->recording_time = FFMIN(of->recording_time, end);
}
}
/**
* Get and encode new output from any of the filtergraphs, without causing
* activity.
@@ -1766,7 +1806,7 @@ static void flush_encoders(void)
exit_program(1);
}
finish_output_stream(ost);
output_packet(of, ost->pkt, ost, 1);
}
init_output_stream_wrapper(ost, NULL, 1);
@@ -1775,7 +1815,7 @@ static void flush_encoders(void)
if (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO)
continue;
ret = encode_frame(of, ost, NULL);
ret = submit_encode_frame(of, ost, NULL);
if (ret != AVERROR_EOF)
exit_program(1);
}
@@ -3086,6 +3126,9 @@ static int init_output_stream_encode(OutputStream *ost, AVFrame *frame)
break;
}
if (ost->sq_idx_encode >= 0)
sq_set_tb(of->sq_encode, ost->sq_idx_encode, enc_ctx->time_base);
ost->mux_timebase = enc_ctx->time_base;
return 0;
@@ -3094,6 +3137,7 @@ static int init_output_stream_encode(OutputStream *ost, AVFrame *frame)
static int init_output_stream(OutputStream *ost, AVFrame *frame,
char *error, int error_len)
{
OutputFile *of = output_files[ost->file_index];
int ret = 0;
if (ost->encoding_needed) {
@@ -3226,6 +3270,9 @@ static int init_output_stream(OutputStream *ost, AVFrame *frame,
if (ret < 0)
return ret;
if (ost->sq_idx_mux >= 0)
sq_set_tb(of->sq_mux, ost->sq_idx_mux, ost->mux_timebase);
ost->initialized = 1;
ret = of_check_init(output_files[ost->file_index]);
@@ -3930,8 +3977,10 @@ static int process_input(int file_index)
OutputStream *ost = output_streams[j];
if (ost->source_index == ifile->ist_index + i &&
(ost->stream_copy || ost->enc->type == AVMEDIA_TYPE_SUBTITLE))
finish_output_stream(ost);
(ost->stream_copy || ost->enc->type == AVMEDIA_TYPE_SUBTITLE)) {
OutputFile *of = output_files[ost->file_index];
output_packet(of, ost->pkt, ost, 1);
}
}
}

fftools/ffmpeg.h
@@ -26,6 +26,7 @@
#include <signal.h>
#include "cmdutils.h"
#include "sync_queue.h"
#include "libavformat/avformat.h"
#include "libavformat/avio.h"
@@ -151,6 +152,7 @@ typedef struct OptionsContext {
int64_t limit_filesize;
float mux_preload;
float mux_max_delay;
float shortest_buf_duration;
int shortest;
int bitexact;
@@ -484,6 +486,7 @@ typedef struct OutputStream {
int64_t max_frames;
AVFrame *filtered_frame;
AVFrame *last_frame;
AVFrame *sq_frame;
AVPacket *pkt;
int64_t last_dropped;
int64_t last_nb0_frames[3];
@@ -575,6 +578,9 @@ typedef struct OutputStream {
/* frame encode sum of squared error values */
int64_t error[4];
int sq_idx_encode;
int sq_idx_mux;
} OutputStream;
typedef struct Muxer Muxer;
@@ -585,6 +591,9 @@ typedef struct OutputFile {
Muxer *mux;
const AVOutputFormat *format;
SyncQueue *sq_encode;
SyncQueue *sq_mux;
AVFormatContext *ctx;
int ost_index; /* index of the first stream in output_streams */
int64_t recording_time; ///< desired length of the resulting file in microseconds == AV_TIME_BASE units

fftools/ffmpeg_mux.c
@@ -20,6 +20,7 @@
#include <string.h>
#include "ffmpeg.h"
#include "sync_queue.h"
#include "libavutil/fifo.h"
#include "libavutil/intreadwrite.h"
@@ -56,6 +57,8 @@ struct Muxer {
int64_t limit_filesize;
int64_t final_filesize;
int header_written;
AVPacket *sq_pkt;
};
static int want_sdp = 1;
@@ -72,13 +75,14 @@ static void close_all_output_streams(OutputStream *ost, OSTFinished this_stream,
static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
{
MuxStream *ms = &of->mux->streams[ost->index];
AVPacket *tmp_pkt;
AVPacket *tmp_pkt = NULL;
int ret;
if (!av_fifo_can_write(ms->muxing_queue)) {
size_t cur_size = av_fifo_can_read(ms->muxing_queue);
size_t pkt_size = pkt ? pkt->size : 0;
unsigned int are_we_over_size =
(ms->muxing_queue_data_size + pkt->size) > ost->muxing_queue_data_threshold;
(ms->muxing_queue_data_size + pkt_size) > ost->muxing_queue_data_threshold;
size_t limit = are_we_over_size ? ost->max_muxing_queue_size : SIZE_MAX;
size_t new_size = FFMIN(2 * cur_size, limit);
@@ -93,6 +97,7 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
return ret;
}
if (pkt) {
ret = av_packet_make_refcounted(pkt);
if (ret < 0)
return ret;
@@ -103,6 +108,7 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
av_packet_move_ref(tmp_pkt, pkt);
ms->muxing_queue_data_size += tmp_pkt->size;
}
av_fifo_write(ms->muxing_queue, &tmp_pkt, 1);
return 0;
@@ -192,11 +198,44 @@ static void write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
}
}
static void submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
{
if (ost->sq_idx_mux >= 0) {
int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt));
if (ret < 0) {
if (pkt)
av_packet_unref(pkt);
if (ret == AVERROR_EOF) {
ost->finished |= MUXER_FINISHED;
return;
} else
exit_program(1);
}
while (1) {
ret = sq_receive(of->sq_mux, -1, SQPKT(of->mux->sq_pkt));
if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN))
return;
else if (ret < 0)
exit_program(1);
write_packet(of, output_streams[of->ost_index + ret],
of->mux->sq_pkt);
}
} else {
if (pkt)
write_packet(of, ost, pkt);
else
ost->finished |= MUXER_FINISHED;
}
}
void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
{
AVStream *st = ost->st;
int ret;
if (pkt) {
/*
* Audio encoders may split the packets -- #frames in != #packets out.
* But there is no reordering, so we can limit the number of output packets
@@ -211,9 +250,10 @@ void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
}
ost->frame_number++;
}
}
if (of->mux->header_written) {
write_packet(of, ost, pkt);
submit_packet(of, ost, pkt);
} else {
/* the muxer is not initialized yet, buffer the packet */
ret = queue_packet(of, ost, pkt);
@@ -321,9 +361,11 @@ int of_check_init(OutputFile *of)
ost->mux_timebase = ost->st->time_base;
while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
ms->muxing_queue_data_size -= pkt->size;
write_packet(of, ost, pkt);
av_packet_free(&pkt);
submit_packet(of, ost, pkt);
if (pkt) {
ms->muxing_queue_data_size -= pkt->size;
av_packet_free(&pkt);
}
}
}
@@ -383,6 +425,8 @@ static void mux_free(Muxer **pmux, int nb_streams)
av_freep(&mux->streams);
av_dict_free(&mux->opts);
av_packet_free(&mux->sq_pkt);
av_freep(pmux);
}
@@ -394,6 +438,9 @@ void of_close(OutputFile **pof)
if (!of)
return;
sq_free(&of->sq_encode);
sq_free(&of->sq_mux);
s = of->ctx;
mux_free(&of->mux, s ? s->nb_streams : 0);
@@ -437,6 +484,14 @@ int of_muxer_init(OutputFile *of, AVDictionary *opts, int64_t limit_filesize)
if (strcmp(of->format->name, "rtp"))
want_sdp = 0;
if (of->sq_mux) {
mux->sq_pkt = av_packet_alloc();
if (!mux->sq_pkt) {
ret = AVERROR(ENOMEM);
goto fail;
}
}
/* write the header for files with no streams */
if (of->format->flags & AVFMT_NOSTREAMS && of->ctx->nb_streams == 0) {
ret = of_check_init(of);

fftools/ffmpeg_opt.c
@@ -31,6 +31,7 @@
#include "fopen_utf8.h"
#include "cmdutils.h"
#include "opt_common.h"
#include "sync_queue.h"
#include "libavformat/avformat.h"
@@ -236,6 +237,7 @@ static void init_options(OptionsContext *o)
o->accurate_seek = 1;
o->thread_queue_size = -1;
o->input_sync_ref = -1;
o->shortest_buf_duration = 10.f;
}
static int show_hwaccels(void *optctx, const char *opt, const char *arg)
@@ -2385,6 +2387,78 @@ static int init_complex_filters(void)
return 0;
}
static int setup_sync_queues(OutputFile *of, AVFormatContext *oc, int64_t buf_size_us)
{
int nb_av_enc = 0, nb_interleaved = 0;
#define IS_AV_ENC(ost, type) \
(ost->encoding_needed && (type == AVMEDIA_TYPE_VIDEO || type == AVMEDIA_TYPE_AUDIO))
#define IS_INTERLEAVED(type) (type != AVMEDIA_TYPE_ATTACHMENT)
for (int i = 0; i < oc->nb_streams; i++) {
OutputStream *ost = output_streams[of->ost_index + i];
enum AVMediaType type = ost->st->codecpar->codec_type;
ost->sq_idx_encode = -1;
ost->sq_idx_mux = -1;
nb_interleaved += IS_INTERLEAVED(type);
nb_av_enc += IS_AV_ENC(ost, type);
}
if (!(nb_interleaved > 1 && of->shortest))
return 0;
/* if we have more than one encoded audio/video streams, then we
* synchronize them before encoding */
if (nb_av_enc > 1) {
of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us);
if (!of->sq_encode)
return AVERROR(ENOMEM);
for (int i = 0; i < oc->nb_streams; i++) {
OutputStream *ost = output_streams[of->ost_index + i];
enum AVMediaType type = ost->st->codecpar->codec_type;
if (!IS_AV_ENC(ost, type))
continue;
ost->sq_idx_encode = sq_add_stream(of->sq_encode);
if (ost->sq_idx_encode < 0)
return ost->sq_idx_encode;
ost->sq_frame = av_frame_alloc();
if (!ost->sq_frame)
return AVERROR(ENOMEM);
}
}
/* if there are any additional interleaved streams, then ALL the streams
* are also synchronized before sending them to the muxer */
if (nb_interleaved > nb_av_enc) {
of->sq_mux = sq_alloc(SYNC_QUEUE_PACKETS, buf_size_us);
if (!of->sq_mux)
return AVERROR(ENOMEM);
for (int i = 0; i < oc->nb_streams; i++) {
OutputStream *ost = output_streams[of->ost_index + i];
enum AVMediaType type = ost->st->codecpar->codec_type;
if (!IS_INTERLEAVED(type))
continue;
ost->sq_idx_mux = sq_add_stream(of->sq_mux);
if (ost->sq_idx_mux < 0)
return ost->sq_idx_mux;
}
}
#undef IS_AV_ENC
#undef IS_INTERLEAVED
return 0;
}
static int open_output_file(OptionsContext *o, const char *filename)
{
AVFormatContext *oc;
@@ -3022,6 +3096,12 @@ loop_end:
exit_program(1);
}
err = setup_sync_queues(of, oc, o->shortest_buf_duration * AV_TIME_BASE);
if (err < 0) {
av_log(NULL, AV_LOG_FATAL, "Error setting up output sync queues\n");
exit_program(1);
}
err = of_muxer_init(of, format_opts, o->limit_filesize);
if (err < 0) {
av_log(NULL, AV_LOG_FATAL, "Error initializing internal muxing state\n");
@@ -3739,6 +3819,8 @@ const OptionDef options[] = {
{ "shortest", OPT_BOOL | OPT_EXPERT | OPT_OFFSET |
OPT_OUTPUT, { .off = OFFSET(shortest) },
"finish encoding within shortest input" },
{ "shortest_buf_duration", HAS_ARG | OPT_FLOAT | OPT_EXPERT | OPT_OFFSET | OPT_OUTPUT, { .off = OFFSET(shortest_buf_duration) },
"maximum buffering duration (in seconds) for the -shortest option" },
{ "bitexact", OPT_BOOL | OPT_EXPERT | OPT_OFFSET |
OPT_OUTPUT | OPT_INPUT, { .off = OFFSET(bitexact) },
"bitexact mode" },

fftools/sync_queue.c (new file)
@@ -0,0 +1,425 @@
/*
* This file is part of FFmpeg.
*
* FFmpeg is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* FFmpeg is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with FFmpeg; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <stdint.h>
#include <string.h>
#include "libavutil/avassert.h"
#include "libavutil/error.h"
#include "libavutil/fifo.h"
#include "libavutil/mathematics.h"
#include "libavutil/mem.h"
#include "objpool.h"
#include "sync_queue.h"
typedef struct SyncQueueStream {
AVFifo *fifo;
AVRational tb;
/* stream head: largest timestamp seen */
int64_t head_ts;
/* no more frames will be sent for this stream */
int finished;
} SyncQueueStream;
struct SyncQueue {
enum SyncQueueType type;
/* no more frames will be sent for any stream */
int finished;
/* sync head: the stream with the _smallest_ head timestamp
* this stream determines which frames can be output */
int head_stream;
/* the finished stream with the smallest finish timestamp or -1 */
int head_finished_stream;
// maximum buffering duration in microseconds
int64_t buf_size_us;
SyncQueueStream *streams;
unsigned int nb_streams;
// pool of preallocated frames to avoid constant allocations
ObjPool *pool;
};
static void frame_move(const SyncQueue *sq, SyncQueueFrame dst,
SyncQueueFrame src)
{
if (sq->type == SYNC_QUEUE_PACKETS)
av_packet_move_ref(dst.p, src.p);
else
av_frame_move_ref(dst.f, src.f);
}
static int64_t frame_ts(const SyncQueue *sq, SyncQueueFrame frame)
{
return (sq->type == SYNC_QUEUE_PACKETS) ?
frame.p->pts + frame.p->duration :
frame.f->pts + frame.f->pkt_duration;
}
static int frame_null(const SyncQueue *sq, SyncQueueFrame frame)
{
return (sq->type == SYNC_QUEUE_PACKETS) ? (frame.p == NULL) : (frame.f == NULL);
}
static void finish_stream(SyncQueue *sq, unsigned int stream_idx)
{
SyncQueueStream *st = &sq->streams[stream_idx];
st->finished = 1;
if (st->head_ts != AV_NOPTS_VALUE) {
/* check if this stream is the new finished head */
if (sq->head_finished_stream < 0 ||
av_compare_ts(st->head_ts, st->tb,
sq->streams[sq->head_finished_stream].head_ts,
sq->streams[sq->head_finished_stream].tb) < 0) {
sq->head_finished_stream = stream_idx;
}
/* mark as finished all streams that should no longer receive new frames,
* due to them being ahead of some finished stream */
st = &sq->streams[sq->head_finished_stream];
for (unsigned int i = 0; i < sq->nb_streams; i++) {
SyncQueueStream *st1 = &sq->streams[i];
if (st != st1 && st1->head_ts != AV_NOPTS_VALUE &&
av_compare_ts(st->head_ts, st->tb, st1->head_ts, st1->tb) <= 0)
st1->finished = 1;
}
}
/* mark the whole queue as finished if all streams are finished */
for (unsigned int i = 0; i < sq->nb_streams; i++) {
if (!sq->streams[i].finished)
return;
}
sq->finished = 1;
}
static void queue_head_update(SyncQueue *sq)
{
if (sq->head_stream < 0) {
/* wait for one timestamp in each stream before determining
* the queue head */
for (unsigned int i = 0; i < sq->nb_streams; i++) {
SyncQueueStream *st = &sq->streams[i];
if (st->head_ts == AV_NOPTS_VALUE)
return;
}
// placeholder value, correct one will be found below
sq->head_stream = 0;
}
for (unsigned int i = 0; i < sq->nb_streams; i++) {
SyncQueueStream *st_head = &sq->streams[sq->head_stream];
SyncQueueStream *st_other = &sq->streams[i];
if (st_other->head_ts != AV_NOPTS_VALUE &&
av_compare_ts(st_other->head_ts, st_other->tb,
st_head->head_ts, st_head->tb) < 0)
sq->head_stream = i;
}
}
/* update this stream's head timestamp */
static void stream_update_ts(SyncQueue *sq, unsigned int stream_idx, int64_t ts)
{
SyncQueueStream *st = &sq->streams[stream_idx];
if (ts == AV_NOPTS_VALUE ||
(st->head_ts != AV_NOPTS_VALUE && st->head_ts >= ts))
return;
st->head_ts = ts;
/* if this stream is now ahead of some finished stream, then
* this stream is also finished */
if (sq->head_finished_stream >= 0 &&
av_compare_ts(sq->streams[sq->head_finished_stream].head_ts,
sq->streams[sq->head_finished_stream].tb,
ts, st->tb) <= 0)
finish_stream(sq, stream_idx);
/* update the overall head timestamp if it could have changed */
if (sq->head_stream < 0 || sq->head_stream == stream_idx)
queue_head_update(sq);
}
/* If the queue for the given stream (or all streams when stream_idx=-1)
* is overflowing, trigger a fake heartbeat on lagging streams.
*
* @return 1 if heartbeat triggered, 0 otherwise
*/
static int overflow_heartbeat(SyncQueue *sq, int stream_idx)
{
SyncQueueStream *st;
SyncQueueFrame frame;
int64_t tail_ts = AV_NOPTS_VALUE;
/* if no stream specified, pick the one that is most ahead */
if (stream_idx < 0) {
int64_t ts = AV_NOPTS_VALUE;
for (int i = 0; i < sq->nb_streams; i++) {
st = &sq->streams[i];
if (st->head_ts != AV_NOPTS_VALUE &&
(ts == AV_NOPTS_VALUE ||
av_compare_ts(ts, sq->streams[stream_idx].tb,
st->head_ts, st->tb) < 0)) {
ts = st->head_ts;
stream_idx = i;
}
}
/* no stream has a timestamp yet -> nothing to do */
if (stream_idx < 0)
return 0;
}
st = &sq->streams[stream_idx];
/* get the chosen stream's tail timestamp */
for (size_t i = 0; tail_ts == AV_NOPTS_VALUE &&
av_fifo_peek(st->fifo, &frame, 1, i) >= 0; i++)
tail_ts = frame_ts(sq, frame);
/* overflow triggers when the tail is over specified duration behind the head */
if (tail_ts == AV_NOPTS_VALUE || tail_ts >= st->head_ts ||
av_rescale_q(st->head_ts - tail_ts, st->tb, AV_TIME_BASE_Q) < sq->buf_size_us)
return 0;
/* signal a fake timestamp for all streams that prevent tail_ts from being output */
tail_ts++;
for (unsigned int i = 0; i < sq->nb_streams; i++) {
SyncQueueStream *st1 = &sq->streams[i];
int64_t ts;
if (st == st1 || st1->finished ||
(st1->head_ts != AV_NOPTS_VALUE &&
av_compare_ts(tail_ts, st->tb, st1->head_ts, st1->tb) <= 0))
continue;
ts = av_rescale_q(tail_ts, st->tb, st1->tb);
if (st1->head_ts != AV_NOPTS_VALUE)
ts = FFMAX(st1->head_ts + 1, ts);
stream_update_ts(sq, i, ts);
}
return 1;
}
int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
{
SyncQueueStream *st;
SyncQueueFrame dst;
int64_t ts;
int ret;
av_assert0(stream_idx < sq->nb_streams);
st = &sq->streams[stream_idx];
av_assert0(st->tb.num > 0 && st->tb.den > 0);
if (frame_null(sq, frame)) {
finish_stream(sq, stream_idx);
return 0;
}
if (st->finished)
return AVERROR_EOF;
ret = objpool_get(sq->pool, (void**)&dst);
if (ret < 0)
return ret;
frame_move(sq, dst, frame);
ts = frame_ts(sq, dst);
ret = av_fifo_write(st->fifo, &dst, 1);
if (ret < 0) {
frame_move(sq, frame, dst);
objpool_release(sq->pool, (void**)&dst);
return ret;
}
stream_update_ts(sq, stream_idx, ts);
return 0;
}
static int receive_for_stream(SyncQueue *sq, unsigned int stream_idx,
SyncQueueFrame frame)
{
SyncQueueStream *st_head = sq->head_stream >= 0 ?
&sq->streams[sq->head_stream] : NULL;
SyncQueueStream *st;
av_assert0(stream_idx < sq->nb_streams);
st = &sq->streams[stream_idx];
if (av_fifo_can_read(st->fifo)) {
SyncQueueFrame peek;
int64_t ts;
int cmp = 1;
av_fifo_peek(st->fifo, &peek, 1, 0);
ts = frame_ts(sq, peek);
/* check if this stream's tail timestamp does not overtake
* the overall queue head */
if (ts != AV_NOPTS_VALUE && st_head)
cmp = av_compare_ts(ts, st->tb, st_head->head_ts, st_head->tb);
/* We can release frames that do not end after the queue head.
* Frames with no timestamps are just passed through with no conditions.
*/
if (cmp <= 0 || ts == AV_NOPTS_VALUE) {
frame_move(sq, frame, peek);
objpool_release(sq->pool, (void**)&peek);
av_fifo_drain2(st->fifo, 1);
return 0;
}
}
return (sq->finished || (st->finished && !av_fifo_can_read(st->fifo))) ?
AVERROR_EOF : AVERROR(EAGAIN);
}
static int receive_internal(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
{
int nb_eof = 0;
int ret;
/* read a frame for a specific stream */
if (stream_idx >= 0) {
ret = receive_for_stream(sq, stream_idx, frame);
return (ret < 0) ? ret : stream_idx;
}
/* read a frame for any stream with available output */
for (unsigned int i = 0; i < sq->nb_streams; i++) {
ret = receive_for_stream(sq, i, frame);
if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) {
nb_eof += (ret == AVERROR_EOF);
continue;
}
return (ret < 0) ? ret : i;
}
return (nb_eof == sq->nb_streams) ? AVERROR_EOF : AVERROR(EAGAIN);
}
int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
{
int ret = receive_internal(sq, stream_idx, frame);
/* try again if the queue overflowed and triggered a fake heartbeat
* for lagging streams */
if (ret == AVERROR(EAGAIN) && overflow_heartbeat(sq, stream_idx))
ret = receive_internal(sq, stream_idx, frame);
return ret;
}
int sq_add_stream(SyncQueue *sq)
{
SyncQueueStream *tmp, *st;
tmp = av_realloc_array(sq->streams, sq->nb_streams + 1, sizeof(*sq->streams));
if (!tmp)
return AVERROR(ENOMEM);
sq->streams = tmp;
st = &sq->streams[sq->nb_streams];
memset(st, 0, sizeof(*st));
st->fifo = av_fifo_alloc2(1, sizeof(SyncQueueFrame), AV_FIFO_FLAG_AUTO_GROW);
if (!st->fifo)
return AVERROR(ENOMEM);
/* we set a valid default, so that a pathological stream that never
* receives even a real timebase (and no frames) won't stall all other
* streams forever; cf. overflow_heartbeat() */
st->tb = (AVRational){ 1, 1 };
st->head_ts = AV_NOPTS_VALUE;
return sq->nb_streams++;
}
void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb)
{
SyncQueueStream *st;
av_assert0(stream_idx < sq->nb_streams);
st = &sq->streams[stream_idx];
av_assert0(!av_fifo_can_read(st->fifo));
if (st->head_ts != AV_NOPTS_VALUE)
st->head_ts = av_rescale_q(st->head_ts, st->tb, tb);
st->tb = tb;
}
SyncQueue *sq_alloc(enum SyncQueueType type, int64_t buf_size_us)
{
SyncQueue *sq = av_mallocz(sizeof(*sq));
if (!sq)
return NULL;
sq->type = type;
sq->buf_size_us = buf_size_us;
sq->head_stream = -1;
sq->head_finished_stream = -1;
sq->pool = (type == SYNC_QUEUE_PACKETS) ? objpool_alloc_packets() :
objpool_alloc_frames();
if (!sq->pool) {
av_freep(&sq);
return NULL;
}
return sq;
}
void sq_free(SyncQueue **psq)
{
SyncQueue *sq = *psq;
if (!sq)
return;
for (unsigned int i = 0; i < sq->nb_streams; i++) {
SyncQueueFrame frame;
while (av_fifo_read(sq->streams[i].fifo, &frame, 1) >= 0)
objpool_release(sq->pool, (void**)&frame);
av_fifo_freep2(&sq->streams[i].fifo);
}
av_freep(&sq->streams);
objpool_free(&sq->pool);
av_freep(psq);
}

fftools/sync_queue.h (new file)
@@ -0,0 +1,100 @@
/*
* This file is part of FFmpeg.
*
* FFmpeg is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* FFmpeg is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with FFmpeg; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef FFTOOLS_SYNC_QUEUE_H
#define FFTOOLS_SYNC_QUEUE_H
#include <stdint.h>
#include "libavcodec/packet.h"
#include "libavutil/frame.h"
enum SyncQueueType {
SYNC_QUEUE_PACKETS,
SYNC_QUEUE_FRAMES,
};
typedef union SyncQueueFrame {
AVFrame *f;
AVPacket *p;
} SyncQueueFrame;
#define SQFRAME(frame) ((SyncQueueFrame){ .f = (frame) })
#define SQPKT(pkt) ((SyncQueueFrame){ .p = (pkt) })
typedef struct SyncQueue SyncQueue;
/**
* Allocate a sync queue of the given type.
*
* @param buf_size_us maximum duration that will be buffered in microseconds
*/
SyncQueue *sq_alloc(enum SyncQueueType type, int64_t buf_size_us);
void sq_free(SyncQueue **sq);
/**
* Add a new stream to the sync queue.
*
* @return
* - a non-negative stream index on success
* - a negative error code on error
*/
int sq_add_stream(SyncQueue *sq);
/**
* Set the timebase for the stream with index stream_idx. Should be called
* before sending any frames for this stream.
*/
void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb);
/**
* Submit a frame for the stream with index stream_idx.
*
* On success, the sync queue takes ownership of the frame and will reset the
* contents of the supplied frame. On failure, the frame remains owned by the
* caller.
*
* Sending a frame with NULL contents marks the stream as finished.
*
* @return
* - 0 on success
* - AVERROR_EOF when no more frames should be submitted for this stream
- another negative error code on failure
*/
int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame);
/**
* Read a frame from the queue.
*
* @param stream_idx index of the stream to read a frame for. May be -1, then
* try to read a frame from any stream that is ready for
* output.
* @param frame output frame will be written here on success. The frame is owned
* by the caller.
*
* @return
* - a non-negative index of the stream to which the returned frame belongs
* - AVERROR(EAGAIN) when more frames need to be submitted to the queue
* - AVERROR_EOF when no more frames will be available for this stream (for any
* stream if stream_idx is -1)
* - another negative error code on failure
*/
int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame);
#endif // FFTOOLS_SYNC_QUEUE_H
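(Aside, not part of the patch: a sketch of the setup half of this API,
allocating a queue, registering streams and setting their timebases, roughly
what setup_sync_queues() in ffmpeg_opt.c above does per output file. The
function and variable names are placeholders.)

    #include "libavutil/avutil.h"
    #include "sync_queue.h"

    /* Illustrative only: a frame-level sync queue for two encoded streams. */
    static SyncQueue *make_encode_queue(AVRational tb0, AVRational tb1,
                                        int *idx0, int *idx1)
    {
        /* allow up to 10 seconds of buffering (buf_size_us is in AV_TIME_BASE units) */
        SyncQueue *sq = sq_alloc(SYNC_QUEUE_FRAMES, 10LL * AV_TIME_BASE);
        if (!sq)
            return NULL;

        *idx0 = sq_add_stream(sq);
        *idx1 = sq_add_stream(sq);
        if (*idx0 < 0 || *idx1 < 0) {
            sq_free(&sq);
            return NULL;
        }

        /* timebases must be known before any frames are submitted */
        sq_set_tb(sq, *idx0, tb0);
        sq_set_tb(sq, *idx1, tb1);
        return sq;
    }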

tests/fate/ffmpeg.mak
@@ -105,7 +105,7 @@ FATE_SAMPLES_FFMPEG-$(call ALLYES, COLOR_FILTER, VOBSUB_DEMUXER, MATROSKA_DEMUXE
fate-shortest-sub: CMD = enc_dec \
vobsub $(TARGET_SAMPLES)/sub/vobsub.idx matroska \
"-filter_complex 'color=s=1x1:rate=1:duration=400' -pix_fmt rgb24 -allow_raw_vfw 1 -c:s copy -c:v rawvideo" \
framecrc "-map 0 -c copy -shortest"
framecrc "-map 0 -c copy -shortest -shortest_buf_duration 40"
# Basic test for fix_sub_duration, which calculates duration based on the
# following subtitle's pts.

tests/ref/fate/copy-shortest1
@@ -120,4 +120,3 @@
0, 98304, 98304, 2048, 11182, e35a2ab846029effdbca0e43639717f2
1, 85760, 85760, 1536, 418, cf52ea7fc69e4c5bc8f75b354dfe60af
0, 100352, 100352, 2048, 1423, f480272c7d0b97834bc8ea36cceca61d
1, 87296, 87296, 1536, 418, 78ab22657a1b6c8a0e5b8612ceb8081d

tests/ref/fate/copy-shortest2
@@ -120,4 +120,3 @@
0, 98304, 98304, 2048, 11182, e35a2ab846029effdbca0e43639717f2
1, 85760, 85760, 1536, 418, cf52ea7fc69e4c5bc8f75b354dfe60af
0, 100352, 100352, 2048, 1423, f480272c7d0b97834bc8ea36cceca61d
1, 87296, 87296, 1536, 418, 78ab22657a1b6c8a0e5b8612ceb8081d

tests/ref/fate/shortest-sub
@@ -1,4 +1,4 @@
145b9b48d56f9c966bf41657f7569954 *tests/data/fate/shortest-sub.matroska
139232 tests/data/fate/shortest-sub.matroska
d71f5d359ef788ea689415bc1e4a90df *tests/data/fate/shortest-sub.out.framecrc
stddev:11541.12 PSNR: 15.08 MAXDIFF:22854 bytes: 2591/ 26055
876ac3fa52e467050ab843969d4cf343 *tests/data/fate/shortest-sub.out.framecrc
stddev:11541.12 PSNR: 15.08 MAXDIFF:22854 bytes: 2591/ 23735
