fftools/ffmpeg_dec: move decoding to a separate thread

This is only a preparatory step to a fully threaded architecture and
does not yet make decoding truly parallel - the main thread will
currently submit a packet and wait until it has been fully processed by
the decoding thread before moving on. Decoder behavior as observed by
the rest of the program should remain unchanged. That will change in
future commits after encoders and filters are moved to threads and a
thread-aware scheduler is added.
pull/389/head
Anton Khirnov 1 year ago
parent 5293adb1a7
commit 01897c1788
  1. 63
      fftools/ffmpeg.c
  2. 11
      fftools/ffmpeg.h
  3. 361
      fftools/ffmpeg_dec.c
  4. 5
      fftools/ffmpeg_mux.c

@ -728,6 +728,46 @@ cleanup:
return ret;
}
static void subtitle_free(void *opaque, uint8_t *data)
{
AVSubtitle *sub = (AVSubtitle*)data;
avsubtitle_free(sub);
av_free(sub);
}
int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy)
{
AVBufferRef *buf;
AVSubtitle *sub;
int ret;
if (copy) {
sub = av_mallocz(sizeof(*sub));
ret = sub ? copy_av_subtitle(sub, subtitle) : AVERROR(ENOMEM);
if (ret < 0) {
av_freep(&sub);
return ret;
}
} else {
sub = av_memdup(subtitle, sizeof(*subtitle));
if (!sub)
return AVERROR(ENOMEM);
memset(subtitle, 0, sizeof(*subtitle));
}
buf = av_buffer_create((uint8_t*)sub, sizeof(*sub),
subtitle_free, NULL, 0);
if (!buf) {
avsubtitle_free(sub);
av_freep(&sub);
return AVERROR(ENOMEM);
}
frame->buf[0] = buf;
return 0;
}
static int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
{
int ret = AVERROR_BUG;
@ -1038,30 +1078,11 @@ static void decode_flush(InputFile *ifile)
{
for (int i = 0; i < ifile->nb_streams; i++) {
InputStream *ist = ifile->streams[i];
int ret;
if (ist->discard)
if (ist->discard || !ist->decoding_needed)
continue;
do {
ret = process_input_packet(ist, NULL, 1);
} while (ret > 0);
if (ist->decoding_needed) {
/* report last frame duration to the demuxer thread */
if (ist->par->codec_type == AVMEDIA_TYPE_AUDIO) {
LastFrameDuration dur;
dur.stream_idx = i;
dur.duration = av_rescale_q(ist->nb_samples,
(AVRational){ 1, ist->dec_ctx->sample_rate},
ist->st->time_base);
av_thread_message_queue_send(ifile->audio_duration_queue, &dur, 0);
}
avcodec_flush_buffers(ist->dec_ctx);
}
dec_packet(ist, NULL, 1);
}
}

@ -730,6 +730,7 @@ int init_simple_filtergraph(InputStream *ist, OutputStream *ost,
int init_complex_filtergraph(FilterGraph *fg);
int copy_av_subtitle(AVSubtitle *dst, const AVSubtitle *src);
int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy);
/**
* Get our axiliary frame data attached to the frame, allocating it
@ -941,4 +942,14 @@ extern const char * const opt_name_codec_tags[];
extern const char * const opt_name_frame_rates[];
extern const char * const opt_name_top_field_first[];
static inline void pkt_move(void *dst, void *src)
{
av_packet_move_ref(dst, src);
}
static inline void frame_move(void *dst, void *src)
{
av_frame_move_ref(dst, src);
}
#endif /* FFTOOLS_FFMPEG_H */

@ -30,6 +30,7 @@
#include "libavfilter/buffersrc.h"
#include "ffmpeg.h"
#include "thread_queue.h"
struct Decoder {
AVFrame *frame;
@ -45,8 +46,50 @@ struct Decoder {
AVRational last_frame_tb;
int64_t last_filter_in_rescale_delta;
int last_frame_sample_rate;
pthread_t thread;
/**
* Queue for sending coded packets from the main thread to
* the decoder thread.
*
* An empty packet is sent to flush the decoder without terminating
* decoding.
*/
ThreadQueue *queue_in;
/**
* Queue for sending decoded frames from the decoder thread
* to the main thread.
*
* An empty frame is sent to signal that a single packet has been fully
* processed.
*/
ThreadQueue *queue_out;
};
// data that is local to the decoder thread and not visible outside of it
typedef struct DecThreadContext {
AVFrame *frame;
AVPacket *pkt;
} DecThreadContext;
static int dec_thread_stop(Decoder *d)
{
void *ret;
if (!d->queue_in)
return 0;
tq_send_finish(d->queue_in, 0);
tq_receive_finish(d->queue_out, 0);
pthread_join(d->thread, &ret);
tq_free(&d->queue_in);
tq_free(&d->queue_out);
return (intptr_t)ret;
}
void dec_free(Decoder **pdec)
{
Decoder *dec = *pdec;
@ -54,6 +97,8 @@ void dec_free(Decoder **pdec)
if (!dec)
return;
dec_thread_stop(dec);
av_frame_free(&dec->frame);
av_packet_free(&dec->pkt);
@ -383,8 +428,10 @@ out:
return ret;
}
static int transcode_subtitles(InputStream *ist, const AVPacket *pkt)
static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
AVFrame *frame)
{
Decoder *d = ist->decoder;
AVPacket *flush_pkt = NULL;
AVSubtitle subtitle;
int got_output;
@ -403,20 +450,30 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt)
if (ret < 0) {
av_log(ist, AV_LOG_ERROR, "Error decoding subtitles: %s\n",
av_err2str(ret));
if (exit_on_error)
exit_program(1);
ist->decode_errors++;
return exit_on_error ? ret : 0;
}
if (ret < 0 || !got_output) {
if (!pkt)
sub2video_flush(ist);
return ret < 0 ? ret : AVERROR_EOF;
}
if (!got_output)
return pkt ? 0 : AVERROR_EOF;
ist->frames_decoded++;
return process_subtitle(ist, &subtitle);
// XXX the queue for transferring data back to the main thread runs
// on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that
// inside the frame
// eventually, subtitles should be switched to use AVFrames natively
ret = subtitle_wrap_frame(frame, &subtitle, 0);
if (ret < 0) {
avsubtitle_free(&subtitle);
return ret;
}
ret = tq_send(d->queue_out, 0, frame);
if (ret < 0)
av_frame_unref(frame);
return ret;
}
static int send_filter_eof(InputStream *ist)
@ -434,7 +491,7 @@ static int send_filter_eof(InputStream *ist)
return 0;
}
int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
static int packet_decode(InputStream *ist, const AVPacket *pkt, AVFrame *frame)
{
Decoder *d = ist->decoder;
AVCodecContext *dec = ist->dec_ctx;
@ -442,7 +499,7 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
int ret;
if (dec->codec_type == AVMEDIA_TYPE_SUBTITLE)
return transcode_subtitles(ist, pkt);
return transcode_subtitles(ist, pkt, frame);
// With fate-indeo3-2, we're getting 0-sized packets before EOF for some
// reason. This seems like a semi-critical bug. Don't trigger EOF, and
@ -457,23 +514,25 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
if (ret == AVERROR(EAGAIN)) {
av_log(ist, AV_LOG_FATAL, "A decoder returned an unexpected error code. "
"This is a bug, please report it.\n");
exit_program(1);
return AVERROR_BUG;
}
av_log(ist, AV_LOG_ERROR, "Error submitting %s to decoder: %s\n",
pkt ? "packet" : "EOF", av_err2str(ret));
if (exit_on_error)
exit_program(1);
if (ret != AVERROR_EOF)
if (ret != AVERROR_EOF) {
ist->decode_errors++;
if (!exit_on_error)
ret = 0;
}
return ret;
}
while (1) {
AVFrame *frame = d->frame;
FrameData *fd;
av_frame_unref(frame);
update_benchmark(NULL);
ret = avcodec_receive_frame(dec, frame);
update_benchmark("decode_%s %d.%d", type_desc,
@ -483,30 +542,22 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
av_assert0(pkt); // should never happen during flushing
return 0;
} else if (ret == AVERROR_EOF) {
/* after flushing, send an EOF on all the filter inputs attached to the stream */
/* except when looping we need to flush but not to send an EOF */
if (!no_eof) {
ret = send_filter_eof(ist);
if (ret < 0) {
av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
exit_program(1);
}
}
return AVERROR_EOF;
return ret;
} else if (ret < 0) {
av_log(ist, AV_LOG_ERROR, "Decoding error: %s\n", av_err2str(ret));
if (exit_on_error)
exit_program(1);
ist->decode_errors++;
if (exit_on_error)
return ret;
continue;
}
if (frame->decode_error_flags || (frame->flags & AV_FRAME_FLAG_CORRUPT)) {
av_log(ist, exit_on_error ? AV_LOG_FATAL : AV_LOG_WARNING,
"corrupt decoded frame\n");
if (exit_on_error)
exit_program(1);
return AVERROR_INVALIDDATA;
}
@ -514,7 +565,7 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
fd = frame_data(frame);
if (!fd) {
av_frame_unref(frame);
report_and_exit(AVERROR(ENOMEM));
return AVERROR(ENOMEM);
}
fd->pts = frame->pts;
fd->tb = dec->pkt_timebase;
@ -533,19 +584,254 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
if (ret < 0) {
av_log(NULL, AV_LOG_FATAL, "Error while processing the decoded "
"data for stream #%d:%d\n", ist->file_index, ist->index);
exit_program(1);
return ret;
}
}
ist->frames_decoded++;
ret = send_frame_to_filters(ist, frame);
av_frame_unref(frame);
ret = tq_send(d->queue_out, 0, frame);
if (ret < 0)
return ret;
}
}
static void dec_thread_set_name(const InputStream *ist)
{
char name[16];
snprintf(name, sizeof(name), "dec%d:%d:%s", ist->file_index, ist->index,
ist->dec_ctx->codec->name);
ff_thread_setname(name);
}
static void dec_thread_uninit(DecThreadContext *dt)
{
av_packet_free(&dt->pkt);
av_frame_free(&dt->frame);
memset(dt, 0, sizeof(*dt));
}
static int dec_thread_init(DecThreadContext *dt)
{
memset(dt, 0, sizeof(*dt));
dt->frame = av_frame_alloc();
if (!dt->frame)
goto fail;
dt->pkt = av_packet_alloc();
if (!dt->pkt)
goto fail;
return 0;
fail:
dec_thread_uninit(dt);
return AVERROR(ENOMEM);
}
static void *decoder_thread(void *arg)
{
InputStream *ist = arg;
InputFile *ifile = input_files[ist->file_index];
Decoder *d = ist->decoder;
DecThreadContext dt;
int ret = 0, input_status = 0;
ret = dec_thread_init(&dt);
if (ret < 0)
goto finish;
dec_thread_set_name(ist);
while (!input_status) {
int dummy, flush_buffers;
input_status = tq_receive(d->queue_in, &dummy, dt.pkt);
flush_buffers = input_status >= 0 && !dt.pkt->buf;
if (!dt.pkt->buf)
av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n",
flush_buffers ? "flush" : "EOF");
ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame);
av_packet_unref(dt.pkt);
av_frame_unref(dt.frame);
if (ret == AVERROR_EOF) {
av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n",
flush_buffers ? "resetting" : "finishing");
if (!flush_buffers)
break;
/* report last frame duration to the demuxer thread */
if (ist->dec->type == AVMEDIA_TYPE_AUDIO) {
LastFrameDuration dur;
dur.stream_idx = ist->index;
dur.duration = av_rescale_q(ist->nb_samples,
(AVRational){ 1, ist->dec_ctx->sample_rate},
ist->st->time_base);
av_thread_message_queue_send(ifile->audio_duration_queue, &dur, 0);
}
avcodec_flush_buffers(ist->dec_ctx);
} else if (ret < 0) {
av_log(ist, AV_LOG_ERROR, "Error processing packet in decoder: %s\n",
av_err2str(ret));
break;
}
// signal to the consumer thread that the entire packet was processed
ret = tq_send(d->queue_out, 0, dt.frame);
if (ret < 0) {
if (ret != AVERROR_EOF)
av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n");
break;
}
}
// EOF is normal thread termination
if (ret == AVERROR_EOF)
ret = 0;
finish:
tq_receive_finish(d->queue_in, 0);
tq_send_finish (d->queue_out, 0);
// make sure the demuxer does not get stuck waiting for audio durations
// that will never arrive
if (ifile->audio_duration_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO)
av_thread_message_queue_set_err_recv(ifile->audio_duration_queue, AVERROR_EOF);
dec_thread_uninit(&dt);
av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n");
return (void*)(intptr_t)ret;
}
int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
{
Decoder *d = ist->decoder;
int ret = 0, thread_ret;
// thread already joined
if (!d->queue_in)
return AVERROR_EOF;
// send the packet/flush request/EOF to the decoder thread
if (pkt || no_eof) {
av_packet_unref(d->pkt);
if (pkt) {
ret = av_packet_ref(d->pkt, pkt);
if (ret < 0)
goto finish;
}
ret = tq_send(d->queue_in, 0, d->pkt);
if (ret < 0)
goto finish;
} else
tq_send_finish(d->queue_in, 0);
// retrieve all decoded data for the packet
while (1) {
int dummy;
ret = tq_receive(d->queue_out, &dummy, d->frame);
if (ret < 0)
goto finish;
// packet fully processed
if (!d->frame->buf[0])
return 0;
// process the decoded frame
if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) {
AVSubtitle *sub = (AVSubtitle*)d->frame->buf[0]->data;
ret = process_subtitle(ist, sub);
} else {
ret = send_frame_to_filters(ist, d->frame);
}
av_frame_unref(d->frame);
if (ret < 0)
goto finish;
}
finish:
thread_ret = dec_thread_stop(d);
if (thread_ret < 0) {
av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n",
av_err2str(thread_ret));
ret = err_merge(ret, thread_ret);
}
// non-EOF errors here are all fatal
if (ret < 0 && ret != AVERROR_EOF)
report_and_exit(ret);
// signal EOF to our downstreams
if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE)
sub2video_flush(ist);
else {
ret = send_filter_eof(ist);
if (ret < 0) {
av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
exit_program(1);
}
}
return AVERROR_EOF;
}
static int dec_thread_start(InputStream *ist)
{
Decoder *d = ist->decoder;
ObjPool *op;
int ret = 0;
op = objpool_alloc_packets();
if (!op)
return AVERROR(ENOMEM);
d->queue_in = tq_alloc(1, 1, op, pkt_move);
if (!d->queue_in) {
objpool_free(&op);
return AVERROR(ENOMEM);
}
op = objpool_alloc_frames();
if (!op)
goto fail;
d->queue_out = tq_alloc(1, 4, op, frame_move);
if (!d->queue_out) {
objpool_free(&op);
goto fail;
}
ret = pthread_create(&d->thread, NULL, decoder_thread, ist);
if (ret) {
ret = AVERROR(ret);
av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n",
av_err2str(ret));
goto fail;
}
return 0;
fail:
if (ret >= 0)
ret = AVERROR(ENOMEM);
tq_free(&d->queue_in);
tq_free(&d->queue_out);
return ret;
}
static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts)
{
InputStream *ist = s->opaque;
@ -781,5 +1067,12 @@ int dec_open(InputStream *ist)
}
assert_avoptions(ist->decoder_opts);
ret = dec_thread_start(ist);
if (ret < 0) {
av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n",
av_err2str(ret));
return ret;
}
return 0;
}

@ -475,11 +475,6 @@ static int thread_stop(Muxer *mux)
return (int)(intptr_t)ret;
}
static void pkt_move(void *dst, void *src)
{
av_packet_move_ref(dst, src);
}
static int thread_start(Muxer *mux)
{
AVFormatContext *fc = mux->fc;

Loading…
Cancel
Save