diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 6cd92a71c1..5019a4b287 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -3628,6 +3628,37 @@ static void reset_eagain(void) output_streams[i]->unavailable = 0; } +static void decode_flush(InputFile *ifile) +{ + for (int i = 0; i < ifile->nb_streams; i++) { + InputStream *ist = input_streams[ifile->ist_index + i]; + int ret; + + if (!ist->processing_needed) + continue; + + do { + ret = process_input_packet(ist, NULL, 1); + } while (ret > 0); + + if (ist->decoding_needed) { + /* report last frame duration to the demuxer thread */ + if (ist->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) { + LastFrameDuration dur; + + dur.stream_idx = i; + dur.duration = av_rescale_q(ist->nb_samples, + (AVRational){ 1, ist->dec_ctx->sample_rate}, + ist->st->time_base); + + av_thread_message_queue_send(ifile->audio_duration_queue, &dur, 0); + } + + avcodec_flush_buffers(ist->dec_ctx); + } + } +} + /* * Return * - 0 -- one packet was read and processed @@ -3641,7 +3672,7 @@ static int process_input(int file_index) AVFormatContext *is; InputStream *ist; AVPacket *pkt; - int ret, thread_ret, i, j; + int ret, i, j; int64_t duration; int64_t pkt_dts; int disable_discontinuity_correction = copy_ts; @@ -3653,30 +3684,10 @@ static int process_input(int file_index) ifile->eagain = 1; return ret; } - if (ret < 0 && ifile->loop) { - for (i = 0; i < ifile->nb_streams; i++) { - ist = input_streams[ifile->ist_index + i]; - if (ist->processing_needed) { - ret = process_input_packet(ist, NULL, 1); - if (ret>0) - return 0; - if (ist->decoding_needed) - avcodec_flush_buffers(ist->dec_ctx); - } - } - free_input_thread(file_index); - ret = seek_to_start(ifile, is); - thread_ret = init_input_thread(file_index); - if (thread_ret < 0) - return thread_ret; - if (ret < 0) - av_log(NULL, AV_LOG_WARNING, "Seek to start failed.\n"); - else - ret = ifile_get_packet(ifile, &pkt); - if (ret == AVERROR(EAGAIN)) { - ifile->eagain = 1; - return ret; - } + if (ret == 1) { + /* the input file is looped: flush the decoders */ + decode_flush(ifile); + return AVERROR(EAGAIN); } if (ret < 0) { if (ret != AVERROR_EOF) { diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 2a9c34eb93..aa97f35310 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -407,6 +407,11 @@ typedef struct InputStream { int got_output; } InputStream; +typedef struct LastFrameDuration { + int stream_idx; + int64_t duration; +} LastFrameDuration; + typedef struct InputFile { int index; @@ -438,6 +443,11 @@ typedef struct InputFile { pthread_t thread; /* thread reading from this file */ int non_blocking; /* reading packets from the thread should not block */ int thread_queue_size; /* maximum number of queued packets */ + + /* when looping the input file, this queue is used by decoders to report + * the last frame duration back to the demuxer thread */ + AVThreadMessageQueue *audio_duration_queue; + int audio_duration_queue_size; } InputFile; enum forced_keyframes_const { @@ -710,11 +720,18 @@ int64_t of_filesize(OutputFile *of); AVChapter * const * of_get_chapters(OutputFile *of, unsigned int *nb_chapters); +/** + * Get next input packet from the demuxer. + * + * @param pkt the packet is written here when this function returns 0 + * @return + * - 0 when a packet has been read successfully + * - 1 when stream end was reached, but the stream is looped; + * caller should flush decoders and read from this demuxer again + * - a negative error code on failure + */ int ifile_get_packet(InputFile *f, AVPacket **pkt); int init_input_threads(void); -int init_input_thread(int i); void free_input_threads(void); -void free_input_thread(int i); -int seek_to_start(InputFile *ifile, AVFormatContext *is); #endif /* FFTOOLS_FFMPEG_H */ diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c index bfdb209f00..d03210d9e6 100644 --- a/fftools/ffmpeg_demux.c +++ b/fftools/ffmpeg_demux.c @@ -28,6 +28,11 @@ #include "libavformat/avformat.h" +typedef struct DemuxMsg { + AVPacket *pkt; + int looping; +} DemuxMsg; + static void report_new_stream(InputFile *file, const AVPacket *pkt) { AVStream *st = file->ctx->streams[pkt->stream_index]; @@ -42,61 +47,54 @@ static void report_new_stream(InputFile *file, const AVPacket *pkt) file->nb_streams_warn = pkt->stream_index + 1; } -// set duration to max(tmp, duration) in a proper time base and return duration's time_base -static AVRational duration_max(int64_t tmp, int64_t *duration, AVRational tmp_time_base, - AVRational time_base) +static void ifile_duration_update(InputFile *f, InputStream *ist, + int64_t last_duration) { - int ret; - - if (!*duration) { - *duration = tmp; - return tmp_time_base; + /* the total duration of the stream, max_pts - min_pts is + * the duration of the stream without the last frame */ + if (ist->max_pts > ist->min_pts && + ist->max_pts - (uint64_t)ist->min_pts < INT64_MAX - last_duration) + last_duration += ist->max_pts - ist->min_pts; + + if (!f->duration || + av_compare_ts(f->duration, f->time_base, + last_duration, ist->st->time_base) < 0) { + f->duration = last_duration; + f->time_base = ist->st->time_base; } - - ret = av_compare_ts(*duration, time_base, tmp, tmp_time_base); - if (ret < 0) { - *duration = tmp; - return tmp_time_base; - } - - return time_base; } -int seek_to_start(InputFile *ifile, AVFormatContext *is) +static int seek_to_start(InputFile *ifile) { + AVFormatContext *is = ifile->ctx; InputStream *ist; - AVCodecContext *avctx; - int i, ret, has_audio = 0; - int64_t duration = 0; + int ret; ret = avformat_seek_file(is, -1, INT64_MIN, is->start_time, is->start_time, 0); if (ret < 0) return ret; - for (i = 0; i < ifile->nb_streams; i++) { - ist = input_streams[ifile->ist_index + i]; - avctx = ist->dec_ctx; - + if (ifile->audio_duration_queue_size) { /* duration is the length of the last frame in a stream * when audio stream is present we don't care about * last video frame length because it's not defined exactly */ - if (avctx->codec_type == AVMEDIA_TYPE_AUDIO && ist->nb_samples) - has_audio = 1; - } + int got_durations = 0; - for (i = 0; i < ifile->nb_streams; i++) { - ist = input_streams[ifile->ist_index + i]; - avctx = ist->dec_ctx; + while (got_durations < ifile->audio_duration_queue_size) { + LastFrameDuration dur; + ret = av_thread_message_queue_recv(ifile->audio_duration_queue, &dur, 0); + if (ret < 0) + return ret; + got_durations++; - if (has_audio) { - if (avctx->codec_type == AVMEDIA_TYPE_AUDIO && ist->nb_samples) { - AVRational sample_rate = {1, avctx->sample_rate}; + ist = input_streams[ifile->ist_index + dur.stream_idx]; + ifile_duration_update(ifile, ist, dur.duration); + } + } else { + for (int i = 0; i < ifile->nb_streams; i++) { + int64_t duration = 0; + ist = input_streams[ifile->ist_index + i]; - duration = av_rescale_q(ist->nb_samples, sample_rate, ist->st->time_base); - } else { - continue; - } - } else { if (ist->framerate.num) { duration = av_rescale_q(1, av_inv_q(ist->framerate), ist->st->time_base); } else if (ist->st->avg_frame_rate.num) { @@ -104,15 +102,9 @@ int seek_to_start(InputFile *ifile, AVFormatContext *is) } else { duration = 1; } + + ifile_duration_update(ifile, ist, duration); } - if (!ifile->duration) - ifile->time_base = ist->st->time_base; - /* the total duration of the stream, max_pts - min_pts is - * the duration of the stream without the last frame */ - if (ist->max_pts > ist->min_pts && ist->max_pts - (uint64_t)ist->min_pts < INT64_MAX - duration) - duration += ist->max_pts - ist->min_pts; - ifile->time_base = duration_max(duration, &ifile->duration, ist->st->time_base, - ifile->time_base); } if (ifile->loop > 0) @@ -124,11 +116,13 @@ int seek_to_start(InputFile *ifile, AVFormatContext *is) static void *input_thread(void *arg) { InputFile *f = arg; - AVPacket *pkt = f->pkt, *queue_pkt; + AVPacket *pkt = f->pkt; unsigned flags = f->non_blocking ? AV_THREAD_MESSAGE_NONBLOCK : 0; int ret = 0; while (1) { + DemuxMsg msg = { NULL }; + ret = av_read_frame(f->ctx, pkt); if (ret == AVERROR(EAGAIN)) { @@ -136,6 +130,18 @@ static void *input_thread(void *arg) continue; } if (ret < 0) { + if (f->loop) { + /* signal looping to the consumer thread */ + msg.looping = 1; + ret = av_thread_message_queue_send(f->in_thread_queue, &msg, 0); + if (ret >= 0) + ret = seek_to_start(f); + if (ret >= 0) + continue; + + /* fallthrough to the error path */ + } + av_thread_message_queue_set_err_recv(f->in_thread_queue, ret); break; } @@ -153,17 +159,17 @@ static void *input_thread(void *arg) continue; } - queue_pkt = av_packet_alloc(); - if (!queue_pkt) { + msg.pkt = av_packet_alloc(); + if (!msg.pkt) { av_packet_unref(pkt); av_thread_message_queue_set_err_recv(f->in_thread_queue, AVERROR(ENOMEM)); break; } - av_packet_move_ref(queue_pkt, pkt); - ret = av_thread_message_queue_send(f->in_thread_queue, &queue_pkt, flags); + av_packet_move_ref(msg.pkt, pkt); + ret = av_thread_message_queue_send(f->in_thread_queue, &msg, flags); if (flags && ret == AVERROR(EAGAIN)) { flags = 0; - ret = av_thread_message_queue_send(f->in_thread_queue, &queue_pkt, flags); + ret = av_thread_message_queue_send(f->in_thread_queue, &msg, flags); av_log(f->ctx, AV_LOG_WARNING, "Thread message queue blocking; consider raising the " "thread_queue_size option (current value: %d)\n", @@ -174,7 +180,7 @@ static void *input_thread(void *arg) av_log(f->ctx, AV_LOG_ERROR, "Unable to send packet to main thread: %s\n", av_err2str(ret)); - av_packet_free(&queue_pkt); + av_packet_free(&msg.pkt); av_thread_message_queue_set_err_recv(f->in_thread_queue, ret); break; } @@ -183,19 +189,20 @@ static void *input_thread(void *arg) return NULL; } -void free_input_thread(int i) +static void free_input_thread(int i) { InputFile *f = input_files[i]; - AVPacket *pkt; + DemuxMsg msg; if (!f || !f->in_thread_queue) return; av_thread_message_queue_set_err_send(f->in_thread_queue, AVERROR_EOF); - while (av_thread_message_queue_recv(f->in_thread_queue, &pkt, 0) >= 0) - av_packet_free(&pkt); + while (av_thread_message_queue_recv(f->in_thread_queue, &msg, 0) >= 0) + av_packet_free(&msg.pkt); pthread_join(f->thread, NULL); av_thread_message_queue_free(&f->in_thread_queue); + av_thread_message_queue_free(&f->audio_duration_queue); } void free_input_threads(void) @@ -206,7 +213,7 @@ void free_input_threads(void) free_input_thread(i); } -int init_input_thread(int i) +static int init_input_thread(int i) { int ret; InputFile *f = input_files[i]; @@ -218,17 +225,38 @@ int init_input_thread(int i) strcmp(f->ctx->iformat->name, "lavfi")) f->non_blocking = 1; ret = av_thread_message_queue_alloc(&f->in_thread_queue, - f->thread_queue_size, sizeof(f->pkt)); + f->thread_queue_size, sizeof(DemuxMsg)); if (ret < 0) return ret; + if (f->loop) { + int nb_audio_dec = 0; + + for (int i = 0; i < f->nb_streams; i++) { + InputStream *ist = input_streams[f->ist_index + i]; + nb_audio_dec += !!(ist->decoding_needed && + ist->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO); + } + + if (nb_audio_dec) { + ret = av_thread_message_queue_alloc(&f->audio_duration_queue, + nb_audio_dec, sizeof(LastFrameDuration)); + if (ret < 0) + goto fail; + f->audio_duration_queue_size = nb_audio_dec; + } + } + if ((ret = pthread_create(&f->thread, NULL, input_thread, f))) { av_log(NULL, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret)); - av_thread_message_queue_free(&f->in_thread_queue); - return AVERROR(ret); + ret = AVERROR(ret); + goto fail; } return 0; +fail: + av_thread_message_queue_free(&f->in_thread_queue); + return ret; } int init_input_threads(void) @@ -245,6 +273,9 @@ int init_input_threads(void) int ifile_get_packet(InputFile *f, AVPacket **pkt) { + DemuxMsg msg; + int ret; + if (f->readrate || f->rate_emu) { int i; int64_t file_start = copy_ts * ( @@ -264,7 +295,14 @@ int ifile_get_packet(InputFile *f, AVPacket **pkt) } } - return av_thread_message_queue_recv(f->in_thread_queue, pkt, - f->non_blocking ? - AV_THREAD_MESSAGE_NONBLOCK : 0); + ret = av_thread_message_queue_recv(f->in_thread_queue, &msg, + f->non_blocking ? + AV_THREAD_MESSAGE_NONBLOCK : 0); + if (ret < 0) + return ret; + if (msg.looping) + return 1; + + *pkt = msg.pkt; + return 0; }