@ -111,9 +111,6 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy_locked ( grpc_exec_ctx * exec_ctx ,
void * byte_stream ,
grpc_error * error_ignored ) ;
static void fail_pending_writes ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t , grpc_chttp2_stream * s ,
grpc_error * error ) ;
static void benign_reclaimer ( grpc_exec_ctx * exec_ctx , void * t ,
grpc_error * error ) ;
@ -428,6 +425,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream * s ;
while ( grpc_chttp2_list_pop_writable_stream ( t , & s ) ) {
grpc_chttp2_leave_writing_lists ( exec_ctx , t , s ) ;
GRPC_CHTTP2_STREAM_UNREF ( exec_ctx , s , " chttp2_writing:close " ) ;
}
end_all_the_calls ( exec_ctx , t , GRPC_ERROR_REF ( error ) ) ;
@ -523,6 +521,10 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
}
}
if ( s - > fail_pending_writes_on_writes_finished_error ! = NULL ) {
GRPC_ERROR_UNREF ( s - > fail_pending_writes_on_writes_finished_error ) ;
}
GPR_ASSERT ( s - > send_initial_metadata_finished = = NULL ) ;
GPR_ASSERT ( s - > fetching_send_message = = NULL ) ;
GPR_ASSERT ( s - > send_trailing_metadata_finished = = NULL ) ;
@ -704,8 +706,6 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
}
}
grpc_chttp2_end_write ( exec_ctx , t , GRPC_ERROR_REF ( error ) ) ;
switch ( t - > write_state ) {
case GRPC_CHTTP2_WRITE_STATE_IDLE :
GPR_UNREACHABLE_CODE ( break ) ;
@ -734,6 +734,8 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
break ;
}
grpc_chttp2_end_write ( exec_ctx , t , GRPC_ERROR_REF ( error ) ) ;
GRPC_CHTTP2_UNREF_TRANSPORT ( exec_ctx , t , " writing " ) ;
GPR_TIMER_END ( " terminate_writing_with_lock " , 0 ) ;
}
@ -1388,6 +1390,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
if ( grpc_chttp2_list_remove_writable_stream ( t , s ) ) {
grpc_chttp2_leave_writing_lists ( exec_ctx , t , s ) ;
GRPC_CHTTP2_STREAM_UNREF ( exec_ctx , s , " chttp2_writing:remove_stream " ) ;
}
@ -1518,9 +1521,41 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s,
return error ;
}
static void fail_pending_writes ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t , grpc_chttp2_stream * s ,
grpc_error * error ) {
void grpc_chttp2_leave_writing_lists ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t ,
grpc_chttp2_stream * s ) {
if ( s - > need_fail_pending_writes_on_writes_finished ) {
grpc_error * error = s - > fail_pending_writes_on_writes_finished_error ;
s - > fail_pending_writes_on_writes_finished_error = NULL ;
s - > need_fail_pending_writes_on_writes_finished = false ;
grpc_chttp2_fail_pending_writes ( exec_ctx , t , s , error ) ;
}
}
void grpc_chttp2_fail_pending_writes ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t ,
grpc_chttp2_stream * s , grpc_error * error ) {
if ( s - > need_fail_pending_writes_on_writes_finished | |
( t - > write_state ! = GRPC_CHTTP2_WRITE_STATE_IDLE & &
( s - > included [ GRPC_CHTTP2_LIST_WRITABLE ] | |
s - > included [ GRPC_CHTTP2_LIST_WRITING ] ) ) ) {
/* If a write is in progress, and it involves this stream, wait for the
* write to complete before cancelling things out . If we don ' t do this , then
* our combiner lock might think that some operation on its queue might be
* covering a completion even though there is none , in which case we might
* offload to another thread , which isn ' t guarateed to exist */
if ( error ! = GRPC_ERROR_NONE ) {
if ( s - > fail_pending_writes_on_writes_finished_error = = GRPC_ERROR_NONE ) {
s - > fail_pending_writes_on_writes_finished_error = GRPC_ERROR_CREATE (
" Post-poned fail writes due to in-progress write " ) ;
}
s - > fail_pending_writes_on_writes_finished_error = grpc_error_add_child (
s - > fail_pending_writes_on_writes_finished_error , error ) ;
}
s - > need_fail_pending_writes_on_writes_finished = true ;
return ; /* early out */
}
error =
removal_error ( error , s , " Pending writes failed due to stream closure " ) ;
s - > send_initial_metadata = NULL ;
@ -1529,6 +1564,12 @@ static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
" send_initial_metadata_finished " ) ;
s - > send_trailing_metadata = NULL ;
if ( s - > send_trailing_metadata_finished ) {
const char * why = grpc_error_string ( error ) ;
gpr_log ( GPR_DEBUG , " cancel send_trailing_metadata: writing=%d %s " ,
t - > write_state , why ) ;
grpc_error_free_string ( why ) ;
}
grpc_chttp2_complete_closure_step (
exec_ctx , t , s , & s - > send_trailing_metadata_finished ,
GRPC_ERROR_REF ( error ) , " send_trailing_metadata_finished " ) ;
@ -1574,7 +1615,7 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
if ( close_writes & & ! s - > write_closed ) {
s - > write_closed_error = GRPC_ERROR_REF ( error ) ;
s - > write_closed = true ;
fail_pending_writes ( exec_ctx , t , s , GRPC_ERROR_REF ( error ) ) ;
grpc_chttp2_ fail_pending_writes( exec_ctx , t , s , GRPC_ERROR_REF ( error ) ) ;
grpc_chttp2_maybe_complete_recv_trailing_metadata ( exec_ctx , t , s ) ;
}
if ( s - > read_closed & & s - > write_closed ) {