@ -62,96 +62,22 @@ typedef struct inproc_transport {
struct inproc_stream * stream_list ;
} inproc_transport ;
// A singly-linked list node holding one buffered message's slices.
typedef struct sb_list_entry {
  grpc_slice_buffer sb;
  struct sb_list_entry* next;
} sb_list_entry;

// Specialize grpc_byte_stream for our use case
typedef struct {
  grpc_byte_stream base;
  sb_list_entry* le;           // entry whose slices this stream yields
  grpc_error* shutdown_error;  // set by shutdown(); returned by later pulls
} inproc_slice_byte_stream;

// FIFO of buffered messages awaiting a reader; head is popped first.
typedef struct {
  // TODO (vjpai): Add some inlined elements to avoid alloc in simple cases
  sb_list_entry* head;
  sb_list_entry* tail;
} slice_buffer_list;
// Reset a slice_buffer_list to the empty state (no entries).
static void slice_buffer_list_init(slice_buffer_list* l) {
  l->tail = NULL;
  l->head = NULL;
}
// Free one list entry: tear down its slice buffer first, then release the
// entry's own allocation.
static void sb_list_entry_destroy(grpc_exec_ctx* exec_ctx, sb_list_entry* le) {
  grpc_slice_buffer_destroy_internal(exec_ctx, &le->sb);
  gpr_free(le);
}
// Destroy every entry in the list and leave the list empty.
static void slice_buffer_list_destroy(grpc_exec_ctx* exec_ctx,
                                      slice_buffer_list* l) {
  sb_list_entry* curr = l->head;
  while (curr != NULL) {
    // Capture the successor before the node is freed.
    sb_list_entry* next = curr->next;
    sb_list_entry_destroy(exec_ctx, curr);
    curr = next;
  }
  l->head = NULL;
  l->tail = NULL;
}
// Report whether the list currently holds no entries.
static bool slice_buffer_list_empty(slice_buffer_list* l) {
  return NULL == l->head;
}
// Link |next| at the tail of the list; the entry's old link is discarded.
static void slice_buffer_list_append_entry(slice_buffer_list* l,
                                           sb_list_entry* next) {
  next->next = NULL;
  if (l->tail == NULL) {
    // Empty list: the new entry becomes both head and tail.
    l->head = next;
  } else {
    l->tail->next = next;
  }
  l->tail = next;
}
// Allocate a fresh entry, link it at the tail, and hand back its slice
// buffer for the caller to fill.
static grpc_slice_buffer* slice_buffer_list_append(slice_buffer_list* l) {
  sb_list_entry* entry = (sb_list_entry*)gpr_malloc(sizeof(*entry));
  grpc_slice_buffer_init(&entry->sb);
  slice_buffer_list_append_entry(l, entry);
  return &entry->sb;
}
// Detach and return the first entry of the list, or NULL if the list is
// empty. Caller takes ownership of the returned entry (free with
// sb_list_entry_destroy).
static sb_list_entry* slice_buffer_list_pophead(slice_buffer_list* l) {
  sb_list_entry* ret = l->head;
  if (ret == NULL) {
    // Defensive: the original dereferenced l->head unconditionally, which
    // is undefined behavior on an empty list.
    return NULL;
  }
  l->head = ret->next;
  if (l->head == NULL) {
    l->tail = NULL;
  }
  ret->next = NULL;  // fully detach so a stale link can't be followed
  return ret;
}
typedef struct inproc_stream {
inproc_transport * t ;
grpc_metadata_batch to_read_initial_md ;
uint32_t to_read_initial_md_flags ;
bool to_read_initial_md_filled ;
slice_buffer_list to_read_message ;
grpc_metadata_batch to_read_trailing_md ;
bool to_read_trailing_md_filled ;
bool read s_needed;
bool read _closure_scheduled;
grpc_closure read _closure;
bool ops_needed ;
bool op_closure_scheduled ;
grpc_closure op_closure ;
// Write buffer used only during gap at init time when client-side
// stream is set up but server side stream is not yet set up
grpc_metadata_batch write_buffer_initial_md ;
bool write_buffer_initial_md_filled ;
uint32_t write_buffer_initial_md_flags ;
gpr_timespec write_buffer_deadline ;
slice_buffer_list write_buffer_message ;
grpc_millis write_buffer_deadline ;
grpc_metadata_batch write_buffer_trailing_md ;
bool write_buffer_trailing_md_filled ;
grpc_error * write_buffer_cancel_error ;
@ -164,11 +90,15 @@ typedef struct inproc_stream {
gpr_arena * arena ;
grpc_transport_stream_op_batch * send_message_op ;
grpc_transport_stream_op_batch * send_trailing_md_op ;
grpc_transport_stream_op_batch * recv_initial_md_op ;
grpc_transport_stream_op_batch * recv_message_op ;
grpc_transport_stream_op_batch * recv_trailing_md_op ;
inproc_slice_byte_stream recv_message_stream ;
grpc_slice_buffer recv_message ;
grpc_slice_buffer_stream recv_stream ;
bool recv_inited ;
bool initial_md_sent ;
bool trailing_md_sent ;
@ -180,61 +110,18 @@ typedef struct inproc_stream {
grpc_error * cancel_self_error ;
grpc_error * cancel_other_error ;
gpr_timespec deadline ;
grpc_millis deadline ;
bool listed ;
struct inproc_stream * stream_list_prev ;
struct inproc_stream * stream_list_next ;
} inproc_stream ;
// grpc_byte_stream_vtable "next" hook; |max| and |on_complete| are unused
// because data is always immediately available (see comment below).
static bool inproc_slice_byte_stream_next(grpc_exec_ctx* exec_ctx,
                                          grpc_byte_stream* bs, size_t max,
                                          grpc_closure* on_complete) {
  // Because inproc transport always provides the entire message atomically,
  // the byte stream always has data available when this function is called.
  // Thus, this function always returns true (unlike other transports) and
  // there is never any need to schedule a closure
  return true;
}
// grpc_byte_stream_vtable "pull" hook: hand out the next slice of the
// buffered message, or a ref on the shutdown error if already shut down.
static grpc_error* inproc_slice_byte_stream_pull(grpc_exec_ctx* exec_ctx,
                                                 grpc_byte_stream* bs,
                                                 grpc_slice* slice) {
  inproc_slice_byte_stream* stream = (inproc_slice_byte_stream*)bs;
  if (GRPC_ERROR_NONE != stream->shutdown_error) {
    return GRPC_ERROR_REF(stream->shutdown_error);
  }
  *slice = grpc_slice_buffer_take_first(&stream->le->sb);
  return GRPC_ERROR_NONE;
}
// grpc_byte_stream_vtable "shutdown" hook: record the shutdown error so
// subsequent pulls fail. Takes ownership of |error|; any previously stored
// error is released first.
static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx* exec_ctx,
                                              grpc_byte_stream* bs,
                                              grpc_error* error) {
  inproc_slice_byte_stream* stream = (inproc_slice_byte_stream*)bs;
  GRPC_ERROR_UNREF(stream->shutdown_error);
  stream->shutdown_error = error;
}
// grpc_byte_stream_vtable "destroy" hook: free the underlying list entry
// and drop any stored shutdown error.
static void inproc_slice_byte_stream_destroy(grpc_exec_ctx* exec_ctx,
                                             grpc_byte_stream* bs) {
  inproc_slice_byte_stream* stream = (inproc_slice_byte_stream*)bs;
  sb_list_entry_destroy(exec_ctx, stream->le);
  GRPC_ERROR_UNREF(stream->shutdown_error);
}
// vtable wiring the inproc-specific hooks into the grpc_byte_stream
// interface (next / pull / shutdown / destroy, in that order).
static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = {
    inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull,
    inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy};
// Initialize a byte stream over |le|'s slices. |le| is freed later by the
// stream's destroy hook (sb_list_entry_destroy).
void inproc_slice_byte_stream_init(inproc_slice_byte_stream* s,
                                   sb_list_entry* le) {
  s->le = le;
  s->shutdown_error = GRPC_ERROR_NONE;
  s->base.length = (uint32_t)le->sb.length;
  s->base.flags = 0;
  s->base.vtable = &inproc_slice_byte_stream_vtable;
}
static grpc_closure do_nothing_closure ;
static bool cancel_stream_locked ( grpc_exec_ctx * exec_ctx , inproc_stream * s ,
grpc_error * error ) ;
static void op_state_machine ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) ;
static void ref_transport ( inproc_transport * t ) {
INPROC_LOG ( GPR_DEBUG , " ref_transport %p " , t ) ;
@ -280,12 +167,14 @@ static void unref_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s,
static void really_destroy_stream ( grpc_exec_ctx * exec_ctx , inproc_stream * s ) {
INPROC_LOG ( GPR_DEBUG , " really_destroy_stream %p " , s ) ;
slice_buffer_list_destroy ( exec_ctx , & s - > to_read_message ) ;
slice_buffer_list_destroy ( exec_ctx , & s - > write_buffer_message ) ;
GRPC_ERROR_UNREF ( s - > write_buffer_cancel_error ) ;
GRPC_ERROR_UNREF ( s - > cancel_self_error ) ;
GRPC_ERROR_UNREF ( s - > cancel_other_error ) ;
if ( s - > recv_inited ) {
grpc_slice_buffer_destroy_internal ( exec_ctx , & s - > recv_message ) ;
}
unref_transport ( exec_ctx , s - > t ) ;
if ( s - > closure_at_destroy ) {
@ -293,9 +182,6 @@ static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
}
}
static void read_state_machine ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) ;
static void log_metadata ( const grpc_metadata_batch * md_batch , bool is_client ,
bool is_initial ) {
for ( grpc_linked_mdelem * md = md_batch - > list . head ; md ! = NULL ;
@ -359,11 +245,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s - > write_buffer_initial_md_filled = false ;
grpc_metadata_batch_init ( & s - > write_buffer_trailing_md ) ;
s - > write_buffer_trailing_md_filled = false ;
slice_buffer_list_init ( & s - > to_read_message ) ;
slice_buffer_list_init ( & s - > write_buffer_message ) ;
s - > reads_needed = false ;
s - > read_closure_scheduled = false ;
GRPC_CLOSURE_INIT ( & s - > read_closure , read_state_machine , s ,
s - > ops_needed = false ;
s - > op_closure_scheduled = false ;
GRPC_CLOSURE_INIT ( & s - > op_closure , op_state_machine , s ,
grpc_schedule_on_exec_ctx ) ;
s - > t = t ;
s - > closure_at_destroy = NULL ;
@ -377,8 +261,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s - > cancel_self_error = GRPC_ERROR_NONE ;
s - > cancel_other_error = GRPC_ERROR_NONE ;
s - > write_buffer_cancel_error = GRPC_ERROR_NONE ;
s - > deadline = gpr_inf_future ( GPR_CLOCK_MONOTONIC ) ;
s - > write_buffer_deadline = gpr_inf_future ( GPR_CLOCK_MONOTONIC ) ;
s - > deadline = GRPC_MILLIS_INF_FUTURE ;
s - > write_buffer_deadline = GRPC_MILLIS_INF_FUTURE ;
s - > stream_list_prev = NULL ;
gpr_mu_lock ( & t - > mu - > mu ) ;
@ -421,15 +305,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
cs - > write_buffer_initial_md_flags ,
& s - > to_read_initial_md , & s - > to_read_initial_md_flags ,
& s - > to_read_initial_md_filled ) ;
s - > deadline = gpr_time_min ( s - > deadline , cs - > write_buffer_deadline ) ;
s - > deadline = GPR_MIN ( s - > deadline , cs - > write_buffer_deadline ) ;
grpc_metadata_batch_clear ( exec_ctx , & cs - > write_buffer_initial_md ) ;
cs - > write_buffer_initial_md_filled = false ;
}
while ( ! slice_buffer_list_empty ( & cs - > write_buffer_message ) ) {
slice_buffer_list_append_entry (
& s - > to_read_message ,
slice_buffer_list_pophead ( & cs - > write_buffer_message ) ) ;
}
if ( cs - > write_buffer_trailing_md_filled ) {
fill_in_metadata ( exec_ctx , s , & cs - > write_buffer_trailing_md , 0 ,
& s - > to_read_trailing_md , NULL ,
@ -488,9 +367,39 @@ static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
}
}
// Call the on_complete closure associated with this stream_op_batch if
// this stream_op_batch is only one of the pending operations for this
// stream. This is called when one of the pending operations for the stream
// is done and about to be NULLed out
static void complete_if_batch_end_locked ( grpc_exec_ctx * exec_ctx ,
inproc_stream * s , grpc_error * error ,
grpc_transport_stream_op_batch * op ,
const char * msg ) {
int is_sm = ( int ) ( op = = s - > send_message_op ) ;
int is_stm = ( int ) ( op = = s - > send_trailing_md_op ) ;
int is_rim = ( int ) ( op = = s - > recv_initial_md_op ) ;
int is_rm = ( int ) ( op = = s - > recv_message_op ) ;
int is_rtm = ( int ) ( op = = s - > recv_trailing_md_op ) ;
if ( ( is_sm + is_stm + is_rim + is_rm + is_rtm ) = = 1 ) {
INPROC_LOG ( GPR_DEBUG , " %s %p %p %p " , msg , s , op , error ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , op - > on_complete , GRPC_ERROR_REF ( error ) ) ;
}
}
static void maybe_schedule_op_closure_locked ( grpc_exec_ctx * exec_ctx ,
inproc_stream * s ,
grpc_error * error ) {
if ( s & & s - > ops_needed & & ! s - > op_closure_scheduled ) {
GRPC_CLOSURE_SCHED ( exec_ctx , & s - > op_closure , GRPC_ERROR_REF ( error ) ) ;
s - > op_closure_scheduled = true ;
s - > ops_needed = false ;
}
}
static void fail_helper_locked ( grpc_exec_ctx * exec_ctx , inproc_stream * s ,
grpc_error * error ) {
INPROC_LOG ( GPR_DEBUG , " read_state_machine %p fail_helper " , s ) ;
INPROC_LOG ( GPR_DEBUG , " op _state_machine %p fail_helper" , s ) ;
// If we're failing this side, we need to make sure that
// we also send or have already sent trailing metadata
if ( ! s - > trailing_md_sent ) {
@ -512,14 +421,7 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if ( other - > cancel_other_error = = GRPC_ERROR_NONE ) {
other - > cancel_other_error = GRPC_ERROR_REF ( error ) ;
}
if ( other - > reads_needed ) {
if ( ! other - > read_closure_scheduled ) {
GRPC_CLOSURE_SCHED ( exec_ctx , & other - > read_closure ,
GRPC_ERROR_REF ( error ) ) ;
other - > read_closure_scheduled = true ;
}
other - > reads_needed = false ;
}
maybe_schedule_op_closure_locked ( exec_ctx , other , error ) ;
} else if ( s - > write_buffer_cancel_error = = GRPC_ERROR_NONE ) {
s - > write_buffer_cancel_error = GRPC_ERROR_REF ( error ) ;
}
@ -564,14 +466,9 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
err ) ;
// Last use of err so no need to REF and then UNREF it
if ( ( s - > recv_initial_md_op ! = s - > recv_message_op ) & &
( s - > recv_initial_md_op ! = s - > recv_trailing_md_op ) ) {
INPROC_LOG ( GPR_DEBUG ,
" fail_helper %p scheduling initial-metadata-on-complete %p " ,
error , s ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , s - > recv_initial_md_op - > on_complete ,
GRPC_ERROR_REF ( error ) ) ;
}
complete_if_batch_end_locked (
exec_ctx , s , error , s - > recv_initial_md_op ,
" fail_helper scheduling recv-initial-metadata-on-complete " ) ;
s - > recv_initial_md_op = NULL ;
}
if ( s - > recv_message_op ) {
@ -580,20 +477,30 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
GRPC_CLOSURE_SCHED (
exec_ctx , s - > recv_message_op - > payload - > recv_message . recv_message_ready ,
GRPC_ERROR_REF ( error ) ) ;
if ( s - > recv_message_op ! = s - > recv_trailing_md_op ) {
INPROC_LOG ( GPR_DEBUG , " fail_helper %p scheduling message-on-complete %p " ,
s , error ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , s - > recv_message_op - > on_complete ,
GRPC_ERROR_REF ( error ) ) ;
}
complete_if_batch_end_locked (
exec_ctx , s , error , s - > recv_message_op ,
" fail_helper scheduling recv-message-on-complete " ) ;
s - > recv_message_op = NULL ;
}
if ( s - > send_message_op ) {
complete_if_batch_end_locked (
exec_ctx , s , error , s - > send_message_op ,
" fail_helper scheduling send-message-on-complete " ) ;
s - > send_message_op = NULL ;
}
if ( s - > send_trailing_md_op ) {
complete_if_batch_end_locked (
exec_ctx , s , error , s - > send_trailing_md_op ,
" fail_helper scheduling send-trailng-md-on-complete " ) ;
s - > send_trailing_md_op = NULL ;
}
if ( s - > recv_trailing_md_op ) {
INPROC_LOG ( GPR_DEBUG ,
" fail_helper %p scheduling trailing-md-on-complete %p " , s ,
error ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , s - > recv_trailing_md_op - > on_complete ,
GRPC_ERROR_REF ( error ) ) ;
complete_if_batch_end_locked (
exec_ctx , s , error , s - > recv_trailing_md_op ,
" fail_helper scheduling recv-trailing-metadata-on-complete " ) ;
s - > recv_trailing_md_op = NULL ;
}
close_other_side_locked ( exec_ctx , s , " fail_helper:other_side " ) ;
@ -602,12 +509,61 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
GRPC_ERROR_UNREF ( error ) ;
}
static void read_state_machine ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
// Move the complete message payload from |sender|'s pending send_message op
// into |receiver|'s recv_message slice buffer, wrap it in a slice-buffer
// stream for the receiver, schedule the receiver's message-ready callback,
// and complete both batches if these were their last pending ops.
// NOTE(review): on a pull error the loop breaks after cancelling the sender
// but the partially-filled recv_stream is still installed — presumably the
// cancellation path makes the partial message harmless; verify.
static void message_transfer_locked(grpc_exec_ctx* exec_ctx,
                                    inproc_stream* sender,
                                    inproc_stream* receiver) {
  size_t remaining =
      sender->send_message_op->payload->send_message.send_message->length;
  if (receiver->recv_inited) {
    // A previous transfer left a buffer behind; drop it before reuse.
    grpc_slice_buffer_destroy_internal(exec_ctx, &receiver->recv_message);
  }
  grpc_slice_buffer_init(&receiver->recv_message);
  receiver->recv_inited = true;
  do {
    grpc_slice message_slice;
    grpc_closure unused;
    // The inproc byte stream always has data ready, so next() must succeed
    // synchronously; the closure is never scheduled.
    GPR_ASSERT(grpc_byte_stream_next(
        exec_ctx, sender->send_message_op->payload->send_message.send_message,
        SIZE_MAX, &unused));
    grpc_error* error = grpc_byte_stream_pull(
        exec_ctx, sender->send_message_op->payload->send_message.send_message,
        &message_slice);
    if (error != GRPC_ERROR_NONE) {
      cancel_stream_locked(exec_ctx, sender, GRPC_ERROR_REF(error));
      break;
    }
    GPR_ASSERT(error == GRPC_ERROR_NONE);
    remaining -= GRPC_SLICE_LENGTH(message_slice);
    grpc_slice_buffer_add(&receiver->recv_message, message_slice);
  } while (remaining > 0);
  grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message,
                                0);
  *receiver->recv_message_op->payload->recv_message.recv_message =
      &receiver->recv_stream.base;
  INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready",
             receiver);
  GRPC_CLOSURE_SCHED(
      exec_ctx,
      receiver->recv_message_op->payload->recv_message.recv_message_ready,
      GRPC_ERROR_NONE);
  complete_if_batch_end_locked(
      exec_ctx, sender, GRPC_ERROR_NONE, sender->send_message_op,
      "message_transfer scheduling sender on_complete");
  complete_if_batch_end_locked(
      exec_ctx, receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
      "message_transfer scheduling receiver on_complete");
  receiver->recv_message_op = NULL;
  sender->send_message_op = NULL;
}
static void op_state_machine ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
// This function gets called when we have contents in the unprocessed reads
// Get what we want based on our ops wanted
// Schedule our appropriate closures
// and then return to reads_needed state if still needed
// and then return to op s_needed state if still needed
// Since this is a closure directly invoked by the combiner, it should not
// unref the error parameter explicitly; the combiner will do that implicitly
@ -615,12 +571,14 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
bool needs_close = false ;
INPROC_LOG ( GPR_DEBUG , " read _state_machine %p" , arg ) ;
INPROC_LOG ( GPR_DEBUG , " op _state_machine %p" , arg ) ;
inproc_stream * s = ( inproc_stream * ) arg ;
gpr_mu * mu = & s - > t - > mu - > mu ; // keep aside in case s gets closed
gpr_mu_lock ( mu ) ;
s - > read _closure_scheduled = false ;
s - > op _closure_scheduled = false ;
// cancellation takes precedence
inproc_stream * other = s - > other_side ;
if ( s - > cancel_self_error ! = GRPC_ERROR_NONE ) {
fail_helper_locked ( exec_ctx , s , GRPC_ERROR_REF ( s - > cancel_self_error ) ) ;
goto done ;
@ -632,89 +590,116 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
goto done ;
}
if ( s - > recv_initial_md_op ) {
if ( ! s - > to_read_initial_md_filled ) {
// We entered the state machine on some other kind of read even though
// we still haven't satisfied initial md . That's an error.
new_err =
GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Unexpected frame sequencing " ) ;
INPROC_LOG ( GPR_DEBUG ,
" read_state_machine %p scheduling on_complete errors for no "
" initial md %p " ,
s , new_err ) ;
if ( s - > send_message_op & & other ) {
if ( other - > recv_message_op ) {
message_transfer_locked ( exec_ctx , s , other ) ;
maybe_schedule_op_closure_locked ( exec_ctx , other , GRPC_ERROR_NONE ) ;
} else if ( ! s - > t - > is_client & &
( s - > trailing_md_sent | | other - > recv_trailing_md_op ) ) {
// A server send will never be matched if the client is waiting
// for trailing metadata already
complete_if_batch_end_locked (
exec_ctx , s , GRPC_ERROR_NONE , s - > send_message_op ,
" op_state_machine scheduling send-message-on-complete " ) ;
s - > send_message_op = NULL ;
}
}
// Pause a send trailing metadata if there is still an outstanding
// send message unless we know that the send message will never get
// matched to a receive. This happens on the client if the server has
// already sent status.
if ( s - > send_trailing_md_op & &
( ! s - > send_message_op | |
( s - > t - > is_client & &
( s - > trailing_md_recvd | | s - > to_read_trailing_md_filled ) ) ) ) {
grpc_metadata_batch * dest = ( other = = NULL ) ? & s - > write_buffer_trailing_md
: & other - > to_read_trailing_md ;
bool * destfilled = ( other = = NULL ) ? & s - > write_buffer_trailing_md_filled
: & other - > to_read_trailing_md_filled ;
if ( * destfilled | | s - > trailing_md_sent ) {
// The buffer is already in use; that's an error!
INPROC_LOG ( GPR_DEBUG , " Extra trailing metadata %p " , s ) ;
new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Extra trailing metadata " ) ;
fail_helper_locked ( exec_ctx , s , GRPC_ERROR_REF ( new_err ) ) ;
goto done ;
} else if ( s - > initial_md_recvd ) {
} else {
if ( other & & ! other - > closed ) {
fill_in_metadata ( exec_ctx , s ,
s - > send_trailing_md_op - > payload - > send_trailing_metadata
. send_trailing_metadata ,
0 , dest , NULL , destfilled ) ;
}
s - > trailing_md_sent = true ;
if ( ! s - > t - > is_client & & s - > trailing_md_recvd & & s - > recv_trailing_md_op ) {
INPROC_LOG ( GPR_DEBUG ,
" op_state_machine %p scheduling trailing-md-on-complete " , s ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , s - > recv_trailing_md_op - > on_complete ,
GRPC_ERROR_NONE ) ;
s - > recv_trailing_md_op = NULL ;
needs_close = true ;
}
}
maybe_schedule_op_closure_locked ( exec_ctx , other , GRPC_ERROR_NONE ) ;
complete_if_batch_end_locked (
exec_ctx , s , GRPC_ERROR_NONE , s - > send_trailing_md_op ,
" op_state_machine scheduling send-trailing-metadata-on-complete " ) ;
s - > send_trailing_md_op = NULL ;
}
if ( s - > recv_initial_md_op ) {
if ( s - > initial_md_recvd ) {
new_err =
GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Already recvd initial md " ) ;
INPROC_LOG (
GPR_DEBUG ,
" read_state_machine %p scheduling on_complete errors for already "
" op _state_machine %p scheduling on_complete errors for already "
" recvd initial md %p " ,
s , new_err ) ;
fail_helper_locked ( exec_ctx , s , GRPC_ERROR_REF ( new_err ) ) ;
goto done ;
}
s - > initial_md_recvd = true ;
new_err = fill_in_metadata (
exec_ctx , s , & s - > to_read_initial_md , s - > to_read_initial_md_flags ,
s - > recv_initial_md_op - > payload - > recv_initial_metadata
. recv_initial_metadata ,
s - > recv_initial_md_op - > payload - > recv_initial_metadata . recv_flags , NULL ) ;
s - > recv_initial_md_op - > payload - > recv_initial_metadata . recv_initial_metadata
- > deadline = s - > deadline ;
grpc_metadata_batch_clear ( exec_ctx , & s - > to_read_initial_md ) ;
s - > to_read_initial_md_filled = false ;
INPROC_LOG ( GPR_DEBUG ,
" read_state_machine %p scheduling initial-metadata-ready %p " , s ,
new_err ) ;
GRPC_CLOSURE_SCHED ( exec_ctx ,
s - > recv_initial_md_op - > payload - > recv_initial_metadata
. recv_initial_metadata_ready ,
GRPC_ERROR_REF ( new_err ) ) ;
if ( ( s - > recv_initial_md_op ! = s - > recv_message_op ) & &
( s - > recv_initial_md_op ! = s - > recv_trailing_md_op ) ) {
INPROC_LOG (
GPR_DEBUG ,
" read_state_machine %p scheduling initial-metadata-on-complete %p " , s ,
new_err ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , s - > recv_initial_md_op - > on_complete ,
GRPC_ERROR_REF ( new_err ) ) ;
}
s - > recv_initial_md_op = NULL ;
if ( new_err ! = GRPC_ERROR_NONE ) {
if ( s - > to_read_initial_md_filled ) {
s - > initial_md_recvd = true ;
new_err = fill_in_metadata (
exec_ctx , s , & s - > to_read_initial_md , s - > to_read_initial_md_flags ,
s - > recv_initial_md_op - > payload - > recv_initial_metadata
. recv_initial_metadata ,
s - > recv_initial_md_op - > payload - > recv_initial_metadata . recv_flags ,
NULL ) ;
s - > recv_initial_md_op - > payload - > recv_initial_metadata
. recv_initial_metadata - > deadline = s - > deadline ;
grpc_metadata_batch_clear ( exec_ctx , & s - > to_read_initial_md ) ;
s - > to_read_initial_md_filled = false ;
INPROC_LOG ( GPR_DEBUG ,
" read_state_machine %p scheduling on_complete errors2 %p" , s ,
" op_state_machine %p scheduling initial-metadata-ready %p " , s ,
new_err ) ;
fail_helper_locked ( exec_ctx , s , GRPC_ERROR_REF ( new_err ) ) ;
goto done ;
GRPC_CLOSURE_SCHED ( exec_ctx ,
s - > recv_initial_md_op - > payload - > recv_initial_metadata
. recv_initial_metadata_ready ,
GRPC_ERROR_REF ( new_err ) ) ;
complete_if_batch_end_locked (
exec_ctx , s , new_err , s - > recv_initial_md_op ,
" op_state_machine scheduling recv-initial-metadata-on-complete " ) ;
s - > recv_initial_md_op = NULL ;
if ( new_err ! = GRPC_ERROR_NONE ) {
INPROC_LOG ( GPR_DEBUG ,
" op_state_machine %p scheduling on_complete errors2 %p " , s ,
new_err ) ;
fail_helper_locked ( exec_ctx , s , GRPC_ERROR_REF ( new_err ) ) ;
goto done ;
}
}
}
if ( s - > to_read_initial_md_filled ) {
new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Unexpected recv frame " ) ;
fail_helper_locked ( exec_ctx , s , GRPC_ERROR_REF ( new_err ) ) ;
goto done ;
}
if ( ! slice_buffer_list_empty ( & s - > to_read_message ) & & s - > recv_message_op ) {
inproc_slice_byte_stream_init (
& s - > recv_message_stream ,
slice_buffer_list_pophead ( & s - > to_read_message ) ) ;
* s - > recv_message_op - > payload - > recv_message . recv_message =
& s - > recv_message_stream . base ;
INPROC_LOG ( GPR_DEBUG , " read_state_machine %p scheduling message-ready " , s ) ;
GRPC_CLOSURE_SCHED (
exec_ctx , s - > recv_message_op - > payload - > recv_message . recv_message_ready ,
GRPC_ERROR_NONE ) ;
if ( s - > recv_message_op ! = s - > recv_trailing_md_op ) {
INPROC_LOG ( GPR_DEBUG ,
" read_state_machine %p scheduling message-on-complete %p " , s ,
new_err ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , s - > recv_message_op - > on_complete ,
GRPC_ERROR_REF ( new_err ) ) ;
if ( s - > recv_message_op ) {
if ( other & & other - > send_message_op ) {
message_transfer_locked ( exec_ctx , other , s ) ;
maybe_schedule_op_closure_locked ( exec_ctx , other , GRPC_ERROR_NONE ) ;
}
s - > recv_message_op = NULL ;
}
if ( s - > recv_trailing_md_op & & s - > t - > is_client & & other & &
other - > send_message_op ) {
maybe_schedule_op_closure_locked ( exec_ctx , other , GRPC_ERROR_NONE ) ;
}
if ( s - > to_read_trailing_md_filled ) {
if ( s - > trailing_md_recvd ) {
@ -722,7 +707,7 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Already recvd trailing md " ) ;
INPROC_LOG (
GPR_DEBUG ,
" read _state_machine %p scheduling on_complete errors for already "
" op _state_machine %p scheduling on_complete errors for already "
" recvd trailing md %p " ,
s , new_err ) ;
fail_helper_locked ( exec_ctx , s , GRPC_ERROR_REF ( new_err ) ) ;
@ -731,21 +716,24 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
if ( s - > recv_message_op ! = NULL ) {
// This message needs to be wrapped up because it will never be
// satisfied
INPROC_LOG ( GPR_DEBUG , " read_state_machine %p scheduling message-ready " ,
s ) ;
INPROC_LOG ( GPR_DEBUG , " op_state_machine %p scheduling message-ready " , s ) ;
GRPC_CLOSURE_SCHED (
exec_ctx ,
s - > recv_message_op - > payload - > recv_message . recv_message_ready ,
GRPC_ERROR_NONE ) ;
if ( s - > recv_message_op ! = s - > recv_trailing_md_op ) {
INPROC_LOG ( GPR_DEBUG ,
" read_state_machine %p scheduling message-on-complete %p " , s ,
new_err ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , s - > recv_message_op - > on_complete ,
GRPC_ERROR_REF ( new_err ) ) ;
}
complete_if_batch_end_locked (
exec_ctx , s , new_err , s - > recv_message_op ,
" op_state_machine scheduling recv-message-on-complete " ) ;
s - > recv_message_op = NULL ;
}
if ( ( s - > trailing_md_sent | | s - > t - > is_client ) & & s - > send_message_op ) {
// Nothing further will try to receive from this stream, so finish off
// any outstanding send_message op
complete_if_batch_end_locked (
exec_ctx , s , new_err , s - > send_message_op ,
" op_state_machine scheduling send-message-on-complete " ) ;
s - > send_message_op = NULL ;
}
if ( s - > recv_trailing_md_op ! = NULL ) {
// We wanted trailing metadata and we got it
s - > trailing_md_recvd = true ;
@ -763,61 +751,65 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
// (If the server hasn't already sent its trailing md, it doesn't have
// a final status, so don't mark this op complete)
if ( s - > t - > is_client | | s - > trailing_md_sent ) {
INPROC_LOG (
GPR_DEBUG ,
" read_state_machine %p scheduling trailing-md-on-complete %p " , s ,
new_err ) ;
INPROC_LOG ( GPR_DEBUG ,
" op_state_machine %p scheduling trailing-md-on-complete %p " ,
s , new_err ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , s - > recv_trailing_md_op - > on_complete ,
GRPC_ERROR_REF ( new_err ) ) ;
s - > recv_trailing_md_op = NULL ;
needs_close = true ;
} else {
INPROC_LOG ( GPR_DEBUG ,
" read _state_machine %p server needs to delay handling "
" op _state_machine %p server needs to delay handling "
" trailing-md-on-complete %p " ,
s , new_err ) ;
}
} else {
INPROC_LOG (
GPR_DEBUG ,
" read_state_machine %p has trailing md but not yet waiting for it " ,
s ) ;
" op_state_machine %p has trailing md but not yet waiting for it " , s ) ;
}
}
if ( s - > trailing_md_recvd & & s - > recv_message_op ) {
// No further message will come on this stream, so finish off the
// recv_message_op
INPROC_LOG ( GPR_DEBUG , " read _state_machine %p scheduling message-ready" , s ) ;
INPROC_LOG ( GPR_DEBUG , " op _state_machine %p scheduling message-ready" , s ) ;
GRPC_CLOSURE_SCHED (
exec_ctx , s - > recv_message_op - > payload - > recv_message . recv_message_ready ,
GRPC_ERROR_NONE ) ;
if ( s - > recv_message_op ! = s - > recv_trailing_md_op ) {
INPROC_LOG ( GPR_DEBUG ,
" read_state_machine %p scheduling message-on-complete %p " , s ,
new_err ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , s - > recv_message_op - > on_complete ,
GRPC_ERROR_REF ( new_err ) ) ;
}
complete_if_batch_end_locked (
exec_ctx , s , new_err , s - > recv_message_op ,
" op_state_machine scheduling recv-message-on-complete " ) ;
s - > recv_message_op = NULL ;
}
if ( s - > recv_message_op | | s - > recv_trailing_md_op ) {
if ( s - > trailing_md_recvd & & ( s - > trailing_md_sent | | s - > t - > is_client ) & &
s - > send_message_op ) {
// Nothing further will try to receive from this stream, so finish off
// any outstanding send_message op
complete_if_batch_end_locked (
exec_ctx , s , new_err , s - > send_message_op ,
" op_state_machine scheduling send-message-on-complete " ) ;
s - > send_message_op = NULL ;
}
if ( s - > send_message_op | | s - > send_trailing_md_op | | s - > recv_initial_md_op | |
s - > recv_message_op | | s - > recv_trailing_md_op ) {
// Didn't get the item we wanted so we still need to get
// rescheduled
INPROC_LOG ( GPR_DEBUG , " read_state_machine %p still needs closure %p %p " , s ,
s - > recv_message_op , s - > recv_trailing_md_op ) ;
s - > reads_needed = true ;
INPROC_LOG (
GPR_DEBUG , " op_state_machine %p still needs closure %p %p %p %p %p " , s ,
s - > send_message_op , s - > send_trailing_md_op , s - > recv_initial_md_op ,
s - > recv_message_op , s - > recv_trailing_md_op ) ;
s - > ops_needed = true ;
}
done :
if ( needs_close ) {
close_other_side_locked ( exec_ctx , s , " read_state_machine " ) ;
close_other_side_locked ( exec_ctx , s , " op _state_machine" ) ;
close_stream_locked ( exec_ctx , s ) ;
}
gpr_mu_unlock ( mu ) ;
GRPC_ERROR_UNREF ( new_err ) ;
}
static grpc_closure do_nothing_closure ;
static bool cancel_stream_locked ( grpc_exec_ctx * exec_ctx , inproc_stream * s ,
grpc_error * error ) {
bool ret = false ; // was the cancel accepted
@ -826,14 +818,7 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if ( s - > cancel_self_error = = GRPC_ERROR_NONE ) {
ret = true ;
s - > cancel_self_error = GRPC_ERROR_REF ( error ) ;
if ( s - > reads_needed ) {
if ( ! s - > read_closure_scheduled ) {
GRPC_CLOSURE_SCHED ( exec_ctx , & s - > read_closure ,
GRPC_ERROR_REF ( s - > cancel_self_error ) ) ;
s - > read_closure_scheduled = true ;
}
s - > reads_needed = false ;
}
maybe_schedule_op_closure_locked ( exec_ctx , s , s - > cancel_self_error ) ;
// Send trailing md to the other side indicating cancellation, even if we
// already have
s - > trailing_md_sent = true ;
@ -853,14 +838,8 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if ( other - > cancel_other_error = = GRPC_ERROR_NONE ) {
other - > cancel_other_error = GRPC_ERROR_REF ( s - > cancel_self_error ) ;
}
if ( other - > reads_needed ) {
if ( ! other - > read_closure_scheduled ) {
GRPC_CLOSURE_SCHED ( exec_ctx , & other - > read_closure ,
GRPC_ERROR_REF ( other - > cancel_other_error ) ) ;
other - > read_closure_scheduled = true ;
}
other - > reads_needed = false ;
}
maybe_schedule_op_closure_locked ( exec_ctx , other ,
other - > cancel_other_error ) ;
} else if ( s - > write_buffer_cancel_error = = GRPC_ERROR_NONE ) {
s - > write_buffer_cancel_error = GRPC_ERROR_REF ( s - > cancel_self_error ) ;
}
@ -869,11 +848,9 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
// couldn't complete that because we hadn't yet sent out trailing
// md, now's the chance
if ( ! s - > t - > is_client & & s - > trailing_md_recvd & & s - > recv_trailing_md_op ) {
INPROC_LOG ( GPR_DEBUG ,
" cancel_stream %p scheduling trailing-md-on-complete %p " , s ,
s - > cancel_self_error ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , s - > recv_trailing_md_op - > on_complete ,
GRPC_ERROR_REF ( s - > cancel_self_error ) ) ;
complete_if_batch_end_locked (
exec_ctx , s , s - > cancel_self_error , s - > recv_trailing_md_op ,
" cancel_stream scheduling trailing-md-on-complete " ) ;
s - > recv_trailing_md_op = NULL ;
}
}
@ -918,7 +895,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
// already self-canceled so still give it an error
error = GRPC_ERROR_REF ( s - > cancel_self_error ) ;
} else {
INPROC_LOG ( GPR_DEBUG , " perform_stream_op %p%s%s%s%s%s%s " , s ,
INPROC_LOG ( GPR_DEBUG , " perform_stream_op %p %s%s%s%s%s%s%s " , s ,
s - > t - > is_client ? " client " : " server " ,
op - > send_initial_metadata ? " send_initial_metadata " : " " ,
op - > send_message ? " send_message " : " " ,
op - > send_trailing_metadata ? " send_trailing_metadata " : " " ,
@ -929,10 +907,9 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
bool needs_close = false ;
inproc_stream * other = s - > other_side ;
if ( error = = GRPC_ERROR_NONE & &
( op - > send_initial_metadata | | op - > send_message | |
op - > send_trailing_metadata ) ) {
inproc_stream * other = s - > other_side ;
( op - > send_initial_metadata | | op - > send_trailing_metadata ) ) {
if ( s - > t - > is_closed ) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Endpoint already shutdown " ) ;
}
@ -956,79 +933,28 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
dest , destflags , destfilled ) ;
}
if ( s - > t - > is_client ) {
gpr_timespec * dl =
grpc_millis * dl =
( other = = NULL ) ? & s - > write_buffer_deadline : & other - > deadline ;
* dl = gpr_time_min ( * dl , op - > payload - > send_initial_metadata
. send_initial_metadata - > deadline ) ;
* dl = GPR_MIN ( * dl , op - > payload - > send_initial_metadata
. send_initial_metadata - > deadline ) ;
s - > initial_md_sent = true ;
}
}
}
if ( error = = GRPC_ERROR_NONE & & op - > send_message ) {
size_t remaining = op - > payload - > send_message . send_message - > length ;
grpc_slice_buffer * dest = slice_buffer_list_append (
( other = = NULL ) ? & s - > write_buffer_message : & other - > to_read_message ) ;
do {
grpc_slice message_slice ;
grpc_closure unused ;
GPR_ASSERT ( grpc_byte_stream_next ( exec_ctx ,
op - > payload - > send_message . send_message ,
SIZE_MAX , & unused ) ) ;
error = grpc_byte_stream_pull (
exec_ctx , op - > payload - > send_message . send_message , & message_slice ) ;
if ( error ! = GRPC_ERROR_NONE ) {
cancel_stream_locked ( exec_ctx , s , GRPC_ERROR_REF ( error ) ) ;
break ;
}
GPR_ASSERT ( error = = GRPC_ERROR_NONE ) ;
remaining - = GRPC_SLICE_LENGTH ( message_slice ) ;
grpc_slice_buffer_add ( dest , message_slice ) ;
} while ( remaining ! = 0 ) ;
grpc_byte_stream_destroy ( exec_ctx ,
op - > payload - > send_message . send_message ) ;
}
if ( error = = GRPC_ERROR_NONE & & op - > send_trailing_metadata ) {
grpc_metadata_batch * dest = ( other = = NULL ) ? & s - > write_buffer_trailing_md
: & other - > to_read_trailing_md ;
bool * destfilled = ( other = = NULL ) ? & s - > write_buffer_trailing_md_filled
: & other - > to_read_trailing_md_filled ;
if ( * destfilled | | s - > trailing_md_sent ) {
// The buffer is already in use; that's an error!
INPROC_LOG ( GPR_DEBUG , " Extra trailing metadata %p " , s ) ;
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Extra trailing metadata " ) ;
} else {
if ( ! other - > closed ) {
fill_in_metadata (
exec_ctx , s ,
op - > payload - > send_trailing_metadata . send_trailing_metadata , 0 ,
dest , NULL , destfilled ) ;
}
s - > trailing_md_sent = true ;
if ( ! s - > t - > is_client & & s - > trailing_md_recvd & &
s - > recv_trailing_md_op ) {
INPROC_LOG ( GPR_DEBUG ,
" perform_stream_op %p scheduling trailing-md-on-complete " ,
s ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , s - > recv_trailing_md_op - > on_complete ,
GRPC_ERROR_NONE ) ;
s - > recv_trailing_md_op = NULL ;
needs_close = true ;
}
}
}
if ( other ! = NULL & & other - > reads_needed ) {
if ( ! other - > read_closure_scheduled ) {
GRPC_CLOSURE_SCHED ( exec_ctx , & other - > read_closure , error ) ;
other - > read_closure_scheduled = true ;
}
other - > reads_needed = false ;
maybe_schedule_op_closure_locked ( exec_ctx , other , error ) ;
}
}
if ( error = = GRPC_ERROR_NONE & &
( op - > recv_initial_metadata | | op - > recv_message | |
( op - > send_message | | op - > send_trailing_metadata | |
op - > recv_initial_metadata | | op - > recv_message | |
op - > recv_trailing_metadata ) ) {
// If there are any reads, mark it so that the read closure will react to
// them
// Mark ops that need to be processed by the closure
if ( op - > send_message ) {
s - > send_message_op = op ;
}
if ( op - > send_trailing_metadata ) {
s - > send_trailing_md_op = op ;
}
if ( op - > recv_initial_metadata ) {
s - > recv_initial_md_op = op ;
}
@ -1040,25 +966,28 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
// We want to initiate the closure if:
// 1. There is initial metadata and something ready to take that
// 2. There is a message and something ready to take it
// 3. There is trailing metadata, even if nothing specifically wants
// that because that can shut down the message as well
if ( ( s - > to_read_initial_md_filled & & op - > recv_initial_metadata ) | |
( ( ! slice_buffer_list_empty ( & s - > to_read_message ) | |
s - > trailing_md_recvd ) & &
op - > recv_message ) | |
( s - > to_read_trailing_md_filled ) ) {
if ( ! s - > read_closure_scheduled ) {
GRPC_CLOSURE_SCHED ( exec_ctx , & s - > read_closure , GRPC_ERROR_NONE ) ;
s - > read_closure_scheduled = true ;
// 1. We want to send a message and the other side wants to receive or end
// 2. We want to send trailing metadata and there isn't an unmatched send
// 3. We want initial metadata and the other side has sent it
// 4. We want to receive a message and there is a message ready
// 5. There is trailing metadata, even if nothing specifically wants
// that because that can shut down the receive message as well
if ( ( op - > send_message & & other & & ( ( other - > recv_message_op ! = NULL ) | |
( other - > recv_trailing_md_op ! = NULL ) ) ) | |
( op - > send_trailing_metadata & & ! op - > send_message ) | |
( op - > recv_initial_metadata & & s - > to_read_initial_md_filled ) | |
( op - > recv_message & & ( other & & other - > send_message_op ! = NULL ) ) | |
( s - > to_read_trailing_md_filled | | s - > trailing_md_recvd ) ) {
if ( ! s - > op_closure_scheduled ) {
GRPC_CLOSURE_SCHED ( exec_ctx , & s - > op_closure , GRPC_ERROR_NONE ) ;
s - > op_closure_scheduled = true ;
}
} else {
s - > reads_needed = true ;
s - > op s_needed = true ;
}
} else {
if ( error ! = GRPC_ERROR_NONE ) {
// Schedule op's read closures that we didn't push to read state machine
// Schedule op's closures that we didn't push to op state machine
if ( op - > recv_initial_metadata ) {
INPROC_LOG (
GPR_DEBUG ,