@ -32,6 +32,7 @@
# include "hwaccel_internal.h"
# include "hwconfig.h"
# include "internal.h"
# include "packet_internal.h"
# include "pthread_internal.h"
# include "refstruct.h"
# include "thread.h"
@ -64,6 +65,12 @@ enum {
INITIALIZED , ///< Thread has been properly set up
} ;
/**
 * A growable FIFO of decoded frames.
 *
 * Slots [0, nb_f) hold frames carrying decoded output, oldest first.
 * Slots [nb_f, nb_f_allocated) hold allocated AVFrames that are kept
 * around for reuse and carry no data.
 */
typedef struct DecodedFrames {
/** Array of frame pointers; its first nb_f_allocated entries are allocated AVFrames. */
AVFrame * * f ;
/** Number of leading entries of f that currently hold decoded output. */
size_t nb_f ;
/** Number of entries of f that point to an allocated AVFrame. */
size_t nb_f_allocated ;
} DecodedFrames ;
/**
 * Atomic decoding-progress counters attached to a frame.
 *
 * NOTE(review): presumably one counter per field, indexed by the `field`
 * argument of ff_thread_report_progress() - confirm against its body.
 */
typedef struct ThreadFrameProgress {
atomic_int progress [ 2 ] ;
} ThreadFrameProgress ;
@ -88,8 +95,10 @@ typedef struct PerThreadContext {
AVPacket * avpkt ; ///< Input packet (for decoding) or output (for encoding).
AVFrame * frame ; ///< Output frame (for decoding) or input (for encoding).
int got_frame ; ///< The output of got_picture_ptr from the last avcodec_decode_video() call.
/**
* Decoded frames from a single decode iteration .
*/
DecodedFrames df ;
int result ; ///< The result of the last codec decode/encode() call.
atomic_int state ;
@ -130,14 +139,17 @@ typedef struct FrameThreadContext {
pthread_cond_t async_cond ;
int async_lock ;
DecodedFrames df ;
int result ;
/**
* Packet to be submitted to the next thread for decoding .
*/
AVPacket * next_pkt ;
int next_decoding ; ///< The next context to submit a packet to.
int next_finished ; ///< The next context to return output from.
int delaying ; /**<
* Set for the first N packets , where N is the number of threads .
* While it is set , ff_thread_en / decode_frame won ' t return any results .
*/
/* hwaccel state for thread-unsafe hwaccels is temporarily stored here in
* order to transfer its ownership to the next decoding thread without the
* need for extra synchronization */
@ -180,6 +192,52 @@ static void thread_set_name(PerThreadContext *p)
ff_thread_setname ( name ) ;
}
/**
 * Get the frame that the next decoded picture should be written into.
 *
 * The returned frame is the slot at index df->nb_f. If no spare slot exists,
 * the array is grown by one entry and a fresh AVFrame is allocated for it.
 * df->nb_f itself is NOT advanced here; the caller increments it once the
 * frame actually holds output.
 *
 * @param df frame queue to take the spare frame from
 * @return a frame with no data buffers attached (enforced by assertion),
 *         or NULL if growing the array or allocating the frame failed
 */
static AVFrame *decoded_frames_get_free(DecodedFrames *df)
{
    const size_t slot = df->nb_f;

    if (slot == df->nb_f_allocated) {
        /* no spare frame left: extend the pointer array by one entry */
        AVFrame **grown = av_realloc_array(df->f, slot + 1, sizeof(*df->f));
        if (!grown)
            return NULL;
        df->f = grown;

        grown[slot] = av_frame_alloc();
        if (!grown[slot])
            return NULL;

        /* only count the slot once it really points to a frame */
        df->nb_f_allocated++;
    }

    /* a spare frame must never carry leftover data */
    av_assert0(!df->f[slot]->buf[0]);

    return df->f[slot];
}
/**
 * Remove the oldest frame from the queue and hand its contents to dst.
 *
 * The data of the frame at index 0 is moved into dst, the remaining frame
 * pointers are shifted down by one, and the now-empty AVFrame is parked
 * right behind the last used slot so that it can be reused later.
 *
 * @param df  frame queue; df->nb_f must be non-zero (not checked here)
 * @param dst frame receiving the moved contents
 */
static void decoded_frames_pop(DecodedFrames *df, AVFrame *dst)
{
    AVFrame     *head      = df->f[0];
    const size_t remaining = df->nb_f - 1;

    av_frame_move_ref(dst, head);

    /* close the gap left by the first entry */
    memmove(&df->f[0], &df->f[1], remaining * sizeof(*df->f));

    /* keep the emptied frame as the first spare slot */
    df->nb_f         = remaining;
    df->f[remaining] = head;
}
/**
 * Drop all queued output while keeping the allocated AVFrames for reuse.
 *
 * Every frame in the used part of the queue is unreferenced, then the
 * queue is marked empty. The frame array itself is left untouched.
 *
 * @param df frame queue to empty
 */
static void decoded_frames_flush(DecodedFrames *df)
{
    size_t idx = 0;

    while (idx < df->nb_f) {
        av_frame_unref(df->f[idx]);
        idx++;
    }

    df->nb_f = 0;
}
/**
 * Release everything owned by the queue.
 *
 * Frees every allocated AVFrame (used or spare), then the pointer array,
 * and resets both counters to zero.
 *
 * @param df frame queue to tear down
 */
static void decoded_frames_free(DecodedFrames *df)
{
    size_t idx = 0;

    while (idx < df->nb_f_allocated) {
        av_frame_free(&df->f[idx]);
        idx++;
    }

    av_freep(&df->f);

    df->nb_f_allocated = 0;
    df->nb_f           = 0;
}
/**
* Codec worker thread .
*
@ -197,6 +255,8 @@ static attribute_align_arg void *frame_worker_thread(void *arg)
pthread_mutex_lock ( & p - > mutex ) ;
while ( 1 ) {
int ret ;
while ( atomic_load ( & p - > state ) = = STATE_INPUT_READY & & ! p - > die )
pthread_cond_wait ( & p - > input_cond , & p - > mutex ) ;
@ -220,18 +280,31 @@ static attribute_align_arg void *frame_worker_thread(void *arg)
p - > hwaccel_serializing = 1 ;
}
av_frame_unref ( p - > frame ) ;
p - > got_frame = 0 ;
p - > frame - > pict_type = p - > initial_pict_type ;
p - > frame - > flags | = p - > intra_only_flag ;
p - > result = codec - > cb . decode ( avctx , p - > frame , & p - > got_frame , p - > avpkt ) ;
ret = 0 ;
while ( ret > = 0 ) {
AVFrame * frame ;
if ( ( p - > result < 0 | | ! p - > got_frame ) & & p - > frame - > buf [ 0 ] )
av_frame_unref ( p - > frame ) ;
/* get the frame which will store the output */
frame = decoded_frames_get_free ( & p - > df ) ;
if ( ! frame ) {
p - > result = AVERROR ( ENOMEM ) ;
goto alloc_fail ;
}
/* do the actual decoding */
ret = ff_decode_receive_frame_internal ( avctx , frame ) ;
if ( ret = = 0 )
p - > df . nb_f + + ;
else if ( ret < 0 & & frame - > buf [ 0 ] )
av_frame_unref ( frame ) ;
p - > result = ( ret = = AVERROR ( EAGAIN ) ) ? 0 : ret ;
}
if ( atomic_load ( & p - > state ) = = STATE_SETTING_UP )
ff_thread_finish_setup ( avctx ) ;
alloc_fail :
if ( p - > hwaccel_serializing ) {
/* wipe hwaccel state for thread-unsafe hwaccels to avoid stale
* pointers lying around ;
@ -426,18 +499,21 @@ static int update_context_from_user(AVCodecContext *dst, const AVCodecContext *s
}
static int submit_packet ( PerThreadContext * p , AVCodecContext * user_avctx ,
AVPacket * av pkt)
AVPacket * in_ pkt)
{
FrameThreadContext * fctx = p - > parent ;
PerThreadContext * prev_thread = fctx - > prev_thread ;
const AVCodec * codec = p - > avctx - > codec ;
int ret ;
if ( ! avpkt - > size & & ! ( codec - > capabilities & AV_CODEC_CAP_DELAY ) )
return 0 ;
pthread_mutex_lock ( & p - > mutex ) ;
av_packet_unref ( p - > avpkt ) ;
av_packet_move_ref ( p - > avpkt , in_pkt ) ;
if ( AVPACKET_IS_EMPTY ( p - > avpkt ) )
p - > avctx - > internal - > draining = 1 ;
ret = update_context_from_user ( p - > avctx , user_avctx ) ;
if ( ret ) {
pthread_mutex_unlock ( & p - > mutex ) ;
@ -448,7 +524,6 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
memory_order_relaxed ) ;
if ( prev_thread ) {
int err ;
if ( atomic_load ( & prev_thread - > state ) = = STATE_SETTING_UP ) {
pthread_mutex_lock ( & prev_thread - > progress_mutex ) ;
while ( atomic_load ( & prev_thread - > state ) = = STATE_SETTING_UP )
@ -456,10 +531,16 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
pthread_mutex_unlock ( & prev_thread - > progress_mutex ) ;
}
err = update_context_from_thread ( p - > avctx , prev_thread - > avctx , 0 ) ;
if ( err ) {
pthread_mutex_unlock ( & p - > mutex ) ;
return err ;
/* codecs without delay might not be prepared to be called repeatedly here during
* flushing ( vp3 / theora ) , and also don ' t need to be , since from this point on , they
* will always return EOF anyway */
if ( ! p - > avctx - > internal - > draining | |
( codec - > capabilities & AV_CODEC_CAP_DELAY ) ) {
ret = update_context_from_thread ( p - > avctx , prev_thread - > avctx , 0 ) ;
if ( ret ) {
pthread_mutex_unlock ( & p - > mutex ) ;
return ret ;
}
}
}
@ -471,70 +552,47 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
FFSWAP ( void * , p - > avctx - > internal - > hwaccel_priv_data , fctx - > stash_hwaccel_priv ) ;
}
av_packet_unref ( p - > avpkt ) ;
ret = av_packet_ref ( p - > avpkt , avpkt ) ;
if ( ret < 0 ) {
pthread_mutex_unlock ( & p - > mutex ) ;
av_log ( p - > avctx , AV_LOG_ERROR , " av_packet_ref() failed in submit_packet() \n " ) ;
return ret ;
}
atomic_store ( & p - > state , STATE_SETTING_UP ) ;
pthread_cond_signal ( & p - > input_cond ) ;
pthread_mutex_unlock ( & p - > mutex ) ;
fctx - > prev_thread = p ;
fctx - > next_decoding + + ;
fctx - > next_decoding = ( fctx - > next_decoding + 1 ) % p - > avctx - > thread_count ;
return 0 ;
}
int ff_thread_decode_frame ( AVCodecContext * avctx ,
AVFrame * picture , int * got_picture_ptr ,
AVPacket * avpkt )
int ff_thread_receive_frame ( AVCodecContext * avctx , AVFrame * frame )
{
FrameThreadContext * fctx = avctx - > internal - > thread_ctx ;
int finished = fctx - > next_finished ;
PerThreadContext * p ;
int err ;
int ret = 0 ;
/* release the async lock, permitting blocked hwaccel threads to
* go forward while we are in this function */
async_unlock ( fctx ) ;
/*
* Submit a packet to the next decoding thread .
*/
p = & fctx - > threads [ fctx - > next_decoding ] ;
err = submit_packet ( p , avctx , avpkt ) ;
if ( err )
goto finish ;
/*
* If we ' re still receiving the initial packets , don ' t return a frame .
*/
if ( fctx - > next_decoding > ( avctx - > thread_count - 1 - ( avctx - > codec_id = = AV_CODEC_ID_FFV1 ) ) )
fctx - > delaying = 0 ;
/* submit packets to threads while there are no buffered results to return */
while ( ! fctx - > df . nb_f & & ! fctx - > result ) {
PerThreadContext * p ;
if ( fctx - > delaying ) {
* got_picture_ptr = 0 ;
if ( avpkt - > size ) {
err = avpkt - > size ;
/* get a packet to be submitted to the next thread */
av_packet_unref ( fctx - > next_pkt ) ;
ret = ff_decode_get_packet ( avctx , fctx - > next_pkt ) ;
if ( ret < 0 & & ret ! = AVERROR_EOF )
goto finish ;
}
}
/*
* Return the next available frame from the oldest thread .
* If we ' re at the end of the stream , then we have to skip threads that
* didn ' t output a frame / error , because we don ' t want to accidentally signal
* EOF ( avpkt - > size = = 0 & & * got_picture_ptr = = 0 & & err > = 0 ) .
*/
ret = submit_packet ( & fctx - > threads [ fctx - > next_decoding ] , avctx ,
fctx - > next_pkt ) ;
if ( ret < 0 )
goto finish ;
do {
p = & fctx - > threads [ finished + + ] ;
/* do not return any frames until all threads have something to do */
if ( fctx - > next_decoding ! = fctx - > next_finished & &
! avctx - > internal - > draining )
continue ;
p = & fctx - > threads [ fctx - > next_finished ] ;
fctx - > next_finished = ( fctx - > next_finished + 1 ) % avctx - > thread_count ;
if ( atomic_load ( & p - > state ) ! = STATE_INPUT_READY ) {
pthread_mutex_lock ( & p - > progress_mutex ) ;
@ -543,35 +601,26 @@ int ff_thread_decode_frame(AVCodecContext *avctx,
pthread_mutex_unlock ( & p - > progress_mutex ) ;
}
av_frame_move_ref ( picture , p - > frame ) ;
* got_picture_ptr = p - > got_frame ;
picture - > pkt_dts = p - > avpkt - > dts ;
err = p - > result ;
/*
* A later call with avkpt - > size = = 0 may loop over all threads ,
* including this one , searching for a frame / error to return before being
* stopped by the " finished != fctx->next_finished " condition .
* Make sure we don ' t mistakenly return the same frame / error again .
*/
p - > got_frame = 0 ;
p - > result = 0 ;
if ( finished > = avctx - > thread_count ) finished = 0 ;
} while ( ! avpkt - > size & & ! * got_picture_ptr & & err > = 0 & & finished ! = fctx - > next_finished ) ;
update_context_from_thread ( avctx , p - > avctx , 1 ) ;
if ( fctx - > next_decoding > = avctx - > thread_count ) fctx - > next_decoding = 0 ;
update_context_from_thread ( avctx , p - > avctx , 1 ) ;
fctx - > result = p - > result ;
p - > result = 0 ;
if ( p - > df . nb_f )
FFSWAP ( DecodedFrames , fctx - > df , p - > df ) ;
}
fctx - > next_finished = finished ;
/* a thread may return multiple frames AND an error
* we first return all the frames , then the error */
if ( fctx - > df . nb_f ) {
decoded_frames_pop ( & fctx - > df , frame ) ;
ret = 0 ;
} else {
ret = fctx - > result ;
fctx - > result = 0 ;
}
/* return the size of the consumed packet if no error occurred */
if ( err > = 0 )
err = avpkt - > size ;
finish :
async_lock ( fctx ) ;
return err ;
return ret ;
}
void ff_thread_report_progress ( ThreadFrame * f , int n , int field )
@ -679,7 +728,6 @@ static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count
pthread_cond_wait ( & p - > output_cond , & p - > progress_mutex ) ;
pthread_mutex_unlock ( & p - > progress_mutex ) ;
}
p - > got_frame = 0 ;
}
async_lock ( fctx ) ;
@ -732,6 +780,7 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
}
ff_refstruct_unref ( & ctx - > internal - > pool ) ;
av_packet_free ( & ctx - > internal - > in_pkt ) ;
av_packet_free ( & ctx - > internal - > last_pkt_props ) ;
av_freep ( & ctx - > internal ) ;
av_buffer_unref ( & ctx - > hw_frames_ctx ) ;
@ -739,7 +788,7 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
& ctx - > nb_decoded_side_data ) ;
}
av_frame _free( & p - > frame ) ;
decoded_frames _free( & p - > d f) ;
ff_pthread_free ( p , per_thread_offsets ) ;
av_packet_free ( & p - > avpkt ) ;
@ -747,6 +796,9 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
av_freep ( & p - > avctx ) ;
}
decoded_frames_free ( & fctx - > df ) ;
av_packet_free ( & fctx - > next_pkt ) ;
av_freep ( & fctx - > threads ) ;
ff_pthread_free ( fctx , thread_ctx_offsets ) ;
@ -815,13 +867,17 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
if ( err < 0 )
return err ;
if ( ! ( p - > frame = av_frame_alloc ( ) ) | |
! ( p - > avpkt = av_packet_alloc ( ) ) )
if ( ! ( p - > avpkt = av_packet_alloc ( ) ) )
return AVERROR ( ENOMEM ) ;
copy - > internal - > is_frame_mt = 1 ;
if ( ! first )
copy - > internal - > is_copy = 1 ;
copy - > internal - > in_pkt = av_packet_alloc ( ) ;
if ( ! copy - > internal - > in_pkt )
return AVERROR ( ENOMEM ) ;
copy - > internal - > last_pkt_props = av_packet_alloc ( ) ;
if ( ! copy - > internal - > last_pkt_props )
return AVERROR ( ENOMEM ) ;
@ -891,8 +947,11 @@ int ff_frame_thread_init(AVCodecContext *avctx)
return err ;
}
fctx - > next_pkt = av_packet_alloc ( ) ;
if ( ! fctx - > next_pkt )
return AVERROR ( ENOMEM ) ;
fctx - > async_lock = 1 ;
fctx - > delaying = 1 ;
if ( codec - > p . type = = AVMEDIA_TYPE_VIDEO )
avctx - > delay = avctx - > thread_count - 1 ;
@ -933,17 +992,18 @@ void ff_thread_flush(AVCodecContext *avctx)
}
fctx - > next_decoding = fctx - > next_finished = 0 ;
fctx - > delaying = 1 ;
fctx - > prev_thread = NULL ;
decoded_frames_flush ( & fctx - > df ) ;
fctx - > result = 0 ;
for ( i = 0 ; i < avctx - > thread_count ; i + + ) {
PerThreadContext * p = & fctx - > threads [ i ] ;
// Make sure decode flush calls with size=0 won't return old frames
p - > got_frame = 0 ;
av_frame_unref ( p - > frame ) ;
decoded_frames_flush ( & p - > df ) ;
p - > result = 0 ;
if ( ffcodec ( avctx - > codec ) - > flush )
ffcodec ( avctx - > codec ) - > flush ( p - > avctx ) ;
avcodec_flush_buffers ( p - > avctx ) ;
}
}
@ -1039,3 +1099,15 @@ enum ThreadingStatus ff_thread_sync_ref(AVCodecContext *avctx, size_t offset)
return FF_THREAD_IS_COPY ;
}
/**
 * Hand the packet stored in this thread's context over to the caller.
 *
 * @param avctx codec context whose internal thread_ctx is a PerThreadContext
 * @param pkt   destination; receives the stored packet via move
 * @return 0 if a packet was moved into pkt; otherwise AVERROR_EOF when the
 *         context is draining, or AVERROR(EAGAIN) when it is not
 */
int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt)
{
    PerThreadContext *p = avctx->internal->thread_ctx;

    /* nothing stored: report end of stream or "try again" */
    if (AVPACKET_IS_EMPTY(p->avpkt))
        return avctx->internal->draining ? AVERROR_EOF : AVERROR(EAGAIN);

    av_packet_move_ref(pkt, p->avpkt);
    return 0;
}