diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 61fcda2526..cf8a50bffc 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -1043,9 +1043,6 @@ static int check_keyboard_interaction(int64_t cur_time) static void reset_eagain(void) { - int i; - for (i = 0; i < nb_input_files; i++) - input_files[i]->eagain = 0; for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) ost->unavailable = 0; } @@ -1069,19 +1066,14 @@ static void decode_flush(InputFile *ifile) * this function should be called again * - AVERROR_EOF -- this function should not be called again */ -static int process_input(int file_index) +static int process_input(int file_index, AVPacket *pkt) { InputFile *ifile = input_files[file_index]; InputStream *ist; - AVPacket *pkt; int ret, i; - ret = ifile_get_packet(ifile, &pkt); + ret = ifile_get_packet(ifile, pkt); - if (ret == AVERROR(EAGAIN)) { - ifile->eagain = 1; - return ret; - } if (ret == 1) { /* the input file is looped: flush the decoders */ decode_flush(ifile); @@ -1128,7 +1120,7 @@ static int process_input(int file_index) ret = process_input_packet(ist, pkt, 0); - av_packet_free(&pkt); + av_packet_unref(pkt); return ret < 0 ? ret : 0; } @@ -1138,7 +1130,7 @@ static int process_input(int file_index) * * @return 0 for success, <0 for error */ -static int transcode_step(OutputStream *ost) +static int transcode_step(OutputStream *ost, AVPacket *demux_pkt) { InputStream *ist = NULL; int ret; @@ -1153,10 +1145,8 @@ static int transcode_step(OutputStream *ost) av_assert0(ist); } - ret = process_input(ist->file_index); + ret = process_input(ist->file_index, demux_pkt); if (ret == AVERROR(EAGAIN)) { - if (input_files[ist->file_index]->eagain) - ost->unavailable = 1; return 0; } @@ -1182,12 +1172,19 @@ static int transcode(int *err_rate_exceeded) int ret = 0, i; InputStream *ist; int64_t timer_start; + AVPacket *demux_pkt = NULL; print_stream_maps(); *err_rate_exceeded = 0; atomic_store(&transcode_init_done, 1); + demux_pkt = av_packet_alloc(); + if (!demux_pkt) { + ret = AVERROR(ENOMEM); + goto fail; + } + if (stdin_interaction) { av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n"); } @@ -1215,7 +1212,7 @@ static int transcode(int *err_rate_exceeded) break; } - ret = transcode_step(ost); + ret = transcode_step(ost, demux_pkt); if (ret < 0 && ret != AVERROR_EOF) { av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret)); break; @@ -1256,6 +1253,9 @@ static int transcode(int *err_rate_exceeded) /* dump report by using the first video and audio streams */ print_report(1, timer_start, av_gettime_relative()); +fail: + av_packet_free(&demux_pkt); + return ret; } diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index f50222472c..3c153021f8 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -407,7 +407,6 @@ typedef struct InputFile { AVFormatContext *ctx; int eof_reached; /* true if eof reached */ - int eagain; /* true if last read attempt returned EAGAIN */ int64_t input_ts_offset; int input_sync_ref; /** @@ -859,7 +858,7 @@ void ifile_close(InputFile **f); * caller should flush decoders and read from this demuxer again * - a negative error code on failure */ -int ifile_get_packet(InputFile *f, AVPacket **pkt); +int ifile_get_packet(InputFile *f, AVPacket *pkt); int ist_output_add(InputStream *ist, OutputStream *ost); int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple); diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c index 791952f120..65a5e08ca5 100644 --- a/fftools/ffmpeg_demux.c +++ b/fftools/ffmpeg_demux.c @@ -21,6 +21,8 @@ #include "ffmpeg.h" #include "ffmpeg_utils.h" +#include "objpool.h" +#include "thread_queue.h" #include "libavutil/avassert.h" #include "libavutil/avstring.h" @@ -33,7 +35,6 @@ #include "libavutil/time.h" #include "libavutil/timestamp.h" #include "libavutil/thread.h" -#include "libavutil/threadmessage.h" #include "libavcodec/packet.h" @@ -107,19 +108,13 @@ typedef struct Demuxer { double readrate_initial_burst; - AVThreadMessageQueue *in_thread_queue; + ThreadQueue *thread_queue; int thread_queue_size; pthread_t thread; - int non_blocking; int read_started; } Demuxer; -typedef struct DemuxMsg { - AVPacket *pkt; - int looping; -} DemuxMsg; - static DemuxStream *ds_from_ist(InputStream *ist) { return (DemuxStream*)ist; @@ -440,26 +435,16 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt) return 0; } -// process an input packet into a message to send to the consumer thread -// src is always cleared by this function -static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src) +static int input_packet_process(Demuxer *d, AVPacket *pkt) { InputFile *f = &d->f; - InputStream *ist = f->streams[src->stream_index]; + InputStream *ist = f->streams[pkt->stream_index]; DemuxStream *ds = ds_from_ist(ist); - AVPacket *pkt; int ret = 0; - pkt = av_packet_alloc(); - if (!pkt) { - av_packet_unref(src); - return AVERROR(ENOMEM); - } - av_packet_move_ref(pkt, src); - ret = ts_fixup(d, pkt); if (ret < 0) - goto fail; + return ret; ds->data_size += pkt->size; ds->nb_packets++; @@ -475,13 +460,7 @@ static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src) av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q)); } - msg->pkt = pkt; - pkt = NULL; - -fail: - av_packet_free(&pkt); - - return ret; + return 0; } static void readrate_sleep(Demuxer *d) @@ -531,7 +510,6 @@ static void *input_thread(void *arg) Demuxer *d = arg; InputFile *f = &d->f; AVPacket *pkt; - unsigned flags = d->non_blocking ? AV_THREAD_MESSAGE_NONBLOCK : 0; int ret = 0; pkt = av_packet_alloc(); @@ -547,8 +525,6 @@ static void *input_thread(void *arg) d->wallclock_start = av_gettime_relative(); while (1) { - DemuxMsg msg = { NULL }; - ret = av_read_frame(f->ctx, pkt); if (ret == AVERROR(EAGAIN)) { @@ -558,8 +534,8 @@ static void *input_thread(void *arg) if (ret < 0) { if (d->loop) { /* signal looping to the consumer thread */ - msg.looping = 1; - ret = av_thread_message_queue_send(d->in_thread_queue, &msg, 0); + pkt->stream_index = -1; + ret = tq_send(d->thread_queue, 0, pkt); if (ret >= 0) ret = seek_to_start(d); if (ret >= 0) @@ -602,35 +578,26 @@ static void *input_thread(void *arg) } } - ret = input_packet_process(d, &msg, pkt); + ret = input_packet_process(d, pkt); if (ret < 0) break; if (f->readrate) readrate_sleep(d); - ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags); - if (flags && ret == AVERROR(EAGAIN)) { - flags = 0; - ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags); - av_log(f, AV_LOG_WARNING, - "Thread message queue blocking; consider raising the " - "thread_queue_size option (current value: %d)\n", - d->thread_queue_size); - } + ret = tq_send(d->thread_queue, 0, pkt); if (ret < 0) { if (ret != AVERROR_EOF) av_log(f, AV_LOG_ERROR, "Unable to send packet to main thread: %s\n", av_err2str(ret)); - av_packet_free(&msg.pkt); break; } } finish: av_assert0(ret < 0); - av_thread_message_queue_set_err_recv(d->in_thread_queue, ret); + tq_send_finish(d->thread_queue, 0); av_packet_free(&pkt); @@ -642,16 +609,16 @@ finish: static void thread_stop(Demuxer *d) { InputFile *f = &d->f; - DemuxMsg msg; - if (!d->in_thread_queue) + if (!d->thread_queue) return; - av_thread_message_queue_set_err_send(d->in_thread_queue, AVERROR_EOF); - while (av_thread_message_queue_recv(d->in_thread_queue, &msg, 0) >= 0) - av_packet_free(&msg.pkt); + + tq_receive_finish(d->thread_queue, 0); pthread_join(d->thread, NULL); - av_thread_message_queue_free(&d->in_thread_queue); + + tq_free(&d->thread_queue); + av_thread_message_queue_free(&f->audio_ts_queue); } @@ -659,18 +626,20 @@ static int thread_start(Demuxer *d) { int ret; InputFile *f = &d->f; + ObjPool *op; if (d->thread_queue_size <= 0) d->thread_queue_size = (nb_input_files > 1 ? 8 : 1); - if (nb_input_files > 1 && - (f->ctx->pb ? !f->ctx->pb->seekable : - strcmp(f->ctx->iformat->name, "lavfi"))) - d->non_blocking = 1; - ret = av_thread_message_queue_alloc(&d->in_thread_queue, - d->thread_queue_size, sizeof(DemuxMsg)); - if (ret < 0) - return ret; + op = objpool_alloc_packets(); + if (!op) + return AVERROR(ENOMEM); + + d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move); + if (!d->thread_queue) { + objpool_free(&op); + return AVERROR(ENOMEM); + } if (d->loop) { int nb_audio_dec = 0; @@ -700,31 +669,30 @@ static int thread_start(Demuxer *d) return 0; fail: - av_thread_message_queue_free(&d->in_thread_queue); + tq_free(&d->thread_queue); return ret; } -int ifile_get_packet(InputFile *f, AVPacket **pkt) +int ifile_get_packet(InputFile *f, AVPacket *pkt) { Demuxer *d = demuxer_from_ifile(f); - DemuxMsg msg; - int ret; + int ret, dummy; - if (!d->in_thread_queue) { + if (!d->thread_queue) { ret = thread_start(d); if (ret < 0) return ret; } - ret = av_thread_message_queue_recv(d->in_thread_queue, &msg, - d->non_blocking ? - AV_THREAD_MESSAGE_NONBLOCK : 0); + ret = tq_receive(d->thread_queue, &dummy, pkt); if (ret < 0) return ret; - if (msg.looping) + + if (pkt->stream_index == -1) { + av_assert0(!pkt->data && !pkt->side_data_elems); return 1; + } - *pkt = msg.pkt; return 0; } diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index 1df1212442..d93fc74e07 100644 --- a/fftools/ffmpeg_filter.c +++ b/fftools/ffmpeg_filter.c @@ -2010,9 +2010,8 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt) for (int i = 0; i < fg->nb_inputs; i++) { InputFilter *ifilter = fg->inputs[i]; InputFilterPriv *ifp = ifp_from_ifilter(ifilter); - InputStream *ist = ifp->ist; - if (input_files[ist->file_index]->eagain || fgt->eof_in[i]) + if (fgt->eof_in[i]) continue; nb_requests = av_buffersrc_get_nb_failed_requests(ifp->filter); @@ -2022,6 +2021,8 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt) } } + av_assert0(best_input >= 0); + return best_input; }