You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1114 lines
35 KiB

/*
* This file is part of FFmpeg.
*
* FFmpeg is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* FFmpeg is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with FFmpeg; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
/**
* @file
* Frame multithreading support functions
* @see doc/multithreading.txt
*/
#include <stdatomic.h>
#include "avcodec.h"
#include "avcodec_internal.h"
#include "codec_desc.h"
#include "codec_internal.h"
#include "decode.h"
#include "hwaccel_internal.h"
#include "hwconfig.h"
#include "internal.h"
#include "packet_internal.h"
#include "pthread_internal.h"
#include "refstruct.h"
#include "thread.h"
#include "threadframe.h"
#include "version_major.h"
#include "libavutil/avassert.h"
#include "libavutil/buffer.h"
#include "libavutil/common.h"
#include "libavutil/cpu.h"
#include "libavutil/frame.h"
#include "libavutil/internal.h"
#include "libavutil/log.h"
#include "libavutil/mem.h"
#include "libavutil/opt.h"
#include "libavutil/thread.h"
enum {
/// Set when the thread is awaiting a packet.
STATE_INPUT_READY,
/// Set before the codec has called ff_thread_finish_setup().
STATE_SETTING_UP,
/// Set after the codec has called ff_thread_finish_setup().
STATE_SETUP_FINISHED,
};
enum {
UNINITIALIZED, ///< Thread has not been created, AVCodec->close mustn't be called
NEEDS_CLOSE, ///< FFCodec->close needs to be called
INITIALIZED, ///< Thread has been properly set up
};
typedef struct DecodedFrames {
AVFrame **f;
size_t nb_f;
size_t nb_f_allocated;
} DecodedFrames;
typedef struct ThreadFrameProgress {
atomic_int progress[2];
} ThreadFrameProgress;
/**
* Context used by codec threads and stored in their AVCodecInternal thread_ctx.
*/
typedef struct PerThreadContext {
struct FrameThreadContext *parent;
pthread_t thread;
int thread_init;
unsigned pthread_init_cnt;///< Number of successfully initialized mutexes/conditions
pthread_cond_t input_cond; ///< Used to wait for a new packet from the main thread.
pthread_cond_t progress_cond; ///< Used by child threads to wait for progress to change.
pthread_cond_t output_cond; ///< Used by the main thread to wait for frames to finish.
pthread_mutex_t mutex; ///< Mutex used to protect the contents of the PerThreadContext.
pthread_mutex_t progress_mutex; ///< Mutex used to protect frame progress values and progress_cond.
AVCodecContext *avctx; ///< Context used to decode packets passed to this thread.
AVPacket *avpkt; ///< Input packet (for decoding) or output (for encoding).
/**
* Decoded frames from a single decode iteration.
*/
DecodedFrames df;
int result; ///< The result of the last codec decode/encode() call.
atomic_int state;
int die; ///< Set when the thread should exit.
int hwaccel_serializing;
int async_serializing;
// set to 1 in ff_thread_finish_setup() when a threadsafe hwaccel is used;
// cannot check hwaccel caps directly, because
// worked threads clear hwaccel state for thread-unsafe hwaccels
// after each decode call
int hwaccel_threadsafe;
atomic_int debug_threads; ///< Set if the FF_DEBUG_THREADS option is set.
/// The following two fields have the same semantics as the DecodeContext field
int intra_only_flag;
enum AVPictureType initial_pict_type;
} PerThreadContext;
/**
* Context stored in the client AVCodecInternal thread_ctx.
*/
typedef struct FrameThreadContext {
PerThreadContext *threads; ///< The contexts for each thread.
PerThreadContext *prev_thread; ///< The last thread submit_packet() was called on.
unsigned pthread_init_cnt; ///< Number of successfully initialized mutexes/conditions
pthread_mutex_t buffer_mutex; ///< Mutex used to protect get/release_buffer().
/**
* This lock is used for ensuring threads run in serial when thread-unsafe
* hwaccel is used.
*/
pthread_mutex_t hwaccel_mutex;
pthread_mutex_t async_mutex;
pthread_cond_t async_cond;
int async_lock;
DecodedFrames df;
int result;
/**
* Packet to be submitted to the next thread for decoding.
*/
AVPacket *next_pkt;
int next_decoding; ///< The next context to submit a packet to.
int next_finished; ///< The next context to return output from.
/* hwaccel state for thread-unsafe hwaccels is temporarily stored here in
* order to transfer its ownership to the next decoding thread without the
* need for extra synchronization */
const AVHWAccel *stash_hwaccel;
void *stash_hwaccel_context;
void *stash_hwaccel_priv;
} FrameThreadContext;
static int hwaccel_serial(const AVCodecContext *avctx)
{
return avctx->hwaccel && !(ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_THREAD_SAFE);
}
static void async_lock(FrameThreadContext *fctx)
{
pthread_mutex_lock(&fctx->async_mutex);
while (fctx->async_lock)
pthread_cond_wait(&fctx->async_cond, &fctx->async_mutex);
fctx->async_lock = 1;
pthread_mutex_unlock(&fctx->async_mutex);
}
static void async_unlock(FrameThreadContext *fctx)
{
pthread_mutex_lock(&fctx->async_mutex);
av_assert0(fctx->async_lock);
fctx->async_lock = 0;
pthread_cond_broadcast(&fctx->async_cond);
pthread_mutex_unlock(&fctx->async_mutex);
}
static void thread_set_name(PerThreadContext *p)
{
AVCodecContext *avctx = p->avctx;
int idx = p - p->parent->threads;
char name[16];
snprintf(name, sizeof(name), "av:%.7s:df%d", avctx->codec->name, idx);
ff_thread_setname(name);
}
// get a free frame to decode into
static AVFrame *decoded_frames_get_free(DecodedFrames *df)
{
if (df->nb_f == df->nb_f_allocated) {
AVFrame **tmp = av_realloc_array(df->f, df->nb_f + 1,
sizeof(*df->f));
if (!tmp)
return NULL;
df->f = tmp;
df->f[df->nb_f] = av_frame_alloc();
if (!df->f[df->nb_f])
return NULL;
df->nb_f_allocated++;
}
av_assert0(!df->f[df->nb_f]->buf[0]);
return df->f[df->nb_f];
}
static void decoded_frames_pop(DecodedFrames *df, AVFrame *dst)
{
AVFrame *tmp_frame = df->f[0];
av_frame_move_ref(dst, tmp_frame);
memmove(df->f, df->f + 1, (df->nb_f - 1) * sizeof(*df->f));
df->f[--df->nb_f] = tmp_frame;
}
static void decoded_frames_flush(DecodedFrames *df)
{
for (size_t i = 0; i < df->nb_f; i++)
av_frame_unref(df->f[i]);
df->nb_f = 0;
}
static void decoded_frames_free(DecodedFrames *df)
{
for (size_t i = 0; i < df->nb_f_allocated; i++)
av_frame_free(&df->f[i]);
av_freep(&df->f);
df->nb_f = 0;
df->nb_f_allocated = 0;
}
/**
* Codec worker thread.
*
* Automatically calls ff_thread_finish_setup() if the codec does
* not provide an update_thread_context method, or if the codec returns
* before calling it.
*/
static attribute_align_arg void *frame_worker_thread(void *arg)
{
PerThreadContext *p = arg;
AVCodecContext *avctx = p->avctx;
const FFCodec *codec = ffcodec(avctx->codec);
thread_set_name(p);
pthread_mutex_lock(&p->mutex);
while (1) {
int ret;
while (atomic_load(&p->state) == STATE_INPUT_READY && !p->die)
pthread_cond_wait(&p->input_cond, &p->mutex);
if (p->die) break;
if (!codec->update_thread_context)
ff_thread_finish_setup(avctx);
/* If a decoder supports hwaccel, then it must call ff_get_format().
* Since that call must happen before ff_thread_finish_setup(), the
* decoder is required to implement update_thread_context() and call
* ff_thread_finish_setup() manually. Therefore the above
* ff_thread_finish_setup() call did not happen and hwaccel_serializing
* cannot be true here. */
av_assert0(!p->hwaccel_serializing);
/* if the previous thread uses thread-unsafe hwaccel then we take the
* lock to ensure the threads don't run concurrently */
if (hwaccel_serial(avctx)) {
pthread_mutex_lock(&p->parent->hwaccel_mutex);
p->hwaccel_serializing = 1;
}
ret = 0;
while (ret >= 0) {
AVFrame *frame;
/* get the frame which will store the output */
frame = decoded_frames_get_free(&p->df);
if (!frame) {
p->result = AVERROR(ENOMEM);
goto alloc_fail;
}
/* do the actual decoding */
ret = ff_decode_receive_frame_internal(avctx, frame);
if (ret == 0)
p->df.nb_f++;
else if (ret < 0 && frame->buf[0])
av_frame_unref(frame);
p->result = (ret == AVERROR(EAGAIN)) ? 0 : ret;
}
if (atomic_load(&p->state) == STATE_SETTING_UP)
ff_thread_finish_setup(avctx);
alloc_fail:
if (p->hwaccel_serializing) {
/* wipe hwaccel state for thread-unsafe hwaccels to avoid stale
* pointers lying around;
* the state was transferred to FrameThreadContext in
* ff_thread_finish_setup(), so nothing is leaked */
avctx->hwaccel = NULL;
avctx->hwaccel_context = NULL;
avctx->internal->hwaccel_priv_data = NULL;
p->hwaccel_serializing = 0;
pthread_mutex_unlock(&p->parent->hwaccel_mutex);
}
av_assert0(!avctx->hwaccel ||
(ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_THREAD_SAFE));
if (p->async_serializing) {
p->async_serializing = 0;
async_unlock(p->parent);
}
pthread_mutex_lock(&p->progress_mutex);
atomic_store(&p->state, STATE_INPUT_READY);
pthread_cond_broadcast(&p->progress_cond);
pthread_cond_signal(&p->output_cond);
pthread_mutex_unlock(&p->progress_mutex);
}
pthread_mutex_unlock(&p->mutex);
return NULL;
}
/**
* Update the next thread's AVCodecContext with values from the reference thread's context.
*
* @param dst The destination context.
* @param src The source context.
* @param for_user 0 if the destination is a codec thread, 1 if the destination is the user's thread
* @return 0 on success, negative error code on failure
*/
static int update_context_from_thread(AVCodecContext *dst, const AVCodecContext *src, int for_user)
{
const FFCodec *const codec = ffcodec(dst->codec);
int err = 0;
if (dst != src && (for_user || codec->update_thread_context)) {
dst->time_base = src->time_base;
dst->framerate = src->framerate;
dst->width = src->width;
dst->height = src->height;
dst->pix_fmt = src->pix_fmt;
dst->sw_pix_fmt = src->sw_pix_fmt;
dst->coded_width = src->coded_width;
dst->coded_height = src->coded_height;
dst->has_b_frames = src->has_b_frames;
dst->idct_algo = src->idct_algo;
dst->properties = src->properties;
dst->bits_per_coded_sample = src->bits_per_coded_sample;
dst->sample_aspect_ratio = src->sample_aspect_ratio;
dst->profile = src->profile;
dst->level = src->level;
dst->bits_per_raw_sample = src->bits_per_raw_sample;
#if FF_API_TICKS_PER_FRAME
FF_DISABLE_DEPRECATION_WARNINGS
dst->ticks_per_frame = src->ticks_per_frame;
FF_ENABLE_DEPRECATION_WARNINGS
#endif
dst->color_primaries = src->color_primaries;
dst->color_trc = src->color_trc;
dst->colorspace = src->colorspace;
dst->color_range = src->color_range;
dst->chroma_sample_location = src->chroma_sample_location;
dst->sample_rate = src->sample_rate;
dst->sample_fmt = src->sample_fmt;
err = av_channel_layout_copy(&dst->ch_layout, &src->ch_layout);
if (err < 0)
return err;
if (!!dst->hw_frames_ctx != !!src->hw_frames_ctx ||
(dst->hw_frames_ctx && dst->hw_frames_ctx->data != src->hw_frames_ctx->data)) {
av_buffer_unref(&dst->hw_frames_ctx);
if (src->hw_frames_ctx) {
dst->hw_frames_ctx = av_buffer_ref(src->hw_frames_ctx);
if (!dst->hw_frames_ctx)
return AVERROR(ENOMEM);
}
}
dst->hwaccel_flags = src->hwaccel_flags;
ff_refstruct_replace(&dst->internal->pool, src->internal->pool);
}
if (for_user) {
if (codec->update_thread_context_for_user)
err = codec->update_thread_context_for_user(dst, src);
} else {
const PerThreadContext *p_src = src->internal->thread_ctx;
PerThreadContext *p_dst = dst->internal->thread_ctx;
if (codec->update_thread_context) {
err = codec->update_thread_context(dst, src);
if (err < 0)
return err;
}
// reset dst hwaccel state if needed
av_assert0(p_dst->hwaccel_threadsafe ||
(!dst->hwaccel && !dst->internal->hwaccel_priv_data));
if (p_dst->hwaccel_threadsafe &&
(!p_src->hwaccel_threadsafe || dst->hwaccel != src->hwaccel)) {
ff_hwaccel_uninit(dst);
p_dst->hwaccel_threadsafe = 0;
}
// propagate hwaccel state for threadsafe hwaccels
if (p_src->hwaccel_threadsafe) {
const FFHWAccel *hwaccel = ffhwaccel(src->hwaccel);
if (!dst->hwaccel) {
if (hwaccel->priv_data_size) {
av_assert0(hwaccel->update_thread_context);
dst->internal->hwaccel_priv_data =
av_mallocz(hwaccel->priv_data_size);
if (!dst->internal->hwaccel_priv_data)
return AVERROR(ENOMEM);
}
dst->hwaccel = src->hwaccel;
}
av_assert0(dst->hwaccel == src->hwaccel);
if (hwaccel->update_thread_context) {
err = hwaccel->update_thread_context(dst, src);
if (err < 0) {
av_log(dst, AV_LOG_ERROR, "Error propagating hwaccel state\n");
ff_hwaccel_uninit(dst);
return err;
}
}
p_dst->hwaccel_threadsafe = 1;
}
}
return err;
}
/**
* Update the next thread's AVCodecContext with values set by the user.
*
* @param dst The destination context.
* @param src The source context.
* @return 0 on success, negative error code on failure
*/
static int update_context_from_user(AVCodecContext *dst, const AVCodecContext *src)
{
int err;
dst->flags = src->flags;
dst->draw_horiz_band= src->draw_horiz_band;
dst->get_buffer2 = src->get_buffer2;
dst->opaque = src->opaque;
dst->debug = src->debug;
dst->slice_flags = src->slice_flags;
dst->flags2 = src->flags2;
dst->export_side_data = src->export_side_data;
dst->skip_loop_filter = src->skip_loop_filter;
dst->skip_idct = src->skip_idct;
dst->skip_frame = src->skip_frame;
dst->frame_num = src->frame_num;
av_packet_unref(dst->internal->last_pkt_props);
err = av_packet_copy_props(dst->internal->last_pkt_props, src->internal->last_pkt_props);
if (err < 0)
return err;
return 0;
}
static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
AVPacket *in_pkt)
{
FrameThreadContext *fctx = p->parent;
PerThreadContext *prev_thread = fctx->prev_thread;
const AVCodec *codec = p->avctx->codec;
int ret;
pthread_mutex_lock(&p->mutex);
av_packet_unref(p->avpkt);
av_packet_move_ref(p->avpkt, in_pkt);
if (AVPACKET_IS_EMPTY(p->avpkt))
p->avctx->internal->draining = 1;
ret = update_context_from_user(p->avctx, user_avctx);
if (ret) {
pthread_mutex_unlock(&p->mutex);
return ret;
}
atomic_store_explicit(&p->debug_threads,
(p->avctx->debug & FF_DEBUG_THREADS) != 0,
memory_order_relaxed);
if (prev_thread) {
if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) {
pthread_mutex_lock(&prev_thread->progress_mutex);
while (atomic_load(&prev_thread->state) == STATE_SETTING_UP)
pthread_cond_wait(&prev_thread->progress_cond, &prev_thread->progress_mutex);
pthread_mutex_unlock(&prev_thread->progress_mutex);
}
/* codecs without delay might not be prepared to be called repeatedly here during
* flushing (vp3/theora), and also don't need to be, since from this point on, they
* will always return EOF anyway */
if (!p->avctx->internal->draining ||
(codec->capabilities & AV_CODEC_CAP_DELAY)) {
ret = update_context_from_thread(p->avctx, prev_thread->avctx, 0);
if (ret) {
pthread_mutex_unlock(&p->mutex);
return ret;
}
}
}
/* transfer the stashed hwaccel state, if any */
av_assert0(!p->avctx->hwaccel || p->hwaccel_threadsafe);
if (!p->hwaccel_threadsafe) {
FFSWAP(const AVHWAccel*, p->avctx->hwaccel, fctx->stash_hwaccel);
FFSWAP(void*, p->avctx->hwaccel_context, fctx->stash_hwaccel_context);
FFSWAP(void*, p->avctx->internal->hwaccel_priv_data, fctx->stash_hwaccel_priv);
}
atomic_store(&p->state, STATE_SETTING_UP);
pthread_cond_signal(&p->input_cond);
pthread_mutex_unlock(&p->mutex);
fctx->prev_thread = p;
fctx->next_decoding = (fctx->next_decoding + 1) % p->avctx->thread_count;
return 0;
}
int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
{
FrameThreadContext *fctx = avctx->internal->thread_ctx;
int ret = 0;
/* release the async lock, permitting blocked hwaccel threads to
* go forward while we are in this function */
async_unlock(fctx);
/* submit packets to threads while there are no buffered results to return */
while (!fctx->df.nb_f && !fctx->result) {
PerThreadContext *p;
/* get a packet to be submitted to the next thread */
av_packet_unref(fctx->next_pkt);
ret = ff_decode_get_packet(avctx, fctx->next_pkt);
if (ret < 0 && ret != AVERROR_EOF)
goto finish;
ret = submit_packet(&fctx->threads[fctx->next_decoding], avctx,
fctx->next_pkt);
if (ret < 0)
goto finish;
/* do not return any frames until all threads have something to do */
if (fctx->next_decoding != fctx->next_finished &&
!avctx->internal->draining)
continue;
p = &fctx->threads[fctx->next_finished];
fctx->next_finished = (fctx->next_finished + 1) % avctx->thread_count;
if (atomic_load(&p->state) != STATE_INPUT_READY) {
pthread_mutex_lock(&p->progress_mutex);
while (atomic_load_explicit(&p->state, memory_order_relaxed) != STATE_INPUT_READY)
pthread_cond_wait(&p->output_cond, &p->progress_mutex);
pthread_mutex_unlock(&p->progress_mutex);
}
update_context_from_thread(avctx, p->avctx, 1);
fctx->result = p->result;
p->result = 0;
if (p->df.nb_f)
FFSWAP(DecodedFrames, fctx->df, p->df);
}
/* a thread may return multiple frames AND an error
* we first return all the frames, then the error */
if (fctx->df.nb_f) {
decoded_frames_pop(&fctx->df, frame);
ret = 0;
} else {
ret = fctx->result;
fctx->result = 0;
}
finish:
async_lock(fctx);
return ret;
}
void ff_thread_report_progress(ThreadFrame *f, int n, int field)
{
PerThreadContext *p;
atomic_int *progress = f->progress ? f->progress->progress : NULL;
if (!progress ||
atomic_load_explicit(&progress[field], memory_order_relaxed) >= n)
return;
p = f->owner[field]->internal->thread_ctx;
if (atomic_load_explicit(&p->debug_threads, memory_order_relaxed))
av_log(f->owner[field], AV_LOG_DEBUG,
"%p finished %d field %d\n", progress, n, field);
pthread_mutex_lock(&p->progress_mutex);
atomic_store_explicit(&progress[field], n, memory_order_release);
pthread_cond_broadcast(&p->progress_cond);
pthread_mutex_unlock(&p->progress_mutex);
}
void ff_thread_await_progress(const ThreadFrame *f, int n, int field)
{
PerThreadContext *p;
atomic_int *progress = f->progress ? f->progress->progress : NULL;
if (!progress ||
atomic_load_explicit(&progress[field], memory_order_acquire) >= n)
return;
p = f->owner[field]->internal->thread_ctx;
if (atomic_load_explicit(&p->debug_threads, memory_order_relaxed))
av_log(f->owner[field], AV_LOG_DEBUG,
"thread awaiting %d field %d from %p\n", n, field, progress);
pthread_mutex_lock(&p->progress_mutex);
while (atomic_load_explicit(&progress[field], memory_order_relaxed) < n)
pthread_cond_wait(&p->progress_cond, &p->progress_mutex);
pthread_mutex_unlock(&p->progress_mutex);
}
void ff_thread_finish_setup(AVCodecContext *avctx) {
PerThreadContext *p;
if (!(avctx->active_thread_type&FF_THREAD_FRAME)) return;
p = avctx->internal->thread_ctx;
p->hwaccel_threadsafe = avctx->hwaccel &&
(ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_THREAD_SAFE);
if (hwaccel_serial(avctx) && !p->hwaccel_serializing) {
pthread_mutex_lock(&p->parent->hwaccel_mutex);
p->hwaccel_serializing = 1;
}
/* this assumes that no hwaccel calls happen before ff_thread_finish_setup() */
if (avctx->hwaccel &&
!(ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_ASYNC_SAFE)) {
p->async_serializing = 1;
async_lock(p->parent);
}
/* thread-unsafe hwaccels share a single private data instance, so we
* save hwaccel state for passing to the next thread;
* this is done here so that this worker thread can wipe its own hwaccel
* state after decoding, without requiring synchronization */
av_assert0(!p->parent->stash_hwaccel);
if (hwaccel_serial(avctx)) {
p->parent->stash_hwaccel = avctx->hwaccel;
p->parent->stash_hwaccel_context = avctx->hwaccel_context;
p->parent->stash_hwaccel_priv = avctx->internal->hwaccel_priv_data;
}
pthread_mutex_lock(&p->progress_mutex);
if(atomic_load(&p->state) == STATE_SETUP_FINISHED){
av_log(avctx, AV_LOG_WARNING, "Multiple ff_thread_finish_setup() calls\n");
}
atomic_store(&p->state, STATE_SETUP_FINISHED);
pthread_cond_broadcast(&p->progress_cond);
pthread_mutex_unlock(&p->progress_mutex);
}
/// Waits for all threads to finish.
static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count)
{
int i;
async_unlock(fctx);
for (i = 0; i < thread_count; i++) {
PerThreadContext *p = &fctx->threads[i];
if (atomic_load(&p->state) != STATE_INPUT_READY) {
pthread_mutex_lock(&p->progress_mutex);
while (atomic_load(&p->state) != STATE_INPUT_READY)
pthread_cond_wait(&p->output_cond, &p->progress_mutex);
pthread_mutex_unlock(&p->progress_mutex);
}
}
async_lock(fctx);
}
#define OFF(member) offsetof(FrameThreadContext, member)
DEFINE_OFFSET_ARRAY(FrameThreadContext, thread_ctx, pthread_init_cnt,
(OFF(buffer_mutex), OFF(hwaccel_mutex), OFF(async_mutex)),
(OFF(async_cond)));
#undef OFF
#define OFF(member) offsetof(PerThreadContext, member)
DEFINE_OFFSET_ARRAY(PerThreadContext, per_thread, pthread_init_cnt,
(OFF(progress_mutex), OFF(mutex)),
(OFF(input_cond), OFF(progress_cond), OFF(output_cond)));
#undef OFF
void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
{
FrameThreadContext *fctx = avctx->internal->thread_ctx;
const FFCodec *codec = ffcodec(avctx->codec);
int i;
park_frame_worker_threads(fctx, thread_count);
for (i = 0; i < thread_count; i++) {
PerThreadContext *p = &fctx->threads[i];
AVCodecContext *ctx = p->avctx;
if (ctx->internal) {
if (p->thread_init == INITIALIZED) {
pthread_mutex_lock(&p->mutex);
p->die = 1;
pthread_cond_signal(&p->input_cond);
pthread_mutex_unlock(&p->mutex);
pthread_join(p->thread, NULL);
}
if (codec->close && p->thread_init != UNINITIALIZED)
codec->close(ctx);
/* When using a threadsafe hwaccel, this is where
* each thread's context is uninit'd and freed. */
ff_hwaccel_uninit(ctx);
if (ctx->priv_data) {
if (codec->p.priv_class)
av_opt_free(ctx->priv_data);
av_freep(&ctx->priv_data);
}
ff_refstruct_unref(&ctx->internal->pool);
av_packet_free(&ctx->internal->in_pkt);
av_packet_free(&ctx->internal->last_pkt_props);
av_freep(&ctx->internal);
av_buffer_unref(&ctx->hw_frames_ctx);
av_frame_side_data_free(&ctx->decoded_side_data,
&ctx->nb_decoded_side_data);
}
decoded_frames_free(&p->df);
ff_pthread_free(p, per_thread_offsets);
av_packet_free(&p->avpkt);
av_freep(&p->avctx);
}
decoded_frames_free(&fctx->df);
av_packet_free(&fctx->next_pkt);
av_freep(&fctx->threads);
ff_pthread_free(fctx, thread_ctx_offsets);
/* if we have stashed hwaccel state, move it to the user-facing context,
* so it will be freed in ff_codec_close() */
av_assert0(!avctx->hwaccel);
FFSWAP(const AVHWAccel*, avctx->hwaccel, fctx->stash_hwaccel);
FFSWAP(void*, avctx->hwaccel_context, fctx->stash_hwaccel_context);
FFSWAP(void*, avctx->internal->hwaccel_priv_data, fctx->stash_hwaccel_priv);
av_freep(&avctx->internal->thread_ctx);
}
static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
FrameThreadContext *fctx, AVCodecContext *avctx,
const FFCodec *codec, int first)
{
AVCodecContext *copy;
int err;
p->initial_pict_type = AV_PICTURE_TYPE_NONE;
if (avctx->codec_descriptor->props & AV_CODEC_PROP_INTRA_ONLY) {
p->intra_only_flag = AV_FRAME_FLAG_KEY;
if (avctx->codec_type == AVMEDIA_TYPE_VIDEO)
p->initial_pict_type = AV_PICTURE_TYPE_I;
}
atomic_init(&p->state, STATE_INPUT_READY);
copy = av_memdup(avctx, sizeof(*avctx));
if (!copy)
return AVERROR(ENOMEM);
copy->priv_data = NULL;
copy->decoded_side_data = NULL;
copy->nb_decoded_side_data = 0;
/* From now on, this PerThreadContext will be cleaned up by
* ff_frame_thread_free in case of errors. */
(*threads_to_free)++;
p->parent = fctx;
p->avctx = copy;
copy->internal = ff_decode_internal_alloc();
if (!copy->internal)
return AVERROR(ENOMEM);
copy->internal->thread_ctx = p;
avcodec/decode: Add new ProgressFrame API Frame-threaded decoders with inter-frame dependencies use the ThreadFrame API for syncing. It works as follows: During init each thread allocates an AVFrame for every ThreadFrame. Thread A reads the header of its packet and allocates a buffer for an AVFrame with ff_thread_get_ext_buffer() (which also allocates a small structure that is shared with other references to this frame) and sets its fields, including side data. Then said thread calls ff_thread_finish_setup(). From that moment onward it is not allowed to change any of the AVFrame fields at all any more, but it may change fields which are an indirection away, like the content of AVFrame.data or already existing side data. After thread A has called ff_thread_finish_setup(), another thread (the user one) calls the codec's update_thread_context callback which in turn calls ff_thread_ref_frame() which calls av_frame_ref() which reads every field of A's AVFrame; hence the above restriction on modifications of the AVFrame (as any modification of the AVFrame by A after ff_thread_finish_setup() would be a data race). Of course, this av_frame_ref() also incurs allocations and therefore needs to be checked. ff_thread_ref_frame() also references the small structure used for communicating progress. This av_frame_ref() makes it awkward to propagate values that only become known during decoding to later threads (in case of frame reordering or other mechanisms of delayed output (like show-existing-frames) it's not the decoding thread, but a later thread that returns the AVFrame). E.g. for VP9 when exporting video encoding parameters as side data the number of blocks only becomes known during decoding, so one can't allocate the side data before ff_thread_finish_setup(). It is currently being done afterwards and this leads to a data race in the vp9-encparams test when using frame-threading. Returning decode_error_flags is also complicated by this. To perform this exchange a buffer shared between the references is needed (notice that simply giving the later threads a pointer to the original AVFrame does not work, because said AVFrame will be reused lateron when thread A decodes the next packet given to it). One could extend the buffer already used for progress for this or use a new one (requiring yet another allocation), yet both of these approaches have the drawback of being unnatural, ugly and requiring quite a lot of ad-hoc code. E.g. in case of the VP9 side data mentioned above one could not simply use the helper that allocates and adds the side data to an AVFrame in one go. The ProgressFrame API meanwhile offers a different solution to all of this. It is based around the idea that the most natural shared object for sharing information about an AVFrame between decoding threads is the AVFrame itself. To actually implement this the AVFrame needs to be reference counted. This is achieved by putting a (ownership) pointer into a shared (and opaque) structure that is managed by the RefStruct API and which also contains the stuff necessary for progress reporting. The users get a pointer to this AVFrame with the understanding that the owner may set all the fields until it has indicated that it has finished decoding this AVFrame; then the users are allowed to read everything. Every decoder may of course employ a different contract than the one outlined above. Given that there is no underlying av_frame_ref(), creating references to a ProgressFrame can't fail. Only ff_thread_progress_get_buffer() can fail, but given that it will replace calls to ff_thread_get_ext_buffer() it is at places where errors are already expected and properly taken care of. The ProgressFrames are empty (i.e. the AVFrame pointer is NULL and the AVFrames are not allocated during init at all) while not being in use; ff_thread_progress_get_buffer() both sets up the actual ProgressFrame and already calls ff_thread_get_buffer(). So instead of checking for ThreadFrame.f->data[0] or ThreadFrame.f->buf[0] being NULL for "this reference frame is non-existing" one should check for ProgressFrame.f. This also implies that one can only set AVFrame properties after having allocated the buffer. This restriction is not deep: if it becomes onerous for any codec, ff_thread_progress_get_buffer() can be broken up. The user would then have to get a buffer himself. In order to avoid unnecessary allocations, the shared structure is pooled, so that both the structure as well as the AVFrame itself are reused. This means that there won't be lots of unnecessary allocations in case of non-frame-threaded decoding. It might even turn out to have fewer than the current code (the current code allocates AVFrames for every DPB slot, but these are often excessively large and not completely used; the new code allocates them on demand). Pooling relies on the reset function of the RefStruct pool API, it would be impossible to implement with the AVBufferPool API. Finally, ProgressFrames have no notion of owner; they are built on top of the ThreadProgress API which also lacks such a concept. Instead every ThreadProgress and every ProgressFrame contains its own mutex and condition variable, making it completely independent of pthread_frame.c. Just like the ThreadFrame API it is simply presumed that only the actual owner/producer of a frame reports progress on said frame. Signed-off-by: Andreas Rheinhardt <andreas.rheinhardt@outlook.com>
2 years ago
copy->internal->progress_frame_pool = avctx->internal->progress_frame_pool;
copy->delay = avctx->delay;
if (codec->priv_data_size) {
copy->priv_data = av_mallocz(codec->priv_data_size);
if (!copy->priv_data)
return AVERROR(ENOMEM);
if (codec->p.priv_class) {
*(const AVClass **)copy->priv_data = codec->p.priv_class;
err = av_opt_copy(copy->priv_data, avctx->priv_data);
if (err < 0)
return err;
}
}
err = ff_pthread_init(p, per_thread_offsets);
if (err < 0)
return err;
if (!(p->avpkt = av_packet_alloc()))
return AVERROR(ENOMEM);
copy->internal->is_frame_mt = 1;
if (!first)
copy->internal->is_copy = 1;
copy->internal->in_pkt = av_packet_alloc();
if (!copy->internal->in_pkt)
return AVERROR(ENOMEM);
copy->internal->last_pkt_props = av_packet_alloc();
if (!copy->internal->last_pkt_props)
return AVERROR(ENOMEM);
if (codec->init) {
err = codec->init(copy);
if (err < 0) {
if (codec->caps_internal & FF_CODEC_CAP_INIT_CLEANUP)
p->thread_init = NEEDS_CLOSE;
return err;
}
}
p->thread_init = NEEDS_CLOSE;
if (first) {
update_context_from_thread(avctx, copy, 1);
av_frame_side_data_free(&avctx->decoded_side_data, &avctx->nb_decoded_side_data);
for (int i = 0; i < copy->nb_decoded_side_data; i++) {
err = av_frame_side_data_clone(&avctx->decoded_side_data,
&avctx->nb_decoded_side_data,
copy->decoded_side_data[i], 0);
if (err < 0)
return err;
}
}
atomic_init(&p->debug_threads, (copy->debug & FF_DEBUG_THREADS) != 0);
err = AVERROR(pthread_create(&p->thread, NULL, frame_worker_thread, p));
if (err < 0)
return err;
p->thread_init = INITIALIZED;
return 0;
}
int ff_frame_thread_init(AVCodecContext *avctx)
{
int thread_count = avctx->thread_count;
const FFCodec *codec = ffcodec(avctx->codec);
FrameThreadContext *fctx;
int err, i = 0;
if (!thread_count) {
int nb_cpus = av_cpu_count();
// use number of cores + 1 as thread count if there is more than one
if (nb_cpus > 1)
thread_count = avctx->thread_count = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS);
else
thread_count = avctx->thread_count = 1;
}
if (thread_count <= 1) {
avctx->active_thread_type = 0;
return 0;
}
avctx->internal->thread_ctx = fctx = av_mallocz(sizeof(FrameThreadContext));
if (!fctx)
return AVERROR(ENOMEM);
err = ff_pthread_init(fctx, thread_ctx_offsets);
if (err < 0) {
ff_pthread_free(fctx, thread_ctx_offsets);
av_freep(&avctx->internal->thread_ctx);
return err;
}
fctx->next_pkt = av_packet_alloc();
if (!fctx->next_pkt)
return AVERROR(ENOMEM);
fctx->async_lock = 1;
if (codec->p.type == AVMEDIA_TYPE_VIDEO)
avctx->delay = avctx->thread_count - 1;
fctx->threads = av_calloc(thread_count, sizeof(*fctx->threads));
if (!fctx->threads) {
err = AVERROR(ENOMEM);
goto error;
}
for (; i < thread_count; ) {
PerThreadContext *p = &fctx->threads[i];
int first = !i;
err = init_thread(p, &i, fctx, avctx, codec, first);
if (err < 0)
goto error;
}
return 0;
error:
ff_frame_thread_free(avctx, i);
return err;
}
void ff_thread_flush(AVCodecContext *avctx)
{
int i;
FrameThreadContext *fctx = avctx->internal->thread_ctx;
if (!fctx) return;
park_frame_worker_threads(fctx, avctx->thread_count);
if (fctx->prev_thread) {
if (fctx->prev_thread != &fctx->threads[0])
update_context_from_thread(fctx->threads[0].avctx, fctx->prev_thread->avctx, 0);
}
fctx->next_decoding = fctx->next_finished = 0;
fctx->prev_thread = NULL;
decoded_frames_flush(&fctx->df);
fctx->result = 0;
for (i = 0; i < avctx->thread_count; i++) {
PerThreadContext *p = &fctx->threads[i];
decoded_frames_flush(&p->df);
p->result = 0;
avcodec_flush_buffers(p->avctx);
}
}
int ff_thread_can_start_frame(AVCodecContext *avctx)
{
if ((avctx->active_thread_type & FF_THREAD_FRAME) &&
ffcodec(avctx->codec)->update_thread_context) {
PerThreadContext *p = avctx->internal->thread_ctx;
if (atomic_load(&p->state) != STATE_SETTING_UP)
return 0;
}
return 1;
}
static int thread_get_buffer_internal(AVCodecContext *avctx, AVFrame *f, int flags)
{
PerThreadContext *p;
int err;
if (!(avctx->active_thread_type & FF_THREAD_FRAME))
return ff_get_buffer(avctx, f, flags);
p = avctx->internal->thread_ctx;
if (atomic_load(&p->state) != STATE_SETTING_UP &&
ffcodec(avctx->codec)->update_thread_context) {
av_log(avctx, AV_LOG_ERROR, "get_buffer() cannot be called after ff_thread_finish_setup()\n");
return -1;
}
pthread_mutex_lock(&p->parent->buffer_mutex);
err = ff_get_buffer(avctx, f, flags);
pthread_mutex_unlock(&p->parent->buffer_mutex);
return err;
}
int ff_thread_get_buffer(AVCodecContext *avctx, AVFrame *f, int flags)
{
int ret = thread_get_buffer_internal(avctx, f, flags);
if (ret < 0)
av_log(avctx, AV_LOG_ERROR, "thread_get_buffer() failed\n");
return ret;
}
int ff_thread_get_ext_buffer(AVCodecContext *avctx, ThreadFrame *f, int flags)
{
int ret;
f->owner[0] = f->owner[1] = avctx;
if (!(avctx->active_thread_type & FF_THREAD_FRAME))
return ff_get_buffer(avctx, f->f, flags);
avcodec/codec_internal: Remove FF_CODEC_CAP_ALLOCATE_PROGRESS Before commit f025b8e110b36c1cdb4fb56c4cd57aeca1767b5b, every frame-threaded decoder used ThreadFrames, even when they did not have any inter-frame dependencies at all. In order to distinguish those decoders that need the AVBuffer for progress communication from those that do not (to avoid the allocation for the latter), the former decoders were marked with the FF_CODEC_CAP_ALLOCATE_PROGRESS internal codec cap. Yet distinguishing these two can be done in a more natural way: Don't use ThreadFrames when not needed and split ff_thread_get_buffer() into a core function that calls the user's get_buffer2 callback and a wrapper around it that also allocates the progress AVBuffer. This has been done in 02220b88fc38ef9dd4f2d519f5d3e4151258b60c and since that commit the ALLOCATE_PROGRESS cap was nearly redundant. The only exception was WebP and VP8. WebP can contain VP8 and uses the VP8 decoder directly (i.e. they share the same AVCodecContext). Both decoders are frame-threaded and VP8 has inter-frame dependencies (in general, not in valid WebP) and therefore the ALLOCATE_PROGRESS cap. In order to avoid allocating progress in case of a frame-threaded WebP decoder the cap and the check for the cap has been kept in place. Yet now the VP8 decoder has been switched to use ProgressFrames and therefore there is just no reason any more for this check and the cap. This commit therefore removes both. Also change the value of FF_CODEC_CAP_USES_PROGRESSFRAMES to leave no gaps. Signed-off-by: Andreas Rheinhardt <andreas.rheinhardt@outlook.com>
1 year ago
f->progress = ff_refstruct_allocz(sizeof(*f->progress));
if (!f->progress)
return AVERROR(ENOMEM);
avcodec/codec_internal: Remove FF_CODEC_CAP_ALLOCATE_PROGRESS Before commit f025b8e110b36c1cdb4fb56c4cd57aeca1767b5b, every frame-threaded decoder used ThreadFrames, even when they did not have any inter-frame dependencies at all. In order to distinguish those decoders that need the AVBuffer for progress communication from those that do not (to avoid the allocation for the latter), the former decoders were marked with the FF_CODEC_CAP_ALLOCATE_PROGRESS internal codec cap. Yet distinguishing these two can be done in a more natural way: Don't use ThreadFrames when not needed and split ff_thread_get_buffer() into a core function that calls the user's get_buffer2 callback and a wrapper around it that also allocates the progress AVBuffer. This has been done in 02220b88fc38ef9dd4f2d519f5d3e4151258b60c and since that commit the ALLOCATE_PROGRESS cap was nearly redundant. The only exception was WebP and VP8. WebP can contain VP8 and uses the VP8 decoder directly (i.e. they share the same AVCodecContext). Both decoders are frame-threaded and VP8 has inter-frame dependencies (in general, not in valid WebP) and therefore the ALLOCATE_PROGRESS cap. In order to avoid allocating progress in case of a frame-threaded WebP decoder the cap and the check for the cap has been kept in place. Yet now the VP8 decoder has been switched to use ProgressFrames and therefore there is just no reason any more for this check and the cap. This commit therefore removes both. Also change the value of FF_CODEC_CAP_USES_PROGRESSFRAMES to leave no gaps. Signed-off-by: Andreas Rheinhardt <andreas.rheinhardt@outlook.com>
1 year ago
atomic_init(&f->progress->progress[0], -1);
atomic_init(&f->progress->progress[1], -1);
ret = ff_thread_get_buffer(avctx, f->f, flags);
if (ret)
ff_refstruct_unref(&f->progress);
return ret;
}
void ff_thread_release_ext_buffer(ThreadFrame *f)
{
ff_refstruct_unref(&f->progress);
f->owner[0] = f->owner[1] = NULL;
if (f->f)
av_frame_unref(f->f);
}
enum ThreadingStatus ff_thread_sync_ref(AVCodecContext *avctx, size_t offset)
{
PerThreadContext *p;
const void *ref;
if (!avctx->internal->is_copy)
return avctx->active_thread_type & FF_THREAD_FRAME ?
FF_THREAD_IS_FIRST_THREAD : FF_THREAD_NO_FRAME_THREADING;
p = avctx->internal->thread_ctx;
av_assert1(memcpy(&ref, (char*)avctx->priv_data + offset, sizeof(ref)) && ref == NULL);
memcpy(&ref, (const char*)p->parent->threads[0].avctx->priv_data + offset, sizeof(ref));
av_assert1(ref);
ff_refstruct_replace((char*)avctx->priv_data + offset, ref);
return FF_THREAD_IS_COPY;
}
int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt)
{
PerThreadContext *p = avctx->internal->thread_ctx;
if (!AVPACKET_IS_EMPTY(p->avpkt)) {
av_packet_move_ref(pkt, p->avpkt);
return 0;
}
return avctx->internal->draining ? AVERROR_EOF : AVERROR(EAGAIN);
}