|
|
|
@ -84,8 +84,6 @@ grpc_tracer_flag grpc_trace_chttp2_refcount = |
|
|
|
|
GRPC_TRACER_INITIALIZER(false, "chttp2_refcount"); |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
static const grpc_transport_vtable vtable; |
|
|
|
|
|
|
|
|
|
/* forward declarations of various callbacks that we'll build closures around */ |
|
|
|
|
static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *t, |
|
|
|
|
grpc_error *error); |
|
|
|
@ -248,6 +246,8 @@ void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void grpc_chttp2_ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
static const grpc_transport_vtable *get_vtable(void); |
|
|
|
|
|
|
|
|
|
static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
|
const grpc_channel_args *channel_args, |
|
|
|
|
grpc_endpoint *ep, bool is_client) { |
|
|
|
@ -257,7 +257,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
|
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == |
|
|
|
|
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); |
|
|
|
|
|
|
|
|
|
t->base.vtable = &vtable; |
|
|
|
|
t->base.vtable = get_vtable(); |
|
|
|
|
t->ep = ep; |
|
|
|
|
/* one ref is for destroy */ |
|
|
|
|
gpr_ref_init(&t->refs, 1); |
|
|
|
@ -542,11 +542,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT(&t->write_action, write_action, t, |
|
|
|
|
t->opt_target == GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT |
|
|
|
|
? grpc_executor_scheduler |
|
|
|
|
: grpc_schedule_on_exec_ctx); |
|
|
|
|
|
|
|
|
|
t->ping_state.pings_before_data_required = |
|
|
|
|
t->ping_policy.max_pings_without_data; |
|
|
|
|
t->ping_state.is_delayed_ping_timer_set = false; |
|
|
|
@ -573,7 +568,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
|
|
|
|
|
|
static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_chttp2_transport *t = tp; |
|
|
|
|
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; |
|
|
|
|
t->destroying = 1; |
|
|
|
|
close_transport_locked( |
|
|
|
|
exec_ctx, t, |
|
|
|
@ -699,7 +694,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, |
|
|
|
|
|
|
|
|
|
static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_chttp2_stream *s = sp; |
|
|
|
|
grpc_chttp2_stream *s = (grpc_chttp2_stream *)sp; |
|
|
|
|
grpc_chttp2_transport *t = s->t; |
|
|
|
|
|
|
|
|
|
GPR_TIMER_BEGIN("destroy_stream", 0); |
|
|
|
@ -783,7 +778,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, |
|
|
|
|
|
|
|
|
|
grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t, |
|
|
|
|
uint32_t id) { |
|
|
|
|
return grpc_chttp2_stream_map_find(&t->stream_map, id); |
|
|
|
|
return (grpc_chttp2_stream *)grpc_chttp2_stream_map_find(&t->stream_map, id); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx, |
|
|
|
@ -842,6 +837,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, |
|
|
|
|
switch (t->write_state) { |
|
|
|
|
case GRPC_CHTTP2_WRITE_STATE_IDLE: |
|
|
|
|
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason); |
|
|
|
|
t->is_first_write_in_batch = true; |
|
|
|
|
GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); |
|
|
|
|
GRPC_CLOSURE_SCHED( |
|
|
|
|
exec_ctx, |
|
|
|
@ -860,52 +856,100 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, |
|
|
|
|
GPR_TIMER_END("grpc_chttp2_initiate_write", 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_chttp2_become_writable( |
|
|
|
|
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, |
|
|
|
|
grpc_chttp2_stream_write_type stream_write_type, const char *reason) { |
|
|
|
|
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_chttp2_transport *t, |
|
|
|
|
grpc_chttp2_stream *s, |
|
|
|
|
bool also_initiate_write, const char *reason) { |
|
|
|
|
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { |
|
|
|
|
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become"); |
|
|
|
|
} |
|
|
|
|
switch (stream_write_type) { |
|
|
|
|
case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK: |
|
|
|
|
break; |
|
|
|
|
case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED: |
|
|
|
|
grpc_chttp2_initiate_write(exec_ctx, t, reason); |
|
|
|
|
break; |
|
|
|
|
case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED: |
|
|
|
|
grpc_chttp2_initiate_write(exec_ctx, t, reason); |
|
|
|
|
break; |
|
|
|
|
if (also_initiate_write) { |
|
|
|
|
grpc_chttp2_initiate_write(exec_ctx, t, reason); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_closure_scheduler *write_scheduler(grpc_chttp2_transport *t, |
|
|
|
|
bool early_results_scheduled, |
|
|
|
|
bool partial_write) { |
|
|
|
|
/* if it's not the first write in a batch, always offload to the executor:
|
|
|
|
|
we'll probably end up queuing against the kernel anyway, so we'll likely |
|
|
|
|
get better latency overall if we switch writing work elsewhere and continue |
|
|
|
|
with application work above */ |
|
|
|
|
if (!t->is_first_write_in_batch) { |
|
|
|
|
return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); |
|
|
|
|
} |
|
|
|
|
/* equivalently, if it's a partial write, we *know* we're going to be taking a
|
|
|
|
|
thread jump to write it because of the above, may as well do so |
|
|
|
|
immediately */ |
|
|
|
|
if (partial_write) { |
|
|
|
|
return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); |
|
|
|
|
} |
|
|
|
|
switch (t->opt_target) { |
|
|
|
|
case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT: |
|
|
|
|
/* executor gives us the largest probability of being able to batch a
|
|
|
|
|
* write with others on this transport */ |
|
|
|
|
return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); |
|
|
|
|
case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY: |
|
|
|
|
return grpc_schedule_on_exec_ctx; |
|
|
|
|
} |
|
|
|
|
GPR_UNREACHABLE_CODE(return NULL); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#define WRITE_STATE_TUPLE_TO_INT(p, i) (2 * (int)(p) + (int)(i)) |
|
|
|
|
static const char *begin_writing_desc(bool partial, bool inlined) { |
|
|
|
|
switch (WRITE_STATE_TUPLE_TO_INT(partial, inlined)) { |
|
|
|
|
case WRITE_STATE_TUPLE_TO_INT(false, false): |
|
|
|
|
return "begin write in background"; |
|
|
|
|
case WRITE_STATE_TUPLE_TO_INT(false, true): |
|
|
|
|
return "begin write in current thread"; |
|
|
|
|
case WRITE_STATE_TUPLE_TO_INT(true, false): |
|
|
|
|
return "begin partial write in background"; |
|
|
|
|
case WRITE_STATE_TUPLE_TO_INT(true, true): |
|
|
|
|
return "begin partial write in current thread"; |
|
|
|
|
} |
|
|
|
|
GPR_UNREACHABLE_CODE(return "bad state tuple"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, |
|
|
|
|
grpc_error *error_ignored) { |
|
|
|
|
GPR_TIMER_BEGIN("write_action_begin_locked", 0); |
|
|
|
|
grpc_chttp2_transport *t = gt; |
|
|
|
|
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; |
|
|
|
|
GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); |
|
|
|
|
switch (t->closed ? GRPC_CHTTP2_NOTHING_TO_WRITE |
|
|
|
|
: grpc_chttp2_begin_write(exec_ctx, t)) { |
|
|
|
|
case GRPC_CHTTP2_NOTHING_TO_WRITE: |
|
|
|
|
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, |
|
|
|
|
"begin writing nothing"); |
|
|
|
|
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); |
|
|
|
|
break; |
|
|
|
|
case GRPC_CHTTP2_PARTIAL_WRITE: |
|
|
|
|
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, |
|
|
|
|
"begin writing partial"); |
|
|
|
|
GRPC_CLOSURE_SCHED(exec_ctx, &t->write_action, GRPC_ERROR_NONE); |
|
|
|
|
break; |
|
|
|
|
case GRPC_CHTTP2_FULL_WRITE: |
|
|
|
|
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, |
|
|
|
|
"begin writing"); |
|
|
|
|
GRPC_CLOSURE_SCHED(exec_ctx, &t->write_action, GRPC_ERROR_NONE); |
|
|
|
|
break; |
|
|
|
|
grpc_chttp2_begin_write_result r; |
|
|
|
|
if (t->closed) { |
|
|
|
|
r.writing = false; |
|
|
|
|
} else { |
|
|
|
|
r = grpc_chttp2_begin_write(exec_ctx, t); |
|
|
|
|
} |
|
|
|
|
if (r.writing) { |
|
|
|
|
if (r.partial) { |
|
|
|
|
GRPC_STATS_INC_HTTP2_PARTIAL_WRITES(exec_ctx); |
|
|
|
|
} |
|
|
|
|
if (!t->is_first_write_in_batch) { |
|
|
|
|
GRPC_STATS_INC_HTTP2_WRITES_CONTINUED(exec_ctx); |
|
|
|
|
} |
|
|
|
|
grpc_closure_scheduler *scheduler = |
|
|
|
|
write_scheduler(t, r.early_results_scheduled, r.partial); |
|
|
|
|
if (scheduler != grpc_schedule_on_exec_ctx) { |
|
|
|
|
GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED(exec_ctx); |
|
|
|
|
} |
|
|
|
|
set_write_state( |
|
|
|
|
exec_ctx, t, r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE |
|
|
|
|
: GRPC_CHTTP2_WRITE_STATE_WRITING, |
|
|
|
|
begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx)); |
|
|
|
|
GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_INIT(&t->write_action, |
|
|
|
|
write_action, t, scheduler), |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} else { |
|
|
|
|
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, |
|
|
|
|
"begin writing nothing"); |
|
|
|
|
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); |
|
|
|
|
} |
|
|
|
|
GPR_TIMER_END("write_action_begin_locked", 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) { |
|
|
|
|
grpc_chttp2_transport *t = gt; |
|
|
|
|
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; |
|
|
|
|
GPR_TIMER_BEGIN("write_action", 0); |
|
|
|
|
grpc_endpoint_write( |
|
|
|
|
exec_ctx, t->ep, &t->outbuf, |
|
|
|
@ -917,7 +961,7 @@ static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) { |
|
|
|
|
static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
GPR_TIMER_BEGIN("terminate_writing_with_lock", 0); |
|
|
|
|
grpc_chttp2_transport *t = tp; |
|
|
|
|
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; |
|
|
|
|
|
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); |
|
|
|
@ -942,7 +986,8 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, |
|
|
|
|
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: |
|
|
|
|
GPR_TIMER_MARK("state=writing_stale_no_poller", 0); |
|
|
|
|
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, |
|
|
|
|
"continue writing [!covered]"); |
|
|
|
|
"continue writing"); |
|
|
|
|
t->is_first_write_in_batch = false; |
|
|
|
|
GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); |
|
|
|
|
GRPC_CLOSURE_RUN( |
|
|
|
|
exec_ctx, |
|
|
|
@ -1042,9 +1087,7 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
|
|
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); |
|
|
|
|
post_destructive_reclaimer(exec_ctx, t); |
|
|
|
|
grpc_chttp2_become_writable(exec_ctx, t, s, |
|
|
|
|
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, |
|
|
|
|
"new_stream"); |
|
|
|
|
grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream"); |
|
|
|
|
} |
|
|
|
|
/* cancel out streams that will never be started */ |
|
|
|
|
while (t->next_stream_id >= MAX_CLIENT_STREAM_ID && |
|
|
|
@ -1093,12 +1136,14 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, |
|
|
|
|
closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_http_trace)) { |
|
|
|
|
const char *errstr = grpc_error_string(error); |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s", |
|
|
|
|
closure, |
|
|
|
|
(int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT), |
|
|
|
|
(int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT), |
|
|
|
|
desc, errstr); |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_DEBUG, |
|
|
|
|
"complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s " |
|
|
|
|
"write_state=%s", |
|
|
|
|
t, closure, |
|
|
|
|
(int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT), |
|
|
|
|
(int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT), desc, |
|
|
|
|
errstr, write_state_name(t->write_state)); |
|
|
|
|
} |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
if (closure->error_data.error == GRPC_ERROR_NONE) { |
|
|
|
@ -1139,9 +1184,7 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_chttp2_stream *s) { |
|
|
|
|
if (s->id != 0 && (!s->write_buffering || |
|
|
|
|
s->flow_controlled_buffer.length > t->write_buffer_size)) { |
|
|
|
|
grpc_chttp2_become_writable(exec_ctx, t, s, |
|
|
|
|
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, |
|
|
|
|
"op.send_message"); |
|
|
|
|
grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1173,15 +1216,19 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} else { |
|
|
|
|
grpc_chttp2_write_cb *cb = t->write_cb_pool; |
|
|
|
|
if (cb == NULL) { |
|
|
|
|
cb = gpr_malloc(sizeof(*cb)); |
|
|
|
|
cb = (grpc_chttp2_write_cb *)gpr_malloc(sizeof(*cb)); |
|
|
|
|
} else { |
|
|
|
|
t->write_cb_pool = cb->next; |
|
|
|
|
} |
|
|
|
|
cb->call_at_byte = notify_offset; |
|
|
|
|
cb->closure = s->fetching_send_message_finished; |
|
|
|
|
s->fetching_send_message_finished = NULL; |
|
|
|
|
cb->next = s->on_write_finished_cbs; |
|
|
|
|
s->on_write_finished_cbs = cb; |
|
|
|
|
grpc_chttp2_write_cb **list = |
|
|
|
|
s->fetching_send_message->flags & GRPC_WRITE_THROUGH |
|
|
|
|
? &s->on_write_finished_cbs |
|
|
|
|
: &s->on_flow_controlled_cbs; |
|
|
|
|
cb->next = *list; |
|
|
|
|
*list = cb; |
|
|
|
|
} |
|
|
|
|
s->fetching_send_message = NULL; |
|
|
|
|
return; /* early out */ |
|
|
|
@ -1201,7 +1248,7 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
|
|
static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_chttp2_stream *s = gs; |
|
|
|
|
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; |
|
|
|
|
grpc_chttp2_transport *t = s->t; |
|
|
|
|
if (error == GRPC_ERROR_NONE) { |
|
|
|
|
error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, |
|
|
|
@ -1236,8 +1283,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, |
|
|
|
|
grpc_error *error_ignored) { |
|
|
|
|
GPR_TIMER_BEGIN("perform_stream_op_locked", 0); |
|
|
|
|
|
|
|
|
|
grpc_transport_stream_op_batch *op = stream_op; |
|
|
|
|
grpc_chttp2_stream *s = op->handler_private.extra_arg; |
|
|
|
|
grpc_transport_stream_op_batch *op = |
|
|
|
|
(grpc_transport_stream_op_batch *)stream_op; |
|
|
|
|
grpc_chttp2_stream *s = (grpc_chttp2_stream *)op->handler_private.extra_arg; |
|
|
|
|
grpc_transport_stream_op_batch_payload *op_payload = op->payload; |
|
|
|
|
grpc_chttp2_transport *t = s->t; |
|
|
|
|
|
|
|
|
@ -1290,7 +1338,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, |
|
|
|
|
if ((s->stream_compression_send_enabled = |
|
|
|
|
(op_payload->send_initial_metadata.send_initial_metadata->idx.named |
|
|
|
|
.content_encoding != NULL)) == true) { |
|
|
|
|
s->compressed_data_buffer = gpr_malloc(sizeof(grpc_slice_buffer)); |
|
|
|
|
s->compressed_data_buffer = |
|
|
|
|
(grpc_slice_buffer *)gpr_malloc(sizeof(grpc_slice_buffer)); |
|
|
|
|
grpc_slice_buffer_init(s->compressed_data_buffer); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1336,14 +1385,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
GPR_ASSERT(s->id != 0); |
|
|
|
|
grpc_chttp2_stream_write_type write_type = |
|
|
|
|
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED; |
|
|
|
|
bool initiate_write = true; |
|
|
|
|
if (op->send_message && |
|
|
|
|
(op->payload->send_message.send_message->flags & |
|
|
|
|
GRPC_WRITE_BUFFER_HINT)) { |
|
|
|
|
write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK; |
|
|
|
|
initiate_write = false; |
|
|
|
|
} |
|
|
|
|
grpc_chttp2_become_writable(exec_ctx, t, s, write_type, |
|
|
|
|
grpc_chttp2_become_writable(exec_ctx, t, s, initiate_write, |
|
|
|
|
"op.send_initial_metadata"); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
@ -1452,8 +1500,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, |
|
|
|
|
} else if (s->id != 0) { |
|
|
|
|
/* TODO(ctiller): check if there's flow control for any outstanding
|
|
|
|
|
bytes before going writable */ |
|
|
|
|
grpc_chttp2_become_writable(exec_ctx, t, s, |
|
|
|
|
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, |
|
|
|
|
grpc_chttp2_become_writable(exec_ctx, t, s, true, |
|
|
|
|
"op.send_trailing_metadata"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1578,7 +1625,7 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
|
|
|
|
|
|
static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_chttp2_transport *t = tp; |
|
|
|
|
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; |
|
|
|
|
t->ping_state.is_delayed_ping_timer_set = false; |
|
|
|
|
grpc_chttp2_initiate_write(exec_ctx, t, "retry_send_ping"); |
|
|
|
|
} |
|
|
|
@ -1634,8 +1681,9 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, |
|
|
|
|
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *stream_op, |
|
|
|
|
grpc_error *error_ignored) { |
|
|
|
|
grpc_transport_op *op = stream_op; |
|
|
|
|
grpc_chttp2_transport *t = op->handler_private.extra_arg; |
|
|
|
|
grpc_transport_op *op = (grpc_transport_op *)stream_op; |
|
|
|
|
grpc_chttp2_transport *t = |
|
|
|
|
(grpc_chttp2_transport *)op->handler_private.extra_arg; |
|
|
|
|
grpc_error *close_transport = op->disconnect_with_error; |
|
|
|
|
|
|
|
|
|
if (op->goaway_error) { |
|
|
|
@ -1847,7 +1895,8 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
|
|
static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
|
uint32_t id, grpc_error *error) { |
|
|
|
|
grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id); |
|
|
|
|
grpc_chttp2_stream *s = |
|
|
|
|
(grpc_chttp2_stream *)grpc_chttp2_stream_map_delete(&t->stream_map, id); |
|
|
|
|
GPR_ASSERT(s); |
|
|
|
|
if (t->incoming_stream == s) { |
|
|
|
|
t->incoming_stream = NULL; |
|
|
|
@ -1979,6 +2028,21 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s, |
|
|
|
|
return error; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void flush_write_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
|
grpc_chttp2_stream *s, grpc_chttp2_write_cb **list, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
while (*list) { |
|
|
|
|
grpc_chttp2_write_cb *cb = *list; |
|
|
|
|
*list = cb->next; |
|
|
|
|
grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, |
|
|
|
|
GRPC_ERROR_REF(error), |
|
|
|
|
"on_write_finished_cb"); |
|
|
|
|
cb->next = t->write_cb_pool; |
|
|
|
|
t->write_cb_pool = cb; |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_chttp2_transport *t, |
|
|
|
|
grpc_chttp2_stream *s, grpc_error *error) { |
|
|
|
@ -1998,16 +2062,9 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
|
exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), |
|
|
|
|
"fetching_send_message_finished"); |
|
|
|
|
while (s->on_write_finished_cbs) { |
|
|
|
|
grpc_chttp2_write_cb *cb = s->on_write_finished_cbs; |
|
|
|
|
s->on_write_finished_cbs = cb->next; |
|
|
|
|
grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, |
|
|
|
|
GRPC_ERROR_REF(error), |
|
|
|
|
"on_write_finished_cb"); |
|
|
|
|
cb->next = t->write_cb_pool; |
|
|
|
|
t->write_cb_pool = cb; |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
flush_write_list(exec_ctx, t, s, &s->on_write_finished_cbs, |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
flush_write_list(exec_ctx, t, s, &s->on_flow_controlled_cbs, error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, |
|
|
|
@ -2227,8 +2284,8 @@ typedef struct { |
|
|
|
|
} cancel_stream_cb_args; |
|
|
|
|
|
|
|
|
|
static void cancel_stream_cb(void *user_data, uint32_t key, void *stream) { |
|
|
|
|
cancel_stream_cb_args *args = user_data; |
|
|
|
|
grpc_chttp2_stream *s = stream; |
|
|
|
|
cancel_stream_cb_args *args = (cancel_stream_cb_args *)user_data; |
|
|
|
|
grpc_chttp2_stream *s = (grpc_chttp2_stream *)stream; |
|
|
|
|
grpc_chttp2_cancel_stream(args->exec_ctx, args->t, s, |
|
|
|
|
GRPC_ERROR_REF(args->error)); |
|
|
|
|
} |
|
|
|
@ -2252,13 +2309,11 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, |
|
|
|
|
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: |
|
|
|
|
break; |
|
|
|
|
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: |
|
|
|
|
grpc_chttp2_become_writable(exec_ctx, t, s, |
|
|
|
|
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, |
|
|
|
|
grpc_chttp2_become_writable(exec_ctx, t, s, true, |
|
|
|
|
"immediate stream flowctl"); |
|
|
|
|
break; |
|
|
|
|
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: |
|
|
|
|
grpc_chttp2_become_writable(exec_ctx, t, s, |
|
|
|
|
GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK, |
|
|
|
|
grpc_chttp2_become_writable(exec_ctx, t, s, false, |
|
|
|
|
"queue stream flowctl"); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
@ -2330,7 +2385,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
GPR_TIMER_BEGIN("reading_action_locked", 0); |
|
|
|
|
|
|
|
|
|
grpc_chttp2_transport *t = tp; |
|
|
|
|
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; |
|
|
|
|
|
|
|
|
|
GRPC_ERROR_REF(error); |
|
|
|
|
|
|
|
|
@ -2371,9 +2426,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, |
|
|
|
|
if (t->flow_control.initial_window_update > 0) { |
|
|
|
|
grpc_chttp2_stream *s; |
|
|
|
|
while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { |
|
|
|
|
grpc_chttp2_become_writable( |
|
|
|
|
exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, |
|
|
|
|
"unstalled"); |
|
|
|
|
grpc_chttp2_become_writable(exec_ctx, t, s, true, "unstalled"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
t->flow_control.initial_window_update = 0; |
|
|
|
@ -2415,7 +2468,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, |
|
|
|
|
|
|
|
|
|
static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_chttp2_transport *t = tp; |
|
|
|
|
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_http_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string); |
|
|
|
|
} |
|
|
|
@ -2428,7 +2481,7 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, |
|
|
|
|
|
|
|
|
|
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_chttp2_transport *t = tp; |
|
|
|
|
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_http_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string); |
|
|
|
|
} |
|
|
|
@ -2477,7 +2530,7 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args, |
|
|
|
|
|
|
|
|
|
static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_chttp2_transport *t = arg; |
|
|
|
|
grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; |
|
|
|
|
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); |
|
|
|
|
if (t->destroying || t->closed) { |
|
|
|
|
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; |
|
|
|
@ -2507,7 +2560,7 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
|
|
|
|
|
static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_chttp2_transport *t = arg; |
|
|
|
|
grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; |
|
|
|
|
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); |
|
|
|
|
grpc_timer_init(exec_ctx, &t->keepalive_watchdog_timer, |
|
|
|
|
grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, |
|
|
|
@ -2516,7 +2569,7 @@ static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
|
|
|
|
|
static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_chttp2_transport *t = arg; |
|
|
|
|
grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; |
|
|
|
|
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { |
|
|
|
|
if (error == GRPC_ERROR_NONE) { |
|
|
|
|
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; |
|
|
|
@ -2532,7 +2585,7 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
|
|
|
|
|
static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_chttp2_transport *t = arg; |
|
|
|
|
grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; |
|
|
|
|
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { |
|
|
|
|
if (error == GRPC_ERROR_NONE) { |
|
|
|
|
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; |
|
|
|
@ -2613,7 +2666,8 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, |
|
|
|
|
static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *argp, |
|
|
|
|
grpc_error *error_ignored) { |
|
|
|
|
grpc_chttp2_incoming_byte_stream *bs = argp; |
|
|
|
|
grpc_chttp2_incoming_byte_stream *bs = |
|
|
|
|
(grpc_chttp2_incoming_byte_stream *)argp; |
|
|
|
|
grpc_chttp2_transport *t = bs->transport; |
|
|
|
|
grpc_chttp2_stream *s = bs->stream; |
|
|
|
|
|
|
|
|
@ -2823,7 +2877,8 @@ static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = { |
|
|
|
|
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *byte_stream, |
|
|
|
|
grpc_error *error_ignored) { |
|
|
|
|
grpc_chttp2_incoming_byte_stream *bs = byte_stream; |
|
|
|
|
grpc_chttp2_incoming_byte_stream *bs = |
|
|
|
|
(grpc_chttp2_incoming_byte_stream *)byte_stream; |
|
|
|
|
grpc_chttp2_stream *s = bs->stream; |
|
|
|
|
grpc_chttp2_transport *t = s->t; |
|
|
|
|
|
|
|
|
@ -2879,7 +2934,7 @@ static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
|
|
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_chttp2_transport *t = arg; |
|
|
|
|
grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; |
|
|
|
|
if (error == GRPC_ERROR_NONE && |
|
|
|
|
grpc_chttp2_stream_map_size(&t->stream_map) == 0) { |
|
|
|
|
/* Channel with no active streams: send a goaway to try and make it
|
|
|
|
@ -2909,11 +2964,12 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
|
|
|
|
|
static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_chttp2_transport *t = arg; |
|
|
|
|
grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; |
|
|
|
|
size_t n = grpc_chttp2_stream_map_size(&t->stream_map); |
|
|
|
|
t->destructive_reclaimer_registered = false; |
|
|
|
|
if (error == GRPC_ERROR_NONE && n > 0) { |
|
|
|
|
grpc_chttp2_stream *s = grpc_chttp2_stream_map_rand(&t->stream_map); |
|
|
|
|
grpc_chttp2_stream *s = |
|
|
|
|
(grpc_chttp2_stream *)grpc_chttp2_stream_map_rand(&t->stream_map); |
|
|
|
|
if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "HTTP2: %s - abandon stream id %d", t->peer_string, |
|
|
|
|
s->id); |
|
|
|
@ -2957,10 +3013,13 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), |
|
|
|
|
destroy_transport, |
|
|
|
|
chttp2_get_endpoint}; |
|
|
|
|
|
|
|
|
|
static const grpc_transport_vtable *get_vtable(void) { return &vtable; } |
|
|
|
|
|
|
|
|
|
grpc_transport *grpc_create_chttp2_transport( |
|
|
|
|
grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, |
|
|
|
|
grpc_endpoint *ep, int is_client) { |
|
|
|
|
grpc_chttp2_transport *t = gpr_zalloc(sizeof(grpc_chttp2_transport)); |
|
|
|
|
grpc_chttp2_transport *t = |
|
|
|
|
(grpc_chttp2_transport *)gpr_zalloc(sizeof(grpc_chttp2_transport)); |
|
|
|
|
init_transport(exec_ctx, t, channel_args, ep, is_client != 0); |
|
|
|
|
return &t->base; |
|
|
|
|
} |
|
|
|
|