@ -425,7 +425,6 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream * s ;
while ( grpc_chttp2_list_pop_writable_stream ( t , & s ) ) {
grpc_chttp2_leave_writing_lists ( exec_ctx , t , s ) ;
GRPC_CHTTP2_STREAM_UNREF ( exec_ctx , s , " chttp2_writing:close " ) ;
}
end_all_the_calls ( exec_ctx , t , GRPC_ERROR_REF ( error ) ) ;
@ -521,10 +520,6 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
}
}
if ( s - > fail_pending_writes_on_writes_finished_error ! = NULL ) {
GRPC_ERROR_UNREF ( s - > fail_pending_writes_on_writes_finished_error ) ;
}
GPR_ASSERT ( s - > send_initial_metadata_finished = = NULL ) ;
GPR_ASSERT ( s - > fetching_send_message = = NULL ) ;
GPR_ASSERT ( s - > send_trailing_metadata_finished = = NULL ) ;
@ -826,6 +821,7 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
}
# define CLOSURE_BARRIER_STATS_BIT (1 << 0)
# define CLOSURE_BARRIER_CANNOT_RUN_WITH_WRITE (1 << 1)
# define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
static grpc_closure * add_closure_barrier ( grpc_closure * closure ) {
@ -852,6 +848,16 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
return ;
}
closure - > next_data . scratch - = CLOSURE_BARRIER_FIRST_REF_BIT ;
if ( grpc_http_trace ) {
const char * errstr = grpc_error_string ( error ) ;
gpr_log ( GPR_DEBUG ,
" complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s " ,
closure ,
( int ) ( closure - > next_data . scratch / CLOSURE_BARRIER_FIRST_REF_BIT ) ,
( int ) ( closure - > next_data . scratch % CLOSURE_BARRIER_FIRST_REF_BIT ) ,
desc , errstr ) ;
grpc_error_free_string ( errstr ) ;
}
if ( error ! = GRPC_ERROR_NONE ) {
if ( closure - > error_data . error = = GRPC_ERROR_NONE ) {
closure - > error_data . error =
@ -868,7 +874,14 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
grpc_transport_move_stats ( & s - > stats , s - > collecting_stats ) ;
s - > collecting_stats = NULL ;
}
grpc_closure_run ( exec_ctx , closure , closure - > error_data . error ) ;
if ( t - > write_state = = GRPC_CHTTP2_WRITE_STATE_IDLE | |
( closure - > next_data . scratch & CLOSURE_BARRIER_CANNOT_RUN_WITH_WRITE ) = =
0 ) {
grpc_closure_run ( exec_ctx , closure , closure - > error_data . error ) ;
} else {
grpc_closure_list_append ( & t - > run_after_write , closure ,
closure - > error_data . error ) ;
}
}
}
@ -1013,6 +1026,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if ( op - > send_initial_metadata ! = NULL ) {
GPR_ASSERT ( s - > send_initial_metadata_finished = = NULL ) ;
on_complete - > next_data . scratch | = CLOSURE_BARRIER_CANNOT_RUN_WITH_WRITE ;
s - > send_initial_metadata_finished = add_closure_barrier ( on_complete ) ;
s - > send_initial_metadata = op - > send_initial_metadata ;
const size_t metadata_size =
@ -1066,6 +1080,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if ( op - > send_message ! = NULL ) {
on_complete - > next_data . scratch | = CLOSURE_BARRIER_CANNOT_RUN_WITH_WRITE ;
s - > fetching_send_message_finished = add_closure_barrier ( op - > on_complete ) ;
if ( s - > write_closed ) {
grpc_chttp2_complete_closure_step (
@ -1103,6 +1118,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if ( op - > send_trailing_metadata ! = NULL ) {
GPR_ASSERT ( s - > send_trailing_metadata_finished = = NULL ) ;
on_complete - > next_data . scratch | = CLOSURE_BARRIER_CANNOT_RUN_WITH_WRITE ;
s - > send_trailing_metadata_finished = add_closure_barrier ( on_complete ) ;
s - > send_trailing_metadata = op - > send_trailing_metadata ;
const size_t metadata_size =
@ -1406,7 +1422,6 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
if ( grpc_chttp2_list_remove_writable_stream ( t , s ) ) {
grpc_chttp2_leave_writing_lists ( exec_ctx , t , s ) ;
GRPC_CHTTP2_STREAM_UNREF ( exec_ctx , s , " chttp2_writing:remove_stream " ) ;
}
@ -1537,41 +1552,9 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s,
return error ;
}
void grpc_chttp2_leave_writing_lists ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t ,
grpc_chttp2_stream * s ) {
if ( s - > need_fail_pending_writes_on_writes_finished ) {
grpc_error * error = s - > fail_pending_writes_on_writes_finished_error ;
s - > fail_pending_writes_on_writes_finished_error = NULL ;
s - > need_fail_pending_writes_on_writes_finished = false ;
grpc_chttp2_fail_pending_writes ( exec_ctx , t , s , error ) ;
}
}
void grpc_chttp2_fail_pending_writes ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t ,
grpc_chttp2_stream * s , grpc_error * error ) {
if ( s - > need_fail_pending_writes_on_writes_finished | |
( t - > write_state ! = GRPC_CHTTP2_WRITE_STATE_IDLE & &
( s - > included [ GRPC_CHTTP2_LIST_WRITABLE ] | |
s - > included [ GRPC_CHTTP2_LIST_WRITING ] ) ) ) {
/* If a write is in progress, and it involves this stream, wait for the
* write to complete before cancelling things out . If we don ' t do this , then
* our combiner lock might think that some operation on its queue might be
* covering a completion even though there is none , in which case we might
* offload to another thread , which isn ' t guarateed to exist */
if ( error ! = GRPC_ERROR_NONE ) {
if ( s - > fail_pending_writes_on_writes_finished_error = = GRPC_ERROR_NONE ) {
s - > fail_pending_writes_on_writes_finished_error = GRPC_ERROR_CREATE (
" Post-poned fail writes due to in-progress write " ) ;
}
s - > fail_pending_writes_on_writes_finished_error = grpc_error_add_child (
s - > fail_pending_writes_on_writes_finished_error , error ) ;
}
s - > need_fail_pending_writes_on_writes_finished = true ;
return ; /* early out */
}
error =
removal_error ( error , s , " Pending writes failed due to stream closure " ) ;
s - > send_initial_metadata = NULL ;