diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 45e71ed626..4e6205e3cb 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -728,6 +728,46 @@ cleanup: return ret; } +static void subtitle_free(void *opaque, uint8_t *data) +{ + AVSubtitle *sub = (AVSubtitle*)data; + avsubtitle_free(sub); + av_free(sub); +} + +int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy) +{ + AVBufferRef *buf; + AVSubtitle *sub; + int ret; + + if (copy) { + sub = av_mallocz(sizeof(*sub)); + ret = sub ? copy_av_subtitle(sub, subtitle) : AVERROR(ENOMEM); + if (ret < 0) { + av_freep(&sub); + return ret; + } + } else { + sub = av_memdup(subtitle, sizeof(*subtitle)); + if (!sub) + return AVERROR(ENOMEM); + memset(subtitle, 0, sizeof(*subtitle)); + } + + buf = av_buffer_create((uint8_t*)sub, sizeof(*sub), + subtitle_free, NULL, 0); + if (!buf) { + avsubtitle_free(sub); + av_freep(&sub); + return AVERROR(ENOMEM); + } + + frame->buf[0] = buf; + + return 0; +} + static int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts) { int ret = AVERROR_BUG; @@ -1038,30 +1078,11 @@ static void decode_flush(InputFile *ifile) { for (int i = 0; i < ifile->nb_streams; i++) { InputStream *ist = ifile->streams[i]; - int ret; - if (ist->discard) + if (ist->discard || !ist->decoding_needed) continue; - do { - ret = process_input_packet(ist, NULL, 1); - } while (ret > 0); - - if (ist->decoding_needed) { - /* report last frame duration to the demuxer thread */ - if (ist->par->codec_type == AVMEDIA_TYPE_AUDIO) { - LastFrameDuration dur; - - dur.stream_idx = i; - dur.duration = av_rescale_q(ist->nb_samples, - (AVRational){ 1, ist->dec_ctx->sample_rate}, - ist->st->time_base); - - av_thread_message_queue_send(ifile->audio_duration_queue, &dur, 0); - } - - avcodec_flush_buffers(ist->dec_ctx); - } + dec_packet(ist, NULL, 1); } } diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index abc1a21d73..5d60da085b 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -730,6 +730,7 @@ int init_simple_filtergraph(InputStream *ist, OutputStream *ost, int init_complex_filtergraph(FilterGraph *fg); int copy_av_subtitle(AVSubtitle *dst, const AVSubtitle *src); +int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy); /** * Get our axiliary frame data attached to the frame, allocating it @@ -941,4 +942,14 @@ extern const char * const opt_name_codec_tags[]; extern const char * const opt_name_frame_rates[]; extern const char * const opt_name_top_field_first[]; +static inline void pkt_move(void *dst, void *src) +{ + av_packet_move_ref(dst, src); +} + +static inline void frame_move(void *dst, void *src) +{ + av_frame_move_ref(dst, src); +} + #endif /* FFTOOLS_FFMPEG_H */ diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c index e6c6e22b04..d2505345b5 100644 --- a/fftools/ffmpeg_dec.c +++ b/fftools/ffmpeg_dec.c @@ -30,6 +30,7 @@ #include "libavfilter/buffersrc.h" #include "ffmpeg.h" +#include "thread_queue.h" struct Decoder { AVFrame *frame; @@ -45,8 +46,50 @@ struct Decoder { AVRational last_frame_tb; int64_t last_filter_in_rescale_delta; int last_frame_sample_rate; + + pthread_t thread; + /** + * Queue for sending coded packets from the main thread to + * the decoder thread. + * + * An empty packet is sent to flush the decoder without terminating + * decoding. + */ + ThreadQueue *queue_in; + /** + * Queue for sending decoded frames from the decoder thread + * to the main thread. + * + * An empty frame is sent to signal that a single packet has been fully + * processed. + */ + ThreadQueue *queue_out; }; +// data that is local to the decoder thread and not visible outside of it +typedef struct DecThreadContext { + AVFrame *frame; + AVPacket *pkt; +} DecThreadContext; + +static int dec_thread_stop(Decoder *d) +{ + void *ret; + + if (!d->queue_in) + return 0; + + tq_send_finish(d->queue_in, 0); + tq_receive_finish(d->queue_out, 0); + + pthread_join(d->thread, &ret); + + tq_free(&d->queue_in); + tq_free(&d->queue_out); + + return (intptr_t)ret; +} + void dec_free(Decoder **pdec) { Decoder *dec = *pdec; @@ -54,6 +97,8 @@ void dec_free(Decoder **pdec) if (!dec) return; + dec_thread_stop(dec); + av_frame_free(&dec->frame); av_packet_free(&dec->pkt); @@ -383,8 +428,10 @@ out: return ret; } -static int transcode_subtitles(InputStream *ist, const AVPacket *pkt) +static int transcode_subtitles(InputStream *ist, const AVPacket *pkt, + AVFrame *frame) { + Decoder *d = ist->decoder; AVPacket *flush_pkt = NULL; AVSubtitle subtitle; int got_output; @@ -403,20 +450,30 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt) if (ret < 0) { av_log(ist, AV_LOG_ERROR, "Error decoding subtitles: %s\n", av_err2str(ret)); - if (exit_on_error) - exit_program(1); ist->decode_errors++; + return exit_on_error ? ret : 0; } - if (ret < 0 || !got_output) { - if (!pkt) - sub2video_flush(ist); - return ret < 0 ? ret : AVERROR_EOF; - } + if (!got_output) + return pkt ? 0 : AVERROR_EOF; ist->frames_decoded++; - return process_subtitle(ist, &subtitle); + // XXX the queue for transferring data back to the main thread runs + // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that + // inside the frame + // eventually, subtitles should be switched to use AVFrames natively + ret = subtitle_wrap_frame(frame, &subtitle, 0); + if (ret < 0) { + avsubtitle_free(&subtitle); + return ret; + } + + ret = tq_send(d->queue_out, 0, frame); + if (ret < 0) + av_frame_unref(frame); + + return ret; } static int send_filter_eof(InputStream *ist) @@ -434,7 +491,7 @@ static int send_filter_eof(InputStream *ist) return 0; } -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof) +static int packet_decode(InputStream *ist, const AVPacket *pkt, AVFrame *frame) { Decoder *d = ist->decoder; AVCodecContext *dec = ist->dec_ctx; @@ -442,7 +499,7 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof) int ret; if (dec->codec_type == AVMEDIA_TYPE_SUBTITLE) - return transcode_subtitles(ist, pkt); + return transcode_subtitles(ist, pkt, frame); // With fate-indeo3-2, we're getting 0-sized packets before EOF for some // reason. This seems like a semi-critical bug. Don't trigger EOF, and @@ -457,23 +514,25 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof) if (ret == AVERROR(EAGAIN)) { av_log(ist, AV_LOG_FATAL, "A decoder returned an unexpected error code. " "This is a bug, please report it.\n"); - exit_program(1); + return AVERROR_BUG; } av_log(ist, AV_LOG_ERROR, "Error submitting %s to decoder: %s\n", pkt ? "packet" : "EOF", av_err2str(ret)); - if (exit_on_error) - exit_program(1); - if (ret != AVERROR_EOF) + if (ret != AVERROR_EOF) { ist->decode_errors++; + if (!exit_on_error) + ret = 0; + } return ret; } while (1) { - AVFrame *frame = d->frame; FrameData *fd; + av_frame_unref(frame); + update_benchmark(NULL); ret = avcodec_receive_frame(dec, frame); update_benchmark("decode_%s %d.%d", type_desc, @@ -483,30 +542,22 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof) av_assert0(pkt); // should never happen during flushing return 0; } else if (ret == AVERROR_EOF) { - /* after flushing, send an EOF on all the filter inputs attached to the stream */ - /* except when looping we need to flush but not to send an EOF */ - if (!no_eof) { - ret = send_filter_eof(ist); - if (ret < 0) { - av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n"); - exit_program(1); - } - } - - return AVERROR_EOF; + return ret; } else if (ret < 0) { av_log(ist, AV_LOG_ERROR, "Decoding error: %s\n", av_err2str(ret)); - if (exit_on_error) - exit_program(1); ist->decode_errors++; - return ret; + + if (exit_on_error) + return ret; + + continue; } if (frame->decode_error_flags || (frame->flags & AV_FRAME_FLAG_CORRUPT)) { av_log(ist, exit_on_error ? AV_LOG_FATAL : AV_LOG_WARNING, "corrupt decoded frame\n"); if (exit_on_error) - exit_program(1); + return AVERROR_INVALIDDATA; } @@ -514,7 +565,7 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof) fd = frame_data(frame); if (!fd) { av_frame_unref(frame); - report_and_exit(AVERROR(ENOMEM)); + return AVERROR(ENOMEM); } fd->pts = frame->pts; fd->tb = dec->pkt_timebase; @@ -533,17 +584,252 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof) if (ret < 0) { av_log(NULL, AV_LOG_FATAL, "Error while processing the decoded " "data for stream #%d:%d\n", ist->file_index, ist->index); - exit_program(1); + return ret; } } ist->frames_decoded++; - ret = send_frame_to_filters(ist, frame); - av_frame_unref(frame); + ret = tq_send(d->queue_out, 0, frame); if (ret < 0) + return ret; + } +} + +static void dec_thread_set_name(const InputStream *ist) +{ + char name[16]; + snprintf(name, sizeof(name), "dec%d:%d:%s", ist->file_index, ist->index, + ist->dec_ctx->codec->name); + ff_thread_setname(name); +} + +static void dec_thread_uninit(DecThreadContext *dt) +{ + av_packet_free(&dt->pkt); + av_frame_free(&dt->frame); + + memset(dt, 0, sizeof(*dt)); +} + +static int dec_thread_init(DecThreadContext *dt) +{ + memset(dt, 0, sizeof(*dt)); + + dt->frame = av_frame_alloc(); + if (!dt->frame) + goto fail; + + dt->pkt = av_packet_alloc(); + if (!dt->pkt) + goto fail; + + return 0; + +fail: + dec_thread_uninit(dt); + return AVERROR(ENOMEM); +} + +static void *decoder_thread(void *arg) +{ + InputStream *ist = arg; + InputFile *ifile = input_files[ist->file_index]; + Decoder *d = ist->decoder; + DecThreadContext dt; + int ret = 0, input_status = 0; + + ret = dec_thread_init(&dt); + if (ret < 0) + goto finish; + + dec_thread_set_name(ist); + + while (!input_status) { + int dummy, flush_buffers; + + input_status = tq_receive(d->queue_in, &dummy, dt.pkt); + flush_buffers = input_status >= 0 && !dt.pkt->buf; + if (!dt.pkt->buf) + av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n", + flush_buffers ? "flush" : "EOF"); + + ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame); + + av_packet_unref(dt.pkt); + av_frame_unref(dt.frame); + + if (ret == AVERROR_EOF) { + av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n", + flush_buffers ? "resetting" : "finishing"); + + if (!flush_buffers) + break; + + /* report last frame duration to the demuxer thread */ + if (ist->dec->type == AVMEDIA_TYPE_AUDIO) { + LastFrameDuration dur; + + dur.stream_idx = ist->index; + dur.duration = av_rescale_q(ist->nb_samples, + (AVRational){ 1, ist->dec_ctx->sample_rate}, + ist->st->time_base); + + av_thread_message_queue_send(ifile->audio_duration_queue, &dur, 0); + } + + avcodec_flush_buffers(ist->dec_ctx); + } else if (ret < 0) { + av_log(ist, AV_LOG_ERROR, "Error processing packet in decoder: %s\n", + av_err2str(ret)); + break; + } + + // signal to the consumer thread that the entire packet was processed + ret = tq_send(d->queue_out, 0, dt.frame); + if (ret < 0) { + if (ret != AVERROR_EOF) + av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n"); + break; + } + } + + // EOF is normal thread termination + if (ret == AVERROR_EOF) + ret = 0; + +finish: + tq_receive_finish(d->queue_in, 0); + tq_send_finish (d->queue_out, 0); + + // make sure the demuxer does not get stuck waiting for audio durations + // that will never arrive + if (ifile->audio_duration_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO) + av_thread_message_queue_set_err_recv(ifile->audio_duration_queue, AVERROR_EOF); + + dec_thread_uninit(&dt); + + av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n"); + + return (void*)(intptr_t)ret; +} + +int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof) +{ + Decoder *d = ist->decoder; + int ret = 0, thread_ret; + + // thread already joined + if (!d->queue_in) + return AVERROR_EOF; + + // send the packet/flush request/EOF to the decoder thread + if (pkt || no_eof) { + av_packet_unref(d->pkt); + + if (pkt) { + ret = av_packet_ref(d->pkt, pkt); + if (ret < 0) + goto finish; + } + + ret = tq_send(d->queue_in, 0, d->pkt); + if (ret < 0) + goto finish; + } else + tq_send_finish(d->queue_in, 0); + + // retrieve all decoded data for the packet + while (1) { + int dummy; + + ret = tq_receive(d->queue_out, &dummy, d->frame); + if (ret < 0) + goto finish; + + // packet fully processed + if (!d->frame->buf[0]) + return 0; + + // process the decoded frame + if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) { + AVSubtitle *sub = (AVSubtitle*)d->frame->buf[0]->data; + ret = process_subtitle(ist, sub); + } else { + ret = send_frame_to_filters(ist, d->frame); + } + av_frame_unref(d->frame); + if (ret < 0) + goto finish; + } + +finish: + thread_ret = dec_thread_stop(d); + if (thread_ret < 0) { + av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n", + av_err2str(thread_ret)); + ret = err_merge(ret, thread_ret); + } + // non-EOF errors here are all fatal + if (ret < 0 && ret != AVERROR_EOF) + report_and_exit(ret); + + // signal EOF to our downstreams + if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) + sub2video_flush(ist); + else { + ret = send_filter_eof(ist); + if (ret < 0) { + av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n"); exit_program(1); + } } + + return AVERROR_EOF; +} + +static int dec_thread_start(InputStream *ist) +{ + Decoder *d = ist->decoder; + ObjPool *op; + int ret = 0; + + op = objpool_alloc_packets(); + if (!op) + return AVERROR(ENOMEM); + + d->queue_in = tq_alloc(1, 1, op, pkt_move); + if (!d->queue_in) { + objpool_free(&op); + return AVERROR(ENOMEM); + } + + op = objpool_alloc_frames(); + if (!op) + goto fail; + + d->queue_out = tq_alloc(1, 4, op, frame_move); + if (!d->queue_out) { + objpool_free(&op); + goto fail; + } + + ret = pthread_create(&d->thread, NULL, decoder_thread, ist); + if (ret) { + ret = AVERROR(ret); + av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n", + av_err2str(ret)); + goto fail; + } + + return 0; +fail: + if (ret >= 0) + ret = AVERROR(ENOMEM); + + tq_free(&d->queue_in); + tq_free(&d->queue_out); + return ret; } static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts) @@ -781,5 +1067,12 @@ int dec_open(InputStream *ist) } assert_avoptions(ist->decoder_opts); + ret = dec_thread_start(ist); + if (ret < 0) { + av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n", + av_err2str(ret)); + return ret; + } + return 0; } diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c index 66b2324bb3..026796f7e6 100644 --- a/fftools/ffmpeg_mux.c +++ b/fftools/ffmpeg_mux.c @@ -475,11 +475,6 @@ static int thread_stop(Muxer *mux) return (int)(intptr_t)ret; } -static void pkt_move(void *dst, void *src) -{ - av_packet_move_ref(dst, src); -} - static int thread_start(Muxer *mux) { AVFormatContext *fc = mux->fc;