@ -132,6 +132,177 @@ void sync_data(cv::GRunArgs &results, cv::GRunArgsP &outputs)
}
}
// Pops an item from every input queue and combine it to the final
// result. Blocks the current thread. Returns true if the vector has
// been obtained successfully and false if a Stop message has been
// received. Handles Stop x-queue synchronization gracefully.
//
// In fact, the logic behind this method is a little bit more complex.
// The complexity comes from handling the pipeline termination
// messages. This version if GStreamerExecutable is running every
// graph island in its own thread, and threads communicate via bounded
// (limited in size) queues. Threads poll their input queues in the
// infinite loops and pass the data to their Island executables when
// the full input vector (a "stack frame") arrives.
//
// If the input stream is over or stop() is called, "Stop" messages
// are broadcasted in the graph from island to island via queues,
// starting with the emitters (sources). Since queues are bounded,
// thread may block on push() if the queue is full already and is not
// popped for some reason in the reader thread. In order to avoid
// this, once an Island gets Stop on an input, it start reading all
// other input queues until it reaches Stop messages there as well.
// Only then the thread terminates so in theory queues are left
// free'd.
//
// "Stop" messages are sent to the pipeline in these three cases:
// 1. User has called stop(): a "Stop" message is sent to every input
// queue.
// 2. Input video stream has reached its end -- its emitter sends Stop
// to its readers AND asks constant emitters (emitters attached to
// const data -- infinite data generators) to push Stop messages as
// well - in order to maintain a regular Stop procedure as defined
// above.
// 3. "Stop" message coming from a constant emitter after triggering an
// EOS notification -- see (2).
//
// There is a problem with (3). Sometimes it terminates the pipeline
// too early while some frames could still be produced with no issue,
// and our test fails with error like "got 99 frames, expected 100".
// This is how it reproduces:
//
// q1
// [const input] -----------------------> [ ISL2 ] --> [output]
// q0 q2 .->
// [stream input] ---> [ ISL1 ] -------'
//
// Video emitter is pushing frames to q0, and ISL1 is taking every
// frame from this queue and processes it. Meanwhile, q1 is a
// const-input-queue staffed with const data already, ISL2 already
// popped one, and is waiting for data from q2 (of ISL1) to arrive.
//
// When the stream is over, stream emitter pushes the last frame to
// q0, followed by a Stop sign, and _immediately_ notifies const
// emitters to broadcast Stop messages as well. In the above
// configuration, the replicated Stop message via q1 may reach ISL2
// faster than the real Stop message via q2 -- moreover, somewhere in
// q1 or q2 there may be real frames awaiting processing. ISL2 gets
// Stop via q1 and _discards_ any pending data coming from q2 -- so a
// last frame or two may be lost.
//
// A working but not very elegant solution to this problem is to tag
// Stop messages. Stop got via stop() is really a hard stop, while
// broadcasted Stop from a Const input shouldn't initiate the Island
// execution termination. Instead, its associated const data should
// remain somewhere in islands' thread local storage until a real
// "Stop" is received.
//
// Queue reader is the class which encapsulates all this logic and
// provies threads with a managed storage and an easy API to obtain
// data.
class QueueReader
{
bool m_finishing = false ; // Set to true once a "soft" stop is received
std : : vector < Cmd > m_cmd ;
public :
bool getInputVector ( std : : vector < Q * > & in_queues ,
cv : : GRunArgs & in_constants ,
cv : : GRunArgs & isl_inputs ) ;
} ;
bool QueueReader : : getInputVector ( std : : vector < Q * > & in_queues ,
cv : : GRunArgs & in_constants ,
cv : : GRunArgs & isl_inputs )
{
// NOTE: in order to maintain the GRunArg's underlying object
// lifetime, keep the whole cmd vector (of size == # of inputs)
// in memory.
m_cmd . resize ( in_queues . size ( ) ) ;
isl_inputs . resize ( in_queues . size ( ) ) ;
for ( auto & & it : ade : : util : : indexed ( in_queues ) )
{
auto id = ade : : util : : index ( it ) ;
auto & q = ade : : util : : value ( it ) ;
if ( q = = nullptr )
{
GAPI_Assert ( ! in_constants . empty ( ) ) ;
// NULL queue means a graph-constant value (like a
// value-initialized scalar)
// It can also hold a constant value received with
// Stop::Kind::CNST message (see above).
// FIXME: Variant move problem
isl_inputs [ id ] = const_cast < const cv : : GRunArg & > ( in_constants [ id ] ) ;
continue ;
}
q - > pop ( m_cmd [ id ] ) ;
if ( ! cv : : util : : holds_alternative < Stop > ( m_cmd [ id ] ) )
{
// FIXME: Variant move problem
isl_inputs [ id ] = const_cast < const cv : : GRunArg & > ( cv : : util : : get < cv : : GRunArg > ( m_cmd [ id ] ) ) ;
}
else // A Stop sign
{
const auto & stop = cv : : util : : get < Stop > ( m_cmd [ id ] ) ;
if ( stop . kind = = Stop : : Kind : : CNST )
{
// We've got a Stop signal from a const source,
// propagated as a result of real stream reaching its
// end. Sometimes these signals come earlier than
// real EOS Stops so are deprioritized -- just
// remember the Const value here and continue
// processing other queues. Set queue pointer to
// nullptr and update the const_val vector
// appropriately
m_finishing = true ;
in_queues [ id ] = nullptr ;
in_constants . resize ( in_queues . size ( ) ) ;
in_constants [ id ] = std : : move ( stop . cdata ) ;
// NEXT time (on a next call to getInputVector()), the
// "q==nullptr" check above will be triggered, but now
// we need to make it manually:
isl_inputs [ id ] = const_cast < const cv : : GRunArg & > ( in_constants [ id ] ) ;
}
else
{
GAPI_Assert ( stop . kind = = Stop : : Kind : : HARD ) ;
// Just got a stop sign. Reiterate through all
// _remaining valid_ queues (some of them can be
// set to nullptr already -- see above) and rewind
// data to every Stop sign per queue
for ( auto & & qit : ade : : util : : indexed ( in_queues ) )
{
auto id2 = ade : : util : : index ( qit ) ;
auto & q2 = ade : : util : : value ( qit ) ;
if ( id = = id2 ) continue ;
Cmd cmd2 ;
while ( q2 & & ! cv : : util : : holds_alternative < Stop > ( cmd2 ) )
q2 - > pop ( cmd2 ) ;
}
// After queues are read to the proper indicator,
// indicate end-of-stream
return false ;
} // if(Cnst)
} // if(Stop)
} // for(in_queues)
if ( m_finishing )
{
// If the process is about to end (a soft Stop was received
// already) and an island has no other inputs than constant
// inputs, its queues may all become nullptrs. Indicate it as
// "no data".
return ! ade : : util : : all_of ( in_queues , [ ] ( Q * ptr ) { return ptr = = nullptr ; } ) ;
}
return true ; // A regular case - there is data to process.
}
// This thread is a plain dump source actor. What it do is just:
// - Check input queue (the only one) for a control command
// - Depending on the state, obtains next data object and pushes it to the
@ -202,90 +373,62 @@ void islandActorThread(std::vector<cv::gimpl::RcDesc> in_rcs, //
cv : : GMetaArgs out_metas , // ...
std : : shared_ptr < cv : : gimpl : : GIslandExecutable > island , // FIXME: ...a copy of OpDesc{}.
std : : vector < Q * > in_queues ,
std : : vector < cv : : GRunArg > in_constants ,
cv : : GRunArgs in_constants ,
std : : vector < std : : vector < Q * > > out_queues )
{
GAPI_Assert ( in_queues . size ( ) = = in_rcs . size ( ) ) ;
GAPI_Assert ( out_queues . size ( ) = = out_rcs . size ( ) ) ;
GAPI_Assert ( out_queues . size ( ) = = out_metas . size ( ) ) ;
QueueReader qr ;
while ( true )
{
std : : vector < cv : : gimpl : : GIslandExecutable : : InObj > isl_inputs ;
isl_inputs . resize ( in_rcs . size ( ) ) ;
// Try to obtain the full input vector.
// Note this may block us. We also may get Stop signal here
// and then exit the thread.
// NOTE: in order to maintain the GRunArg's underlying object
// lifetime, keep the whole cmd vector (of size == # of inputs)
// in memory.
std : : vector < Cmd > cmd ( in_queues . size ( ) ) ;
for ( auto & & it : ade : : util : : indexed ( in_queues ) )
cv : : GRunArgs isl_input_args ;
if ( ! qr . getInputVector ( in_queues , in_constants , isl_input_args ) )
{
auto id = ade : : util : : index ( it ) ;
auto & q = ade : : util : : value ( it ) ;
isl_inputs [ id ] . first = in_rcs [ id ] ;
if ( q = = nullptr )
// Stop received -- broadcast Stop down to the pipeline and quit
for ( auto & & out_qq : out_queues )
{
// NULL queue means a graph-constant value
// (like a value-initialized scalar)
// FIXME: Variant move problem
isl_inputs [ id ] . second = const_cast < const cv : : GRunArg & > ( in_constants [ id ] ) ;
for ( auto & & out_q : out_qq ) out_q - > push ( Cmd { Stop { } } ) ;
}
else
{
q - > pop ( cmd [ id ] ) ;
if ( cv : : util : : holds_alternative < Stop > ( cmd [ id ] ) )
{
// FIXME: This logic must be unified with what collectorThread is doing!
// Just got a stop sign. Reiterate through all queues
// and rewind data to every Stop sign per queue
for ( auto & & qit : ade : : util : : indexed ( in_queues ) )
{
auto id2 = ade : : util : : index ( qit ) ;
auto & q2 = ade : : util : : value ( qit ) ;
if ( id = = id2 ) continue ;
Cmd cmd2 ;
while ( q2 & & ! cv : : util : : holds_alternative < Stop > ( cmd2 ) )
q2 - > pop ( cmd2 ) ;
}
// Broadcast Stop down to the pipeline and quit
for ( auto & & out_qq : out_queues )
{
for ( auto & & out_q : out_qq ) out_q - > push ( Cmd { Stop { } } ) ;
}
return ;
}
// FIXME: MOVE PROBLEM
const cv : : GRunArg & in_arg = cv : : util : : get < cv : : GRunArg > ( cmd [ id ] ) ;
return ;
}
GAPI_Assert ( isl_inputs . size ( ) = = isl_input_args . size ( ) ) ;
for ( auto & & it : ade : : util : : zip ( ade : : util : : toRange ( in_rcs ) ,
ade : : util : : toRange ( isl_inputs ) ,
ade : : util : : toRange ( isl_input_args ) ) )
{
const auto & in_rc = std : : get < 0 > ( it ) ;
auto & isl_input = std : : get < 1 > ( it ) ;
const auto & in_arg = std : : get < 2 > ( it ) ; // FIXME: MOVE PROBLEM
isl_input . first = in_rc ;
# if defined(GAPI_STANDALONE)
// Standalone mode - simply store input argument in the vector as-is
isl_inputs [ id ] . second = in_arg ;
// Standalone mode - simply store input argument in the vector as-is
isl_inputs [ id ] . second = in_arg ;
# else
// Make Islands operate on own:: data types (i.e. in the same
// environment as GExecutor provides)
// This way several backends (e.g. Fluid) remain OpenCV-independent.
switch ( in_arg . index ( ) ) {
case cv : : GRunArg : : index_of < cv : : Mat > ( ) :
isl_inputs [ id ] . second = cv : : GRunArg { cv : : to_own ( cv : : util : : get < cv : : Mat > ( in_arg ) ) } ;
break ;
case cv : : GRunArg : : index_of < cv : : Scalar > ( ) :
isl_inputs [ id ] . second = cv : : GRunArg { cv : : to_own ( cv : : util : : get < cv : : Scalar > ( in_arg ) ) } ;
break ;
default :
isl_inputs [ id ] . second = in_arg ;
break ;
}
# endif // GAPI_STANDALONE
// Make Islands operate on own:: data types (i.e. in the same
// environment as GExecutor provides)
// This way several backends (e.g. Fluid) remain OpenCV-independent.
switch ( in_arg . index ( ) ) {
case cv : : GRunArg : : index_of < cv : : Mat > ( ) :
isl_input . second = cv : : GRunArg { cv : : to_own ( cv : : util : : get < cv : : Mat > ( in_arg ) ) } ;
break ;
case cv : : GRunArg : : index_of < cv : : Scalar > ( ) :
isl_input . second = cv : : GRunArg { cv : : to_own ( cv : : util : : get < cv : : Scalar > ( in_arg ) ) } ;
break ;
default :
isl_input . second = in_arg ;
break ;
}
# endif // GAPI_STANDALONE
}
// Once the vector is obtained, prepare data for island execution
// Note - we first allocate output vector via GRunArg!
// Then it is converted to a GRunArgP.
std : : vector < cv : : gimpl : : GIslandExecutable : : OutObj > isl_outputs ;
std : : vector < cv : : GRunArg > out_data ;
cv : : GRunArgs out_data ;
isl_outputs . resize ( out_rcs . size ( ) ) ;
out_data . resize ( out_rcs . size ( ) ) ;
for ( auto & & it : ade : : util : : indexed ( out_rcs ) )
@ -363,33 +506,15 @@ void islandActorThread(std::vector<cv::gimpl::RcDesc> in_rcs, //
void collectorThread ( std : : vector < Q * > in_queues ,
Q & out_queue )
{
QueueReader qr ;
while ( true )
{
cv : : GRunArgs this_result ( in_queues . size ( ) ) ;
for ( auto & & it : ade : : util : : indexed ( in_queues ) )
cv : : GRunArgs this_const ( in_queues . size ( ) ) ;
if ( ! qr . getInputVector ( in_queues , this_const , this_result ) )
{
Cmd cmd ;
ade : : util : : value ( it ) - > pop ( cmd ) ;
if ( cv : : util : : holds_alternative < Stop > ( cmd ) )
{
// FIXME: Unify this code with island thread
for ( auto & & qit : ade : : util : : indexed ( in_queues ) )
{
if ( ade : : util : : index ( qit ) = = ade : : util : : index ( it ) ) continue ;
Cmd cmd2 ;
while ( ! cv : : util : : holds_alternative < Stop > ( cmd2 ) )
ade : : util : : value ( qit ) - > pop ( cmd2 ) ;
}
out_queue . push ( Cmd { Stop { } } ) ;
return ;
}
else
{
// FIXME: MOVE_PROBLEM
const cv : : GRunArg & in_arg = cv : : util : : get < cv : : GRunArg > ( cmd ) ;
this_result [ ade : : util : : index ( it ) ] = in_arg ;
// FIXME: Check for other message types.
}
out_queue . push ( Cmd { Stop { } } ) ;
return ;
}
out_queue . push ( Cmd { this_result } ) ;
}
@ -654,6 +779,7 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
// Create a constant emitter.
// Produces always the same ("constant") value when pulled.
emitter . reset ( new ConstEmitter { emit_arg } ) ;
m_const_vals . push_back ( const_cast < cv : : GRunArg & > ( emit_arg ) ) ; // FIXME: move problem
m_const_emitter_queues . push_back ( & m_emitter_queues [ emit_idx ] ) ;
break ;
}
@ -664,9 +790,17 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
// all other inputs are "constant" generators.
// Craft here a completion callback to notify Const emitters that
// a video source is over
GAPI_Assert ( m_const_emitter_queues . size ( ) = = m_const_vals . size ( ) ) ;
auto real_video_completion_cb = [ this ] ( )
{
for ( auto q : m_const_emitter_queues ) q - > push ( Cmd { Stop { } } ) ;
for ( auto it : ade : : util : : zip ( ade : : util : : toRange ( m_const_emitter_queues ) ,
ade : : util : : toRange ( m_const_vals ) ) )
{
Stop stop ;
stop . kind = Stop : : Kind : : CNST ;
stop . cdata = std : : get < 1 > ( it ) ;
std : : get < 0 > ( it ) - > push ( Cmd { std : : move ( stop ) } ) ;
}
} ;
// FIXME: ONLY now, after all executable objects are created,