@ -34,8 +34,12 @@ typedef struct SyncQueueStream {
/* stream head: largest timestamp seen */
int64_t head_ts ;
int limiting ;
/* no more frames will be sent for this stream */
int finished ;
uint64_t frames_sent ;
uint64_t frames_max ;
} SyncQueueStream ;
struct SyncQueue {
@ -86,7 +90,7 @@ static void finish_stream(SyncQueue *sq, unsigned int stream_idx)
st - > finished = 1 ;
if ( st - > head_ts ! = AV_NOPTS_VALUE ) {
if ( st - > limiting & & st - > head_ts ! = AV_NOPTS_VALUE ) {
/* check if this stream is the new finished head */
if ( sq - > head_finished_stream < 0 | |
av_compare_ts ( st - > head_ts , st - > tb ,
@ -121,7 +125,7 @@ static void queue_head_update(SyncQueue *sq)
* the queue head */
for ( unsigned int i = 0 ; i < sq - > nb_streams ; i + + ) {
SyncQueueStream * st = & sq - > streams [ i ] ;
if ( st - > head_ts = = AV_NOPTS_VALUE )
if ( st - > limiting & & st - > head_ts = = AV_NOPTS_VALUE )
return ;
}
@ -132,7 +136,7 @@ static void queue_head_update(SyncQueue *sq)
for ( unsigned int i = 0 ; i < sq - > nb_streams ; i + + ) {
SyncQueueStream * st_head = & sq - > streams [ sq - > head_stream ] ;
SyncQueueStream * st_other = & sq - > streams [ i ] ;
if ( st_other - > head_ts ! = AV_NOPTS_VALUE & &
if ( st_other - > limiting & & st_other - > head_ts ! = AV_NOPTS_VALUE & &
av_compare_ts ( st_other - > head_ts , st_other - > tb ,
st_head - > head_ts , st_head - > tb ) < 0 )
sq - > head_stream = i ;
@ -159,7 +163,8 @@ static void stream_update_ts(SyncQueue *sq, unsigned int stream_idx, int64_t ts)
finish_stream ( sq , stream_idx ) ;
/* update the overall head timestamp if it could have changed */
if ( sq - > head_stream < 0 | | sq - > head_stream = = stream_idx )
if ( st - > limiting & &
( sq - > head_stream < 0 | | sq - > head_stream = = stream_idx ) )
queue_head_update ( sq ) ;
}
@ -262,6 +267,10 @@ int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
stream_update_ts ( sq , stream_idx , ts ) ;
st - > frames_sent + + ;
if ( st - > frames_sent > = st - > frames_max )
finish_stream ( sq , stream_idx ) ;
return 0 ;
}
@ -339,7 +348,7 @@ int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
return ret ;
}
int sq_add_stream ( SyncQueue * sq )
int sq_add_stream ( SyncQueue * sq , int limiting )
{
SyncQueueStream * tmp , * st ;
@ -360,6 +369,8 @@ int sq_add_stream(SyncQueue *sq)
* streams forever ; cf . overflow_heartbeat ( ) */
st - > tb = ( AVRational ) { 1 , 1 } ;
st - > head_ts = AV_NOPTS_VALUE ;
st - > frames_max = UINT64_MAX ;
st - > limiting = limiting ;
return sq - > nb_streams + + ;
}
@ -379,6 +390,18 @@ void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb)
st - > tb = tb ;
}
void sq_limit_frames ( SyncQueue * sq , unsigned int stream_idx , uint64_t frames )
{
SyncQueueStream * st ;
av_assert0 ( stream_idx < sq - > nb_streams ) ;
st = & sq - > streams [ stream_idx ] ;
st - > frames_max = frames ;
if ( st - > frames_sent > = st - > frames_max )
finish_stream ( sq , stream_idx ) ;
}
SyncQueue * sq_alloc ( enum SyncQueueType type , int64_t buf_size_us )
{
SyncQueue * sq = av_mallocz ( sizeof ( * sq ) ) ;