@ -1,5 +1,9 @@
/*
* Copyright ( c ) 2004 Michael Niedermayer < michaelni @ gmx . at >
* Copyright ( c ) 2004 Roman Shaposhnik .
*
* Many thanks to Steven M . Schultz for providing clever ideas and
* to Michael Niedermayer < michaelni @ gmx . at > for writing initial
* implementation .
*
* This library is free software ; you can redistribute it and / or
* modify it under the terms of the GNU Lesser General Public
@ -16,186 +20,149 @@
* Foundation , Inc . , 59 Temple Place , Suite 330 , Boston , MA 02111 - 1307 USA
*
*/
# include <semaphore.h>
# include <pthread.h>
//#define DEBUG
# include "avcodec.h"
# include "common.h"
/**
 * One job queued by avcodec_thread_execute ( ) for the worker threads .
 */
typedef struct JobContext {
sem_t available_sem ; /* posted by execute() when the job is queued ; a worker claims the job with sem_trywait() */
int assigned ; /* reset to 0 by execute() , set to 1 by the worker that claimed the job */
int ( * func ) ( AVCodecContext * c , void * arg ) ; /* callback the claiming worker runs for this job */
void * arg ; /* argument passed through to func */
int ret ; /* value returned by func ; execute() copies it into the caller's ret[] array */
} JobContext ;
/**
 * Per - thread state of one semaphore - driven worker ( see thread_func ( ) ) .
 */
typedef struct WorkerContext {
AVCodecContext * avctx ; /* codec context handed to every job callback this worker runs */
pthread_t thread ; /* handle filled in by pthread_create() and joined in avcodec_thread_free() */
int start_index ; /* offset at which this worker starts scanning the job array */
sem_t work_sem ; /* posted to wake the worker ; the worker blocks on it with sem_wait() */
sem_t done_sem ; /* posted by the worker after one pass over the job array ; execute() waits on it */
} WorkerContext ;
/* Signature of a job callback : called with the codec context and one per - job argument , returns an int result . */
typedef int ( action_t ) ( AVCodecContext * c , void * arg ) ;
typedef struct ThreadContext {
WorkerContext * worker ;
JobContext * job ;
typedef struct ThreadContext {
pthread_t * workers ;
action_t * func ;
void * * args ;
int * rets ;
int rets_count ;
int job_count ;
int allocated_job_count ;
} ThreadContext ;
static void * thread_func ( void * v ) {
WorkerContext * w = v ;
ThreadContext * c = w - > avctx - > thread_opaque ;
int i ;
for ( ; ; ) {
//av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X enter wait\n", (int)v);
sem_wait ( & w - > work_sem ) ;
//av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X after wait\n", (int)v);
if ( c - > job_count = = 0 )
break ;
for ( i = 0 ; i < c - > job_count ; i + + ) {
int index = ( i + w - > start_index ) % c - > job_count ;
JobContext * j = & c - > job [ index ] ;
//av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X first check of %d\n", (int)v, index);
if ( j - > assigned ) continue ; //unsynced check, if != 0 it is already given to another worker, it never becomes available before the next execute() call so this should be safe
pthread_cond_t last_job_cond ;
pthread_cond_t current_job_cond ;
pthread_mutex_t current_job_lock ;
int current_job ;
int done ;
} ThreadContext ;
static void * worker ( void * v )
{
AVCodecContext * avctx = v ;
ThreadContext * c = avctx - > thread_opaque ;
int our_job = c - > job_count ;
int thread_count = avctx - > thread_count ;
int self_id ;
pthread_mutex_lock ( & c - > current_job_lock ) ;
self_id = c - > current_job + + ;
for ( ; ; ) {
while ( our_job > = c - > job_count ) {
if ( c - > current_job = = thread_count + c - > job_count )
pthread_cond_signal ( & c - > last_job_cond ) ;
//av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X second check of %d\n", (int)v, index);
if ( sem_trywait ( & j - > available_sem ) = = 0 ) {
j - > assigned = 1 ;
j - > ret = j - > func ( w - > avctx , j - > arg ) ;
//av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X done %d\n", (int)v, index);
}
}
//av_log(w->avctx, AV_LOG_DEBUG, "thread_func %X complete\n", (int)v);
sem_post ( & w - > done_sem ) ;
pthread_cond_wait ( & c - > current_job_cond , & c - > current_job_lock ) ;
our_job = self_id ;
if ( c - > done ) {
pthread_mutex_unlock ( & c - > current_job_lock ) ;
return NULL ;
}
}
pthread_mutex_unlock ( & c - > current_job_lock ) ;
c - > rets [ our_job % c - > rets_count ] = c - > func ( avctx , c - > args [ our_job ] ) ;
pthread_mutex_lock ( & c - > current_job_lock ) ;
our_job = c - > current_job + + ;
}
return NULL ;
}
/**
* Free what has been allocated by avcodec_thread_init ( ) .
* Must be called after decoding has finished ; in particular , do not call it
* while avcodec_thread_execute ( ) is running .
*/
void avcodec_thread_free ( AVCodecContext * s ) {
ThreadContext * c = s - > thread_opaque ;
int i , val ;
for ( i = 0 ; i < c - > allocated_job_count ; i + + ) {
sem_getvalue ( & c - > job [ i ] . available_sem , & val ) ; assert ( val = = 0 ) ;
sem_destroy ( & c - > job [ i ] . available_sem ) ;
}
c - > job_count = 0 ;
for ( i = 0 ; i < s - > thread_count ; i + + ) {
sem_getvalue ( & c - > worker [ i ] . work_sem , & val ) ; assert ( val = = 0 ) ;
sem_getvalue ( & c - > worker [ i ] . done_sem , & val ) ; assert ( val = = 0 ) ;
sem_post ( & c - > worker [ i ] . work_sem ) ;
pthread_join ( c - > worker [ i ] . thread , NULL ) ;
sem_destroy ( & c - > worker [ i ] . work_sem ) ;
sem_destroy ( & c - > worker [ i ] . done_sem ) ;
}
av_freep ( & c - > job ) ;
av_freep ( & c - > worker ) ;
av_freep ( & s - > thread_opaque ) ;
/**
 * Block the caller until every worker has run out of jobs, then release the
 * job lock.
 *
 * Must be entered with c->current_job_lock held; the lock is released on
 * return.
 *
 * @param c            shared thread state
 * @param thread_count number of worker threads attached to c
 */
static always_inline void avcodec_thread_park_workers(ThreadContext *c, int thread_count)
{
    /*
     * pthread_cond_wait() may wake up spuriously, so wait on the predicate
     * rather than on the signal alone. The condition mirrors the one under
     * which worker() signals last_job_cond: the job counter has advanced past
     * every queued job once for each worker. Checking it first also avoids
     * blocking forever when there is nothing to wait for (no workers).
     */
    while (c->current_job != thread_count + c->job_count)
        pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
    pthread_mutex_unlock(&c->current_job_lock);
}
int avcodec_thread_execute ( AVCodecContext * s , int ( * func ) ( AVCodecContext * c2 , void * arg2 ) , void * * arg , int * ret , int job_count ) {
ThreadContext * c = s - > thread_opaque ;
int i , val ;
void avcodec_thread_free ( AVCodecContext * avctx )
{
ThreadContext * c = avctx - > thread_opaque ;
int i ;
assert ( s = = c - > avctx ) ;
if ( job_count > c - > allocated_job_count ) {
c - > job = av_realloc ( c - > job , job_count * sizeof ( JobContext ) ) ;
for ( i = c - > allocated_job_count ; i < job_count ; i + + ) {
memset ( & c - > job [ i ] , 0 , sizeof ( JobContext ) ) ;
c - > allocated_job_count + + ;
pthread_mutex_lock ( & c - > current_job_lock ) ;
c - > done = 1 ;
pthread_cond_signal ( & c - > current_job_cond ) ;
pthread_mutex_unlock ( & c - > current_job_lock ) ;
for ( i = 0 ; i < avctx - > thread_count ; i + + )
pthread_join ( c - > workers [ i ] , NULL ) ;
pthread_mutex_destroy ( & c - > current_job_lock ) ;
pthread_cond_destroy ( & c - > current_job_cond ) ;
pthread_cond_destroy ( & c - > last_job_cond ) ;
av_free ( c - > workers ) ;
av_freep ( c ) ;
}
if ( sem_init ( & c - > job [ i ] . available_sem , 0 , 0 ) )
return - 1 ;
}
}
c - > job_count = job_count ;
int avcodec_thread_execute ( AVCodecContext * avctx , action_t * func , void * * arg , int * ret , int job_count )
{
ThreadContext * c = avctx - > thread_opaque ;
int dummy_ret ;
if ( job_count < = 0 )
return 0 ;
/* note, we can be certain that this is not called with the same AVCodecContext by different threads at the same time */
for ( i = 0 ; i < job_count ; i + + ) {
sem_getvalue ( & c - > job [ i ] . available_sem , & val ) ; assert ( val = = 0 ) ;
c - > job [ i ] . arg = arg [ i ] ;
c - > job [ i ] . func = func ;
c - > job [ i ] . ret = 12345 ;
c - > job [ i ] . assigned = 0 ;
sem_post ( & c - > job [ i ] . available_sem ) ;
}
for ( i = 0 ; i < s - > thread_count & & i < job_count ; i + + ) {
sem_getvalue ( & c - > worker [ i ] . work_sem , & val ) ; assert ( val = = 0 ) ;
sem_getvalue ( & c - > worker [ i ] . done_sem , & val ) ; assert ( val = = 0 ) ;
c - > worker [ i ] . start_index = ( i + job_count / 2 ) / job_count ;
//av_log(s, AV_LOG_DEBUG, "start worker %d\n", i);
sem_post ( & c - > worker [ i ] . work_sem ) ;
}
for ( i = 0 ; i < s - > thread_count & & i < job_count ; i + + ) {
//av_log(s, AV_LOG_DEBUG, "wait for worker %d\n", i);
sem_wait ( & c - > worker [ i ] . done_sem ) ;
sem_getvalue ( & c - > worker [ i ] . work_sem , & val ) ; assert ( val = = 0 ) ;
sem_getvalue ( & c - > worker [ i ] . done_sem , & val ) ; assert ( val = = 0 ) ;
}
for ( i = 0 ; i < job_count ; i + + ) {
sem_getvalue ( & c - > job [ i ] . available_sem , & val ) ; assert ( val = = 0 ) ;
c - > job [ i ] . func = NULL ;
if ( ret ) ret [ i ] = c - > job [ i ] . ret ;
pthread_mutex_lock ( & c - > current_job_lock ) ;
c - > current_job = avctx - > thread_count ;
c - > job_count = job_count ;
c - > args = arg ;
c - > func = func ;
if ( ret ) {
c - > rets = ret ;
c - > rets_count = job_count ;
} else {
c - > rets = & dummy_ret ;
c - > rets_count = 1 ;
}
pthread_cond_broadcast ( & c - > current_job_cond ) ;
avcodec_thread_park_workers ( c , avctx - > thread_count ) ;
return 0 ;
}
int avcodec_thread_init ( AVCodecContext * s , int thread_count ) {
int avcodec_thread_init ( AVCodecContext * avctx , int thread_count )
{
int i ;
ThreadContext * c ;
WorkerContext * worker ;
s - > thread_count = thread_count ;
c = av_mallocz ( sizeof ( ThreadContext ) ) ;
if ( ! c )
return - 1 ;
c - > workers = av_mallocz ( sizeof ( pthread_t ) * thread_count ) ;
if ( ! c - > workers ) {
av_free ( c ) ;
return - 1 ;
}
assert ( ! s - > thread_opaque ) ;
c = av_mallocz ( sizeof ( ThreadContext ) ) ;
worker = av_mallocz ( sizeof ( WorkerContext ) * thread_count ) ;
s - > thread_opaque = c ;
c - > worker = worker ;
for ( i = 0 ; i < thread_count ; i + + ) {
//printf("init semaphors %d\n", i); fflush(stdout);
worker [ i ] . avctx = s ;
if ( sem_init ( & worker [ i ] . work_sem , 0 , 0 ) )
goto fail ;
if ( sem_init ( & worker [ i ] . done_sem , 0 , 0 ) )
goto fail ;
//printf("create thread %d\n", i); fflush(stdout);
if ( pthread_create ( & worker [ i ] . thread , NULL , thread_func , & worker [ i ] ) )
goto fail ;
avctx - > thread_opaque = c ;
avctx - > thread_count = thread_count ;
c - > current_job = 0 ;
c - > job_count = 0 ;
c - > done = 0 ;
pthread_cond_init ( & c - > current_job_cond , NULL ) ;
pthread_cond_init ( & c - > last_job_cond , NULL ) ;
pthread_mutex_init ( & c - > current_job_lock , NULL ) ;
pthread_mutex_lock ( & c - > current_job_lock ) ;
for ( i = 0 ; i < thread_count ; i + + ) {
if ( pthread_create ( & c - > workers [ i ] , NULL , worker , avctx ) ) {
avctx - > thread_count = i ;
pthread_mutex_unlock ( & c - > current_job_lock ) ;
avcodec_thread_free ( avctx ) ;
return - 1 ;
}
}
//printf("init done\n"); fflush(stdout);
s - > execute = avcodec_thread_execute ;
avcodec_thread_park_workers ( c , thread_count ) ;
avctx - > execute = avcodec_thread_execute ;
return 0 ;
fail :
avcodec_thread_free ( s ) ;
return - 1 ;
}