@ -21,6 +21,8 @@
# include "ffmpeg.h"
# include "ffmpeg_utils.h"
# include "objpool.h"
# include "thread_queue.h"
# include "libavutil/avassert.h"
# include "libavutil/avstring.h"
@ -33,7 +35,6 @@
# include "libavutil/time.h"
# include "libavutil/timestamp.h"
# include "libavutil/thread.h"
# include "libavutil/threadmessage.h"
# include "libavcodec/packet.h"
@ -107,19 +108,13 @@ typedef struct Demuxer {
double readrate_initial_burst ;
AV ThreadMessage Queue * in_ thread_queue;
ThreadQueue * thread_queue ;
int thread_queue_size ;
pthread_t thread ;
int non_blocking ;
int read_started ;
} Demuxer ;
typedef struct DemuxMsg {
AVPacket * pkt ;
int looping ;
} DemuxMsg ;
static DemuxStream * ds_from_ist ( InputStream * ist )
{
return ( DemuxStream * ) ist ;
@ -440,26 +435,16 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt)
return 0 ;
}
// process an input packet into a message to send to the consumer thread
// src is always cleared by this function
static int input_packet_process ( Demuxer * d , DemuxMsg * msg , AVPacket * src )
static int input_packet_process ( Demuxer * d , AVPacket * pkt )
{
InputFile * f = & d - > f ;
InputStream * ist = f - > streams [ src - > stream_index ] ;
InputStream * ist = f - > streams [ pkt - > stream_index ] ;
DemuxStream * ds = ds_from_ist ( ist ) ;
AVPacket * pkt ;
int ret = 0 ;
pkt = av_packet_alloc ( ) ;
if ( ! pkt ) {
av_packet_unref ( src ) ;
return AVERROR ( ENOMEM ) ;
}
av_packet_move_ref ( pkt , src ) ;
ret = ts_fixup ( d , pkt ) ;
if ( ret < 0 )
goto fail ;
return ret ;
ds - > data_size + = pkt - > size ;
ds - > nb_packets + + ;
@ -475,13 +460,7 @@ static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src)
av_ts2timestr ( input_files [ ist - > file_index ] - > ts_offset , & AV_TIME_BASE_Q ) ) ;
}
msg - > pkt = pkt ;
pkt = NULL ;
fail :
av_packet_free ( & pkt ) ;
return ret ;
return 0 ;
}
static void readrate_sleep ( Demuxer * d )
@ -531,7 +510,6 @@ static void *input_thread(void *arg)
Demuxer * d = arg ;
InputFile * f = & d - > f ;
AVPacket * pkt ;
unsigned flags = d - > non_blocking ? AV_THREAD_MESSAGE_NONBLOCK : 0 ;
int ret = 0 ;
pkt = av_packet_alloc ( ) ;
@ -547,8 +525,6 @@ static void *input_thread(void *arg)
d - > wallclock_start = av_gettime_relative ( ) ;
while ( 1 ) {
DemuxMsg msg = { NULL } ;
ret = av_read_frame ( f - > ctx , pkt ) ;
if ( ret = = AVERROR ( EAGAIN ) ) {
@ -558,8 +534,8 @@ static void *input_thread(void *arg)
if ( ret < 0 ) {
if ( d - > loop ) {
/* signal looping to the consumer thread */
msg . looping = 1 ;
ret = av_ thread_message_ queue _send( d - > in_ thread_queue, & msg , 0 ) ;
pkt - > stream_index = - 1 ;
ret = tq_send ( d - > thread_queue , 0 , pkt ) ;
if ( ret > = 0 )
ret = seek_to_start ( d ) ;
if ( ret > = 0 )
@ -602,35 +578,26 @@ static void *input_thread(void *arg)
}
}
ret = input_packet_process ( d , & msg , pkt ) ;
ret = input_packet_process ( d , pkt ) ;
if ( ret < 0 )
break ;
if ( f - > readrate )
readrate_sleep ( d ) ;
ret = av_thread_message_queue_send ( d - > in_thread_queue , & msg , flags ) ;
if ( flags & & ret = = AVERROR ( EAGAIN ) ) {
flags = 0 ;
ret = av_thread_message_queue_send ( d - > in_thread_queue , & msg , flags ) ;
av_log ( f , AV_LOG_WARNING ,
" Thread message queue blocking; consider raising the "
" thread_queue_size option (current value: %d) \n " ,
d - > thread_queue_size ) ;
}
ret = tq_send ( d - > thread_queue , 0 , pkt ) ;
if ( ret < 0 ) {
if ( ret ! = AVERROR_EOF )
av_log ( f , AV_LOG_ERROR ,
" Unable to send packet to main thread: %s \n " ,
av_err2str ( ret ) ) ;
av_packet_free ( & msg . pkt ) ;
break ;
}
}
finish :
av_assert0 ( ret < 0 ) ;
av_thread_message_queue_set_err_recv ( d - > in_ thread_queue, ret ) ;
tq_send_finish ( d - > thread_queue , 0 ) ;
av_packet_free ( & pkt ) ;
@ -642,16 +609,16 @@ finish:
static void thread_stop ( Demuxer * d )
{
InputFile * f = & d - > f ;
DemuxMsg msg ;
if ( ! d - > in_ thread_queue)
if ( ! d - > thread_queue )
return ;
av_thread_message_queue_set_err_send ( d - > in_thread_queue , AVERROR_EOF ) ;
while ( av_thread_message_queue_recv ( d - > in_thread_queue , & msg , 0 ) > = 0 )
av_packet_free ( & msg . pkt ) ;
tq_receive_finish ( d - > thread_queue , 0 ) ;
pthread_join ( d - > thread , NULL ) ;
av_thread_message_queue_free ( & d - > in_thread_queue ) ;
tq_free ( & d - > thread_queue ) ;
av_thread_message_queue_free ( & f - > audio_ts_queue ) ;
}
@ -659,18 +626,20 @@ static int thread_start(Demuxer *d)
{
int ret ;
InputFile * f = & d - > f ;
ObjPool * op ;
if ( d - > thread_queue_size < = 0 )
d - > thread_queue_size = ( nb_input_files > 1 ? 8 : 1 ) ;
if ( nb_input_files > 1 & &
( f - > ctx - > pb ? ! f - > ctx - > pb - > seekable :
strcmp ( f - > ctx - > iformat - > name , " lavfi " ) ) )
d - > non_blocking = 1 ;
ret = av_thread_message_queue_alloc ( & d - > in_thread_queue ,
d - > thread_queue_size , sizeof ( DemuxMsg ) ) ;
if ( ret < 0 )
return ret ;
op = objpool_alloc_packets ( ) ;
if ( ! op )
return AVERROR ( ENOMEM ) ;
d - > thread_queue = tq_alloc ( 1 , d - > thread_queue_size , op , pkt_move ) ;
if ( ! d - > thread_queue ) {
objpool_free ( & op ) ;
return AVERROR ( ENOMEM ) ;
}
if ( d - > loop ) {
int nb_audio_dec = 0 ;
@ -700,31 +669,30 @@ static int thread_start(Demuxer *d)
return 0 ;
fail :
av_ thread_message_ queue _free( & d - > in_ thread_queue) ;
tq_free ( & d - > thread_queue ) ;
return ret ;
}
int ifile_get_packet ( InputFile * f , AVPacket * * pkt )
int ifile_get_packet ( InputFile * f , AVPacket * pkt )
{
Demuxer * d = demuxer_from_ifile ( f ) ;
DemuxMsg msg ;
int ret ;
int ret , dummy ;
if ( ! d - > in_ thread_queue) {
if ( ! d - > thread_queue ) {
ret = thread_start ( d ) ;
if ( ret < 0 )
return ret ;
}
ret = av_thread_message_queue_recv ( d - > in_thread_queue , & msg ,
d - > non_blocking ?
AV_THREAD_MESSAGE_NONBLOCK : 0 ) ;
ret = tq_receive ( d - > thread_queue , & dummy , pkt ) ;
if ( ret < 0 )
return ret ;
if ( msg . looping )
if ( pkt - > stream_index = = - 1 ) {
av_assert0 ( ! pkt - > data & & ! pkt - > side_data_elems ) ;
return 1 ;
}
* pkt = msg . pkt ;
return 0 ;
}