|
|
|
@ -142,7 +142,8 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_chttp2_stream *s, |
|
|
|
|
void *byte_stream); |
|
|
|
|
static void fail_pending_writes(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_chttp2_stream_global *stream_global); |
|
|
|
|
grpc_chttp2_stream_global *stream_global, |
|
|
|
|
grpc_error *error); |
|
|
|
|
|
|
|
|
|
/*******************************************************************************
|
|
|
|
|
* CONSTRUCTION/DESTRUCTION/REFCOUNTING |
|
|
|
@ -746,7 +747,7 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_chttp2_stream_global *stream_global; |
|
|
|
|
while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, |
|
|
|
|
&stream_global)) { |
|
|
|
|
fail_pending_writes(exec_ctx, stream_global); |
|
|
|
|
fail_pending_writes(exec_ctx, stream_global, grpc_error_ref(error)); |
|
|
|
|
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -844,34 +845,36 @@ static void maybe_start_some_streams( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#define CLOSURE_BARRIER_STATS_BIT (1 << 0) |
|
|
|
|
#define CLOSURE_BARRIER_FAILURE_BIT (1 << 1) |
|
|
|
|
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16) |
|
|
|
|
|
|
|
|
|
static grpc_closure *add_closure_barrier(grpc_closure *closure) { |
|
|
|
|
closure->final_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT; |
|
|
|
|
closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT; |
|
|
|
|
return closure; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_chttp2_stream_global *stream_global, |
|
|
|
|
grpc_closure **pclosure, int success) { |
|
|
|
|
grpc_closure **pclosure, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_closure *closure = *pclosure; |
|
|
|
|
if (closure == NULL) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
closure->final_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT; |
|
|
|
|
if (!success) { |
|
|
|
|
closure->final_data.scratch |= CLOSURE_BARRIER_FAILURE_BIT; |
|
|
|
|
closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT; |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
if (closure->error == GRPC_ERROR_NONE) { |
|
|
|
|
closure->error = |
|
|
|
|
GRPC_ERROR_CREATE("Error in HTTP transport completing operation"); |
|
|
|
|
} |
|
|
|
|
closure->error = grpc_error_add_child(closure->error, error); |
|
|
|
|
} |
|
|
|
|
if (closure->final_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) { |
|
|
|
|
if (closure->final_data.scratch & CLOSURE_BARRIER_STATS_BIT) { |
|
|
|
|
if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) { |
|
|
|
|
if (closure->next_data.scratch & CLOSURE_BARRIER_STATS_BIT) { |
|
|
|
|
grpc_transport_move_stats(&stream_global->stats, |
|
|
|
|
stream_global->collecting_stats); |
|
|
|
|
stream_global->collecting_stats = NULL; |
|
|
|
|
} |
|
|
|
|
grpc_exec_ctx_push( |
|
|
|
|
exec_ctx, closure, |
|
|
|
|
(closure->final_data.scratch & CLOSURE_BARRIER_FAILURE_BIT) == 0, NULL); |
|
|
|
|
grpc_exec_ctx_push(exec_ctx, closure, closure->error, NULL); |
|
|
|
|
} |
|
|
|
|
*pclosure = NULL; |
|
|
|
|
} |
|
|
|
@ -906,12 +909,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
/* use final_data as a barrier until enqueue time; the inital counter is
|
|
|
|
|
dropped at the end of this function */ |
|
|
|
|
on_complete->final_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT; |
|
|
|
|
on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT; |
|
|
|
|
|
|
|
|
|
if (op->collect_stats != NULL) { |
|
|
|
|
GPR_ASSERT(stream_global->collecting_stats == NULL); |
|
|
|
|
stream_global->collecting_stats = op->collect_stats; |
|
|
|
|
on_complete->final_data.scratch |= CLOSURE_BARRIER_STATS_BIT; |
|
|
|
|
on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (op->cancel_with_status != GRPC_STATUS_OK) { |
|
|
|
@ -946,7 +949,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} else { |
|
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
|
exec_ctx, stream_global, |
|
|
|
|
&stream_global->send_initial_metadata_finished, 0); |
|
|
|
|
&stream_global->send_initial_metadata_finished, |
|
|
|
|
GRPC_ERROR_CREATE( |
|
|
|
|
"Attempt to send initial metadata after stream was closed")); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -956,7 +961,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
stream_global->send_message_finished = add_closure_barrier(on_complete); |
|
|
|
|
if (stream_global->write_closed) { |
|
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
|
exec_ctx, stream_global, &stream_global->send_message_finished, 0); |
|
|
|
|
exec_ctx, stream_global, &stream_global->send_message_finished, |
|
|
|
|
GRPC_ERROR_CREATE("Attempt to send message after stream was closed")); |
|
|
|
|
} else { |
|
|
|
|
stream_global->send_message = op->send_message; |
|
|
|
|
if (stream_global->id != 0) { |
|
|
|
@ -978,7 +984,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
|
exec_ctx, stream_global, |
|
|
|
|
&stream_global->send_trailing_metadata_finished, |
|
|
|
|
grpc_metadata_batch_is_empty(op->send_trailing_metadata)); |
|
|
|
|
grpc_metadata_batch_is_empty(op->send_trailing_metadata) |
|
|
|
|
? GRPC_ERROR_NONE |
|
|
|
|
: GRPC_ERROR_CREATE("Attempt to send trailing metadata after " |
|
|
|
|
"stream was closed")); |
|
|
|
|
} else if (stream_global->id != 0) { |
|
|
|
|
/* TODO(ctiller): check if there's flow control for any outstanding
|
|
|
|
|
bytes before going writable */ |
|
|
|
@ -1016,7 +1025,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_chttp2_complete_closure_step(exec_ctx, stream_global, &on_complete, 1); |
|
|
|
|
grpc_chttp2_complete_closure_step(exec_ctx, stream_global, &on_complete, |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
|
|
|
|
|
GPR_TIMER_END("perform_stream_op_locked", 0); |
|
|
|
|
} |
|
|
|
@ -1185,7 +1195,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, |
|
|
|
|
stream_global->recv_trailing_metadata); |
|
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
|
exec_ctx, stream_global, |
|
|
|
|
&stream_global->recv_trailing_metadata_finished, 1); |
|
|
|
|
&stream_global->recv_trailing_metadata_finished, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1256,8 +1266,10 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx, |
|
|
|
|
stream_global->seen_error = 1; |
|
|
|
|
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); |
|
|
|
|
} |
|
|
|
|
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, |
|
|
|
|
1); |
|
|
|
|
grpc_chttp2_mark_stream_closed( |
|
|
|
|
exec_ctx, transport_global, stream_global, 1, 1, |
|
|
|
|
grpc_error_set_int(GRPC_ERROR_CREATE("Cancelled"), |
|
|
|
|
GRPC_ERROR_INT_GRPC_STATUS, status)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, |
|
|
|
@ -1298,23 +1310,25 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void fail_pending_writes(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_chttp2_stream_global *stream_global) { |
|
|
|
|
grpc_chttp2_stream_global *stream_global, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
|
exec_ctx, stream_global, &stream_global->send_initial_metadata_finished, |
|
|
|
|
0); |
|
|
|
|
grpc_error_ref(error)); |
|
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
|
exec_ctx, stream_global, &stream_global->send_trailing_metadata_finished, |
|
|
|
|
0); |
|
|
|
|
grpc_chttp2_complete_closure_step(exec_ctx, stream_global, |
|
|
|
|
&stream_global->send_message_finished, 0); |
|
|
|
|
grpc_error_ref(error)); |
|
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
|
exec_ctx, stream_global, &stream_global->send_message_finished, error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_chttp2_mark_stream_closed( |
|
|
|
|
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, |
|
|
|
|
grpc_chttp2_stream_global *stream_global, int close_reads, |
|
|
|
|
int close_writes) { |
|
|
|
|
grpc_chttp2_stream_global *stream_global, int close_reads, int close_writes, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
if (stream_global->read_closed && stream_global->write_closed) { |
|
|
|
|
/* already closed */ |
|
|
|
|
grpc_error_unref(error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); |
|
|
|
@ -1331,7 +1345,7 @@ void grpc_chttp2_mark_stream_closed( |
|
|
|
|
grpc_chttp2_list_add_closed_waiting_for_writing(transport_global, |
|
|
|
|
stream_global); |
|
|
|
|
} else { |
|
|
|
|
fail_pending_writes(exec_ctx, stream_global); |
|
|
|
|
fail_pending_writes(exec_ctx, stream_global, grpc_error_ref(error)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (stream_global->read_closed && stream_global->write_closed) { |
|
|
|
@ -1347,6 +1361,7 @@ void grpc_chttp2_mark_stream_closed( |
|
|
|
|
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
grpc_error_unref(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void close_from_api(grpc_exec_ctx *exec_ctx, |
|
|
|
@ -1451,8 +1466,16 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status, |
|
|
|
|
optional_message); |
|
|
|
|
grpc_error *err = GRPC_ERROR_CREATE("Stream closed"); |
|
|
|
|
err = grpc_error_set_int(err, GRPC_ERROR_INT_GRPC_STATUS, status); |
|
|
|
|
if (optional_message) { |
|
|
|
|
char *str = |
|
|
|
|
gpr_dump_slice(*optional_message, GPR_DUMP_HEX | GPR_DUMP_ASCII); |
|
|
|
|
err = grpc_error_set_str(err, GRPC_ERROR_STR_GRPC_MESSAGE, str); |
|
|
|
|
gpr_free(str); |
|
|
|
|
} |
|
|
|
|
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, |
|
|
|
|
1); |
|
|
|
|
1, err); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global, |
|
|
|
|