@ -32,13 +32,18 @@
# include "thread.h"
# define MAX_THREADS 64
# define BUFFER_SIZE (2*MAX_THREADS)
/* There can be as many as MAX_THREADS + 1 outstanding tasks.
* An additional + 1 is needed so that one can distinguish
* the case of zero and MAX_THREADS + 1 outstanding tasks modulo
* the number of buffers . */
# define BUFFER_SIZE (MAX_THREADS + 2)
typedef struct {
AVFrame * indata ;
AVPacket * outdata ;
int64_t return_code ;
unsigned index ;
int finished ;
} Task ;
typedef struct {
@ -49,8 +54,9 @@ typedef struct{
pthread_mutex_t task_fifo_mutex ;
pthread_cond_t task_fifo_cond ;
Task finished_tasks [ BUFFER_SIZE ] ;
pthread_mutex_t finished_task_mutex ;
unsigned max_tasks ;
Task tasks [ BUFFER_SIZE ] ;
pthread_mutex_t finished_task_mutex ; /* Guards tasks[i].finished */
pthread_cond_t finished_task_cond ;
unsigned task_index ;
@ -63,17 +69,13 @@ typedef struct{
static void * attribute_align_arg worker ( void * v ) {
AVCodecContext * avctx = v ;
ThreadContext * c = avctx - > internal - > frame_thread_encoder ;
AVPacket * pkt = NULL ;
while ( ! atomic_load ( & c - > exit ) ) {
int got_packet = 0 , ret ;
AVPacket * pkt ;
AVFrame * frame ;
Task task ;
if ( ! pkt ) pkt = av_packet_alloc ( ) ;
if ( ! pkt ) continue ;
av_init_packet ( pkt ) ;
pthread_mutex_lock ( & c - > task_fifo_mutex ) ;
while ( av_fifo_size ( c - > task_fifo ) < = 0 | | atomic_load ( & c - > exit ) ) {
if ( atomic_load ( & c - > exit ) ) {
@ -84,7 +86,12 @@ static void * attribute_align_arg worker(void *v){
}
av_fifo_generic_read ( c - > task_fifo , & task , sizeof ( task ) , NULL ) ;
pthread_mutex_unlock ( & c - > task_fifo_mutex ) ;
/* The main thread ensures that any two outstanding tasks have
* different indices , ergo each worker thread owns its element
* of c - > tasks with the exception of finished , which is shared
* with the main thread and guarded by finished_task_mutex . */
frame = task . indata ;
pkt = c - > tasks [ task . index ] . outdata ;
ret = avctx - > codec - > encode2 ( avctx , pkt , frame , & got_packet ) ;
if ( got_packet ) {
@ -101,13 +108,12 @@ static void * attribute_align_arg worker(void *v){
pthread_mutex_unlock ( & c - > buffer_mutex ) ;
av_frame_free ( & frame ) ;
pthread_mutex_lock ( & c - > finished_task_mutex ) ;
c - > finished_ tasks[ task . index ] . outdata = pkt ; pkt = NULL ;
c - > finished_ tasks[ task . index ] . return_code = ret ;
c - > tasks [ task . index ] . return_code = ret ;
c - > tasks [ task . index ] . finished = 1 ;
pthread_cond_signal ( & c - > finished_task_cond ) ;
pthread_mutex_unlock ( & c - > finished_task_mutex ) ;
}
end :
av_free ( pkt ) ;
pthread_mutex_lock ( & c - > buffer_mutex ) ;
avcodec_close ( avctx ) ;
pthread_mutex_unlock ( & c - > buffer_mutex ) ;
@ -194,6 +200,12 @@ int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){
pthread_cond_init ( & c - > finished_task_cond , NULL ) ;
atomic_init ( & c - > exit , 0 ) ;
c - > max_tasks = avctx - > thread_count + 2 ;
for ( unsigned i = 0 ; i < c - > max_tasks ; i + + ) {
if ( ! ( c - > tasks [ i ] . outdata = av_packet_alloc ( ) ) )
goto fail ;
}
for ( i = 0 ; i < avctx - > thread_count ; i + + ) {
AVDictionary * tmp = NULL ;
int ret ;
@ -261,8 +273,8 @@ void ff_frame_thread_encoder_free(AVCodecContext *avctx){
av_frame_free ( & task . indata ) ;
}
for ( i = 0 ; i < BUFFER_SIZE ; i + + ) {
av_packet_free ( & c - > finished_ tasks[ i ] . outdata ) ;
for ( unsigned i = 0 ; i < c - > max_tasks ; i + + ) {
av_packet_free ( & c - > tasks [ i ] . outdata ) ;
}
pthread_mutex_destroy ( & c - > task_fifo_mutex ) ;
@ -276,7 +288,7 @@ void ff_frame_thread_encoder_free(AVCodecContext *avctx){
int ff_thread_video_encode_frame ( AVCodecContext * avctx , AVPacket * pkt , const AVFrame * frame , int * got_packet_ptr ) {
ThreadContext * c = avctx - > internal - > frame_thread_encoder ;
Task task ;
Task * outtask , task ;
int ret ;
av_assert1 ( ! * got_packet_ptr ) ;
@ -298,27 +310,28 @@ int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVF
pthread_cond_signal ( & c - > task_fifo_cond ) ;
pthread_mutex_unlock ( & c - > task_fifo_mutex ) ;
c - > task_index = ( c - > task_index + 1 ) % BUFFER_SIZE ;
c - > task_index = ( c - > task_index + 1 ) % c - > max_tasks ;
}
outtask = & c - > tasks [ c - > finished_task_index ] ;
pthread_mutex_lock ( & c - > finished_task_mutex ) ;
if ( c - > task_index = = c - > finished_task_index | |
( frame & & ! c - > finished_tasks [ c - > finished_task_index ] . outdata & &
( c - > task_index - c - > finished_task_index ) % BUFFER_SIZE < = avctx - > thread_count ) ) {
( frame & & ! outtask - > finished & &
( c - > task_index - c - > finished_task_index + c - > max_tasks ) % c - > max_tasks < = avctx - > thread_count ) ) {
pthread_mutex_unlock ( & c - > finished_task_mutex ) ;
return 0 ;
}
while ( ! c - > finished_tasks [ c - > finished_task_index ] . outdata ) {
while ( ! outtask - > finished ) {
pthread_cond_wait ( & c - > finished_task_cond , & c - > finished_task_mutex ) ;
}
task = c - > finished_tasks [ c - > finished_task_index ] ;
* pkt = * ( AVPacket * ) ( task . outdata ) ;
/* We now own outtask completely: No worker thread touches it any more,
* because there is no outstanding task with this index . */
outtask - > finished = 0 ;
av_packet_move_ref ( pkt , outtask - > outdata ) ;
if ( pkt - > data )
* got_packet_ptr = 1 ;
av_freep ( & c - > finished_tasks [ c - > finished_task_index ] . outdata ) ;
c - > finished_task_index = ( c - > finished_task_index + 1 ) % BUFFER_SIZE ;
c - > finished_task_index = ( c - > finished_task_index + 1 ) % c - > max_tasks ;
pthread_mutex_unlock ( & c - > finished_task_mutex ) ;
return task . return_code ;
return outtask - > return_code ;
}