|
|
@ -48,6 +48,7 @@ |
|
|
|
#include "src/core/ext/transport/chttp2/transport/status_conversion.h" |
|
|
|
#include "src/core/ext/transport/chttp2/transport/status_conversion.h" |
|
|
|
#include "src/core/ext/transport/chttp2/transport/timeout_encoding.h" |
|
|
|
#include "src/core/ext/transport/chttp2/transport/timeout_encoding.h" |
|
|
|
#include "src/core/lib/http/parser.h" |
|
|
|
#include "src/core/lib/http/parser.h" |
|
|
|
|
|
|
|
#include "src/core/lib/iomgr/workqueue.h" |
|
|
|
#include "src/core/lib/profiling/timers.h" |
|
|
|
#include "src/core/lib/profiling/timers.h" |
|
|
|
#include "src/core/lib/support/string.h" |
|
|
|
#include "src/core/lib/support/string.h" |
|
|
|
#include "src/core/lib/transport/static_metadata.h" |
|
|
|
#include "src/core/lib/transport/static_metadata.h" |
|
|
@ -60,9 +61,9 @@ |
|
|
|
#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024) |
|
|
|
#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024) |
|
|
|
|
|
|
|
|
|
|
|
#define MAX_CLIENT_STREAM_ID 0x7fffffffu |
|
|
|
#define MAX_CLIENT_STREAM_ID 0x7fffffffu |
|
|
|
|
|
|
|
|
|
|
|
int grpc_http_trace = 0; |
|
|
|
int grpc_http_trace = 0; |
|
|
|
int grpc_flowctl_trace = 0; |
|
|
|
int grpc_flowctl_trace = 0; |
|
|
|
|
|
|
|
int grpc_http_write_state_trace = 0; |
|
|
|
|
|
|
|
|
|
|
|
#define TRANSPORT_FROM_WRITING(tw) \ |
|
|
|
#define TRANSPORT_FROM_WRITING(tw) \ |
|
|
|
((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
|
|
|
|
((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
|
|
|
@ -88,10 +89,16 @@ static const grpc_transport_vtable vtable; |
|
|
|
static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); |
|
|
|
static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); |
|
|
|
static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); |
|
|
|
static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); |
|
|
|
static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); |
|
|
|
static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); |
|
|
|
|
|
|
|
static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t, |
|
|
|
|
|
|
|
grpc_error *error); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); |
|
|
|
|
|
|
|
static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
grpc_chttp2_transport *t, grpc_error *error); |
|
|
|
|
|
|
|
|
|
|
|
/** Set a transport level setting, and push it to our peer */ |
|
|
|
/** Set a transport level setting, and push it to our peer */ |
|
|
|
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, |
|
|
|
static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
uint32_t value); |
|
|
|
grpc_chttp2_setting_id id, uint32_t value); |
|
|
|
|
|
|
|
|
|
|
|
/** Start disconnection chain */ |
|
|
|
/** Start disconnection chain */ |
|
|
|
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
@ -137,7 +144,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, |
|
|
|
grpc_chttp2_transport_global *transport_global); |
|
|
|
grpc_chttp2_transport_global *transport_global); |
|
|
|
|
|
|
|
|
|
|
|
static void incoming_byte_stream_update_flow_control( |
|
|
|
static void incoming_byte_stream_update_flow_control( |
|
|
|
grpc_chttp2_transport_global *transport_global, |
|
|
|
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, |
|
|
|
grpc_chttp2_stream_global *stream_global, size_t max_size_hint, |
|
|
|
grpc_chttp2_stream_global *stream_global, size_t max_size_hint, |
|
|
|
size_t have_already); |
|
|
|
size_t have_already); |
|
|
|
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, |
|
|
@ -201,6 +208,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, |
|
|
|
gpr_free(t); |
|
|
|
gpr_free(t); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*#define REFCOUNTING_DEBUG 1*/ |
|
|
|
#ifdef REFCOUNTING_DEBUG |
|
|
|
#ifdef REFCOUNTING_DEBUG |
|
|
|
#define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__) |
|
|
|
#define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__) |
|
|
|
#define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__) |
|
|
|
#define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__) |
|
|
@ -231,7 +239,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } |
|
|
|
|
|
|
|
|
|
|
|
static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
const grpc_channel_args *channel_args, |
|
|
|
const grpc_channel_args *channel_args, |
|
|
|
grpc_endpoint *ep, uint8_t is_client) { |
|
|
|
grpc_endpoint *ep, bool is_client) { |
|
|
|
size_t i; |
|
|
|
size_t i; |
|
|
|
int j; |
|
|
|
int j; |
|
|
|
|
|
|
|
|
|
|
@ -273,6 +281,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
grpc_closure_init(&t->writing_action, writing_action, t); |
|
|
|
grpc_closure_init(&t->writing_action, writing_action, t); |
|
|
|
grpc_closure_init(&t->reading_action, reading_action, t); |
|
|
|
grpc_closure_init(&t->reading_action, reading_action, t); |
|
|
|
grpc_closure_init(&t->parsing_action, parsing_action, t); |
|
|
|
grpc_closure_init(&t->parsing_action, parsing_action, t); |
|
|
|
|
|
|
|
grpc_closure_init(&t->initiate_writing, initiate_writing, t); |
|
|
|
|
|
|
|
|
|
|
|
gpr_slice_buffer_init(&t->parsing.qbuf); |
|
|
|
gpr_slice_buffer_init(&t->parsing.qbuf); |
|
|
|
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); |
|
|
|
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); |
|
|
@ -286,6 +295,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
gpr_slice_buffer_add( |
|
|
|
gpr_slice_buffer_add( |
|
|
|
&t->global.qbuf, |
|
|
|
&t->global.qbuf, |
|
|
|
gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING)); |
|
|
|
gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING)); |
|
|
|
|
|
|
|
grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write"); |
|
|
|
} |
|
|
|
} |
|
|
|
/* 8 is a random stab in the dark as to a good initial size: it's small enough
|
|
|
|
/* 8 is a random stab in the dark as to a good initial size: it's small enough
|
|
|
|
that it shouldn't waste memory for infrequently used connections, yet |
|
|
|
that it shouldn't waste memory for infrequently used connections, yet |
|
|
@ -311,11 +321,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
|
|
|
|
|
|
|
|
/* configure http2 the way we like it */ |
|
|
|
/* configure http2 the way we like it */ |
|
|
|
if (is_client) { |
|
|
|
if (is_client) { |
|
|
|
push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); |
|
|
|
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); |
|
|
|
push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); |
|
|
|
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); |
|
|
|
} |
|
|
|
} |
|
|
|
push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW); |
|
|
|
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, |
|
|
|
push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, |
|
|
|
DEFAULT_WINDOW); |
|
|
|
|
|
|
|
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, |
|
|
|
DEFAULT_MAX_HEADER_LIST_SIZE); |
|
|
|
DEFAULT_MAX_HEADER_LIST_SIZE); |
|
|
|
|
|
|
|
|
|
|
|
if (channel_args) { |
|
|
|
if (channel_args) { |
|
|
@ -329,7 +340,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
gpr_log(GPR_ERROR, "%s: must be an integer", |
|
|
|
gpr_log(GPR_ERROR, "%s: must be an integer", |
|
|
|
GRPC_ARG_MAX_CONCURRENT_STREAMS); |
|
|
|
GRPC_ARG_MAX_CONCURRENT_STREAMS); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, |
|
|
|
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, |
|
|
|
(uint32_t)channel_args->args[i].value.integer); |
|
|
|
(uint32_t)channel_args->args[i].value.integer); |
|
|
|
} |
|
|
|
} |
|
|
|
} else if (0 == strcmp(channel_args->args[i].key, |
|
|
|
} else if (0 == strcmp(channel_args->args[i].key, |
|
|
@ -368,7 +379,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
gpr_log(GPR_ERROR, "%s: must be non-negative", |
|
|
|
gpr_log(GPR_ERROR, "%s: must be non-negative", |
|
|
|
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER); |
|
|
|
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
push_setting(t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, |
|
|
|
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, |
|
|
|
(uint32_t)channel_args->args[i].value.integer); |
|
|
|
(uint32_t)channel_args->args[i].value.integer); |
|
|
|
} |
|
|
|
} |
|
|
|
} else if (0 == strcmp(channel_args->args[i].key, |
|
|
|
} else if (0 == strcmp(channel_args->args[i].key, |
|
|
@ -393,7 +404,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
gpr_log(GPR_ERROR, "%s: must be non-negative", |
|
|
|
gpr_log(GPR_ERROR, "%s: must be non-negative", |
|
|
|
GRPC_ARG_MAX_METADATA_SIZE); |
|
|
|
GRPC_ARG_MAX_METADATA_SIZE); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, |
|
|
|
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, |
|
|
|
(uint32_t)channel_args->args[i].value.integer); |
|
|
|
(uint32_t)channel_args->args[i].value.integer); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -444,6 +455,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
grpc_chttp2_transport *t, |
|
|
|
grpc_chttp2_transport *t, |
|
|
|
grpc_error *error) { |
|
|
|
grpc_error *error) { |
|
|
|
if (!t->closed) { |
|
|
|
if (!t->closed) { |
|
|
|
|
|
|
|
if (grpc_http_write_state_trace) { |
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, "W:%p close transport", t); |
|
|
|
|
|
|
|
} |
|
|
|
t->closed = 1; |
|
|
|
t->closed = 1; |
|
|
|
connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN, |
|
|
|
connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN, |
|
|
|
GRPC_ERROR_REF(error), "close_transport"); |
|
|
|
GRPC_ERROR_REF(error), "close_transport"); |
|
|
@ -589,7 +603,8 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
grpc_chttp2_incoming_metadata_buffer_destroy( |
|
|
|
grpc_chttp2_incoming_metadata_buffer_destroy( |
|
|
|
&s->global.received_trailing_metadata); |
|
|
|
&s->global.received_trailing_metadata); |
|
|
|
gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer); |
|
|
|
gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer); |
|
|
|
GRPC_ERROR_UNREF(s->global.removal_error); |
|
|
|
GRPC_ERROR_UNREF(s->global.read_closed_error); |
|
|
|
|
|
|
|
GRPC_ERROR_UNREF(s->global.write_closed_error); |
|
|
|
|
|
|
|
|
|
|
|
UNREF_TRANSPORT(exec_ctx, t, "stream"); |
|
|
|
UNREF_TRANSPORT(exec_ctx, t, "stream"); |
|
|
|
|
|
|
|
|
|
|
@ -633,6 +648,36 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( |
|
|
|
* LOCK MANAGEMENT |
|
|
|
* LOCK MANAGEMENT |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static const char *write_state_name(grpc_chttp2_write_state state) { |
|
|
|
|
|
|
|
switch (state) { |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITING_INACTIVE: |
|
|
|
|
|
|
|
return "INACTIVE"; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: |
|
|
|
|
|
|
|
return "REQUESTED[p=0]"; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: |
|
|
|
|
|
|
|
return "REQUESTED[p=1]"; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITE_SCHEDULED: |
|
|
|
|
|
|
|
return "SCHEDULED"; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITING: |
|
|
|
|
|
|
|
return "WRITING"; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: |
|
|
|
|
|
|
|
return "WRITING[p=1]"; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: |
|
|
|
|
|
|
|
return "WRITING[p=0]"; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
GPR_UNREACHABLE_CODE(return "UNKNOWN"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void set_write_state(grpc_chttp2_transport *t, |
|
|
|
|
|
|
|
grpc_chttp2_write_state state, const char *reason) { |
|
|
|
|
|
|
|
if (grpc_http_write_state_trace) { |
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, "W:%p %s -> %s because %s", t, |
|
|
|
|
|
|
|
write_state_name(t->executor.write_state), write_state_name(state), |
|
|
|
|
|
|
|
reason); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
t->executor.write_state = state; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static void finish_global_actions(grpc_exec_ctx *exec_ctx, |
|
|
|
static void finish_global_actions(grpc_exec_ctx *exec_ctx, |
|
|
|
grpc_chttp2_transport *t) { |
|
|
|
grpc_chttp2_transport *t) { |
|
|
|
grpc_chttp2_executor_action_header *hdr; |
|
|
|
grpc_chttp2_executor_action_header *hdr; |
|
|
@ -641,13 +686,6 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, |
|
|
|
GPR_TIMER_BEGIN("finish_global_actions", 0); |
|
|
|
GPR_TIMER_BEGIN("finish_global_actions", 0); |
|
|
|
|
|
|
|
|
|
|
|
for (;;) { |
|
|
|
for (;;) { |
|
|
|
if (!t->executor.writing_active && !t->closed && |
|
|
|
|
|
|
|
grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) { |
|
|
|
|
|
|
|
t->executor.writing_active = 1; |
|
|
|
|
|
|
|
REF_TRANSPORT(t, "writing"); |
|
|
|
|
|
|
|
prevent_endpoint_shutdown(t); |
|
|
|
|
|
|
|
grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
check_read_ops(exec_ctx, &t->global); |
|
|
|
check_read_ops(exec_ctx, &t->global); |
|
|
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&t->executor.mu); |
|
|
|
gpr_mu_lock(&t->executor.mu); |
|
|
@ -668,8 +706,27 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
t->executor.global_active = false; |
|
|
|
t->executor.global_active = false; |
|
|
|
|
|
|
|
switch (t->executor.write_state) { |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: |
|
|
|
|
|
|
|
set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "unlocking"); |
|
|
|
|
|
|
|
REF_TRANSPORT(t, "initiate_writing"); |
|
|
|
|
|
|
|
gpr_mu_unlock(&t->executor.mu); |
|
|
|
|
|
|
|
grpc_exec_ctx_sched(exec_ctx, &t->initiate_writing, GRPC_ERROR_NONE, |
|
|
|
|
|
|
|
grpc_endpoint_get_workqueue(t->ep)); |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: |
|
|
|
|
|
|
|
start_writing(exec_ctx, t); |
|
|
|
|
|
|
|
gpr_mu_unlock(&t->executor.mu); |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITING_INACTIVE: |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITING: |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITE_SCHEDULED: |
|
|
|
|
|
|
|
gpr_mu_unlock(&t->executor.mu); |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
gpr_mu_unlock(&t->executor.mu); |
|
|
|
|
|
|
|
break; |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -740,16 +797,99 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, |
|
|
|
* OUTPUT PROCESSING |
|
|
|
* OUTPUT PROCESSING |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
|
|
void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, |
|
|
|
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, |
|
|
|
grpc_chttp2_stream_global *stream_global) { |
|
|
|
grpc_chttp2_transport_global *transport_global, |
|
|
|
|
|
|
|
bool covered_by_poller, const char *reason) { |
|
|
|
|
|
|
|
grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global); |
|
|
|
|
|
|
|
switch (t->executor.write_state) { |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITING_INACTIVE: |
|
|
|
|
|
|
|
set_write_state(t, covered_by_poller |
|
|
|
|
|
|
|
? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER |
|
|
|
|
|
|
|
: GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, |
|
|
|
|
|
|
|
reason); |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: |
|
|
|
|
|
|
|
/* nothing to do: write already requested */ |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: |
|
|
|
|
|
|
|
if (covered_by_poller) { |
|
|
|
|
|
|
|
/* upgrade to note poller is available to cover the write */ |
|
|
|
|
|
|
|
set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, reason); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITE_SCHEDULED: |
|
|
|
|
|
|
|
/* nothing to do: write already scheduled */ |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITING: |
|
|
|
|
|
|
|
set_write_state(t, |
|
|
|
|
|
|
|
covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER |
|
|
|
|
|
|
|
: GRPC_CHTTP2_WRITING_STALE_NO_POLLER, |
|
|
|
|
|
|
|
reason); |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: |
|
|
|
|
|
|
|
/* nothing to do: write already requested */ |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: |
|
|
|
|
|
|
|
if (covered_by_poller) { |
|
|
|
|
|
|
|
/* upgrade to note poller is available to cover the write */ |
|
|
|
|
|
|
|
set_write_state(t, GRPC_CHTTP2_WRITING_STALE_WITH_POLLER, reason); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { |
|
|
|
|
|
|
|
GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED || |
|
|
|
|
|
|
|
t->executor.write_state == GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER); |
|
|
|
|
|
|
|
if (!t->closed && |
|
|
|
|
|
|
|
grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) { |
|
|
|
|
|
|
|
set_write_state(t, GRPC_CHTTP2_WRITING, "start_writing"); |
|
|
|
|
|
|
|
REF_TRANSPORT(t, "writing"); |
|
|
|
|
|
|
|
prevent_endpoint_shutdown(t); |
|
|
|
|
|
|
|
grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
if (t->closed) { |
|
|
|
|
|
|
|
set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, |
|
|
|
|
|
|
|
"start_writing:transport_closed"); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, |
|
|
|
|
|
|
|
"start_writing:nothing_to_write"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
end_waiting_for_write(exec_ctx, t, GRPC_ERROR_CREATE("Nothing to write")); |
|
|
|
|
|
|
|
if (t->ep && !t->endpoint_reading) { |
|
|
|
|
|
|
|
destroy_endpoint(exec_ctx, t); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
grpc_chttp2_transport *t, |
|
|
|
|
|
|
|
grpc_chttp2_stream *s_unused, |
|
|
|
|
|
|
|
void *arg_ignored) { |
|
|
|
|
|
|
|
start_writing(exec_ctx, t); |
|
|
|
|
|
|
|
UNREF_TRANSPORT(exec_ctx, t, "initiate_writing"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
|
|
|
grpc_error *error) { |
|
|
|
|
|
|
|
grpc_chttp2_run_with_global_lock(exec_ctx, arg, NULL, initiate_writing_locked, |
|
|
|
|
|
|
|
NULL, 0); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
grpc_chttp2_transport_global *transport_global, |
|
|
|
|
|
|
|
grpc_chttp2_stream_global *stream_global, |
|
|
|
|
|
|
|
bool covered_by_poller, const char *reason) { |
|
|
|
if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed && |
|
|
|
if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed && |
|
|
|
grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) { |
|
|
|
grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) { |
|
|
|
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); |
|
|
|
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); |
|
|
|
|
|
|
|
grpc_chttp2_initiate_write(exec_ctx, transport_global, covered_by_poller, |
|
|
|
|
|
|
|
reason); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, |
|
|
|
static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
uint32_t value) { |
|
|
|
grpc_chttp2_setting_id id, uint32_t value) { |
|
|
|
const grpc_chttp2_setting_parameters *sp = |
|
|
|
const grpc_chttp2_setting_parameters *sp = |
|
|
|
&grpc_chttp2_settings_parameters[id]; |
|
|
|
&grpc_chttp2_settings_parameters[id]; |
|
|
|
uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value); |
|
|
|
uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value); |
|
|
@ -760,9 +900,22 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, |
|
|
|
if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) { |
|
|
|
if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) { |
|
|
|
t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value; |
|
|
|
t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value; |
|
|
|
t->global.dirtied_local_settings = 1; |
|
|
|
t->global.dirtied_local_settings = 1; |
|
|
|
|
|
|
|
grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "push_setting"); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
grpc_chttp2_transport *t, grpc_error *error) { |
|
|
|
|
|
|
|
grpc_chttp2_stream_global *stream_global; |
|
|
|
|
|
|
|
while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, |
|
|
|
|
|
|
|
&stream_global)) { |
|
|
|
|
|
|
|
fail_pending_writes(exec_ctx, &t->global, stream_global, |
|
|
|
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
|
|
|
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, |
|
|
|
static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, |
|
|
|
grpc_chttp2_transport *t, |
|
|
|
grpc_chttp2_transport *t, |
|
|
|
grpc_chttp2_stream *s_ignored, |
|
|
|
grpc_chttp2_stream *s_ignored, |
|
|
@ -777,24 +930,32 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
|
|
|
|
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); |
|
|
|
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); |
|
|
|
|
|
|
|
|
|
|
|
grpc_chttp2_stream_global *stream_global; |
|
|
|
end_waiting_for_write(exec_ctx, t, error); |
|
|
|
while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, |
|
|
|
|
|
|
|
&stream_global)) { |
|
|
|
switch (t->executor.write_state) { |
|
|
|
fail_pending_writes(exec_ctx, &t->global, stream_global, |
|
|
|
case GRPC_CHTTP2_WRITING_INACTIVE: |
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: |
|
|
|
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); |
|
|
|
case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITE_SCHEDULED: |
|
|
|
|
|
|
|
GPR_UNREACHABLE_CODE(break); |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITING: |
|
|
|
|
|
|
|
set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing"); |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: |
|
|
|
|
|
|
|
set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, |
|
|
|
|
|
|
|
"terminate_writing"); |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: |
|
|
|
|
|
|
|
set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, |
|
|
|
|
|
|
|
"terminate_writing"); |
|
|
|
|
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* leave the writing flag up on shutdown to prevent further writes in
|
|
|
|
|
|
|
|
unlock() |
|
|
|
|
|
|
|
from starting */ |
|
|
|
|
|
|
|
t->executor.writing_active = 0; |
|
|
|
|
|
|
|
if (t->ep && !t->endpoint_reading) { |
|
|
|
if (t->ep && !t->endpoint_reading) { |
|
|
|
destroy_endpoint(exec_ctx, t); |
|
|
|
destroy_endpoint(exec_ctx, t); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
UNREF_TRANSPORT(exec_ctx, t, "writing"); |
|
|
|
UNREF_TRANSPORT(exec_ctx, t, "writing"); |
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, |
|
|
|
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, |
|
|
@ -877,7 +1038,8 @@ static void maybe_start_some_streams( |
|
|
|
stream_global->id, STREAM_FROM_GLOBAL(stream_global)); |
|
|
|
stream_global->id, STREAM_FROM_GLOBAL(stream_global)); |
|
|
|
stream_global->in_stream_map = true; |
|
|
|
stream_global->in_stream_map = true; |
|
|
|
transport_global->concurrent_stream_count++; |
|
|
|
transport_global->concurrent_stream_count++; |
|
|
|
grpc_chttp2_become_writable(transport_global, stream_global); |
|
|
|
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, true, |
|
|
|
|
|
|
|
"new_stream"); |
|
|
|
} |
|
|
|
} |
|
|
|
/* cancel out streams that will never be started */ |
|
|
|
/* cancel out streams that will never be started */ |
|
|
|
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && |
|
|
|
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && |
|
|
@ -1012,9 +1174,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
maybe_start_some_streams(exec_ctx, transport_global); |
|
|
|
maybe_start_some_streams(exec_ctx, transport_global); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
GPR_ASSERT(stream_global->id != 0); |
|
|
|
GPR_ASSERT(stream_global->id != 0); |
|
|
|
grpc_chttp2_become_writable(transport_global, stream_global); |
|
|
|
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, |
|
|
|
|
|
|
|
true, "op.send_initial_metadata"); |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
|
|
|
|
stream_global->send_trailing_metadata = NULL; |
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
exec_ctx, transport_global, stream_global, |
|
|
|
exec_ctx, transport_global, stream_global, |
|
|
|
&stream_global->send_initial_metadata_finished, |
|
|
|
&stream_global->send_initial_metadata_finished, |
|
|
@ -1036,7 +1200,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
stream_global->send_message = op->send_message; |
|
|
|
stream_global->send_message = op->send_message; |
|
|
|
if (stream_global->id != 0) { |
|
|
|
if (stream_global->id != 0) { |
|
|
|
grpc_chttp2_become_writable(transport_global, stream_global); |
|
|
|
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, |
|
|
|
|
|
|
|
true, "op.send_message"); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -1069,6 +1234,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); |
|
|
|
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); |
|
|
|
} |
|
|
|
} |
|
|
|
if (stream_global->write_closed) { |
|
|
|
if (stream_global->write_closed) { |
|
|
|
|
|
|
|
stream_global->send_trailing_metadata = NULL; |
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
exec_ctx, transport_global, stream_global, |
|
|
|
exec_ctx, transport_global, stream_global, |
|
|
|
&stream_global->send_trailing_metadata_finished, |
|
|
|
&stream_global->send_trailing_metadata_finished, |
|
|
@ -1079,7 +1245,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
} else if (stream_global->id != 0) { |
|
|
|
} else if (stream_global->id != 0) { |
|
|
|
/* TODO(ctiller): check if there's flow control for any outstanding
|
|
|
|
/* TODO(ctiller): check if there's flow control for any outstanding
|
|
|
|
bytes before going writable */ |
|
|
|
bytes before going writable */ |
|
|
|
grpc_chttp2_become_writable(transport_global, stream_global); |
|
|
|
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, |
|
|
|
|
|
|
|
true, "op.send_trailing_metadata"); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -1100,8 +1267,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
(stream_global->incoming_frames.head == NULL || |
|
|
|
(stream_global->incoming_frames.head == NULL || |
|
|
|
stream_global->incoming_frames.head->is_tail)) { |
|
|
|
stream_global->incoming_frames.head->is_tail)) { |
|
|
|
incoming_byte_stream_update_flow_control( |
|
|
|
incoming_byte_stream_update_flow_control( |
|
|
|
transport_global, stream_global, transport_global->stream_lookahead, |
|
|
|
exec_ctx, transport_global, stream_global, |
|
|
|
0); |
|
|
|
transport_global->stream_lookahead, 0); |
|
|
|
} |
|
|
|
} |
|
|
|
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); |
|
|
|
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); |
|
|
|
} |
|
|
|
} |
|
|
@ -1129,7 +1296,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, |
|
|
|
sizeof(*op)); |
|
|
|
sizeof(*op)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { |
|
|
|
static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
|
|
|
|
grpc_closure *on_recv) { |
|
|
|
grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); |
|
|
|
grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); |
|
|
|
p->next = &t->global.pings; |
|
|
|
p->next = &t->global.pings; |
|
|
|
p->prev = p->next->prev; |
|
|
|
p->prev = p->next->prev; |
|
|
@ -1144,6 +1312,7 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { |
|
|
|
p->id[7] = (uint8_t)(t->global.ping_counter & 0xff); |
|
|
|
p->id[7] = (uint8_t)(t->global.ping_counter & 0xff); |
|
|
|
p->on_recv = on_recv; |
|
|
|
p->on_recv = on_recv; |
|
|
|
gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); |
|
|
|
gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); |
|
|
|
|
|
|
|
grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
@ -1203,6 +1372,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
close_transport = grpc_chttp2_has_streams(t) |
|
|
|
close_transport = grpc_chttp2_has_streams(t) |
|
|
|
? GRPC_ERROR_NONE |
|
|
|
? GRPC_ERROR_NONE |
|
|
|
: GRPC_ERROR_CREATE("GOAWAY sent"); |
|
|
|
: GRPC_ERROR_CREATE("GOAWAY sent"); |
|
|
|
|
|
|
|
grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "goaway_sent"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (op->set_accept_stream) { |
|
|
|
if (op->set_accept_stream) { |
|
|
@ -1220,7 +1390,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (op->send_ping) { |
|
|
|
if (op->send_ping) { |
|
|
|
send_ping_locked(t, op->send_ping); |
|
|
|
send_ping_locked(exec_ctx, t, op->send_ping); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (close_transport != GRPC_ERROR_NONE) { |
|
|
|
if (close_transport != GRPC_ERROR_NONE) { |
|
|
@ -1407,6 +1577,8 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx, |
|
|
|
&transport_global->qbuf, |
|
|
|
&transport_global->qbuf, |
|
|
|
grpc_chttp2_rst_stream_create(stream_global->id, (uint32_t)http_error, |
|
|
|
grpc_chttp2_rst_stream_create(stream_global->id, (uint32_t)http_error, |
|
|
|
&stream_global->stats.outgoing)); |
|
|
|
&stream_global->stats.outgoing)); |
|
|
|
|
|
|
|
grpc_chttp2_initiate_write(exec_ctx, transport_global, false, |
|
|
|
|
|
|
|
"rst_stream"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
const char *msg = |
|
|
|
const char *msg = |
|
|
@ -1466,10 +1638,38 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void add_error(grpc_error *error, grpc_error **refs, size_t *nrefs) { |
|
|
|
|
|
|
|
if (error == GRPC_ERROR_NONE) return; |
|
|
|
|
|
|
|
for (size_t i = 0; i < *nrefs; i++) { |
|
|
|
|
|
|
|
if (error == refs[i]) { |
|
|
|
|
|
|
|
return; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
refs[*nrefs] = error; |
|
|
|
|
|
|
|
++*nrefs; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static grpc_error *removal_error(grpc_error *extra_error, |
|
|
|
|
|
|
|
grpc_chttp2_stream_global *stream_global) { |
|
|
|
|
|
|
|
grpc_error *refs[3]; |
|
|
|
|
|
|
|
size_t nrefs = 0; |
|
|
|
|
|
|
|
add_error(stream_global->read_closed_error, refs, &nrefs); |
|
|
|
|
|
|
|
add_error(stream_global->write_closed_error, refs, &nrefs); |
|
|
|
|
|
|
|
add_error(extra_error, refs, &nrefs); |
|
|
|
|
|
|
|
grpc_error *error = GRPC_ERROR_NONE; |
|
|
|
|
|
|
|
if (nrefs > 0) { |
|
|
|
|
|
|
|
error = GRPC_ERROR_CREATE_REFERENCING("Failed due to stream removal", refs, |
|
|
|
|
|
|
|
nrefs); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
GRPC_ERROR_UNREF(extra_error); |
|
|
|
|
|
|
|
return error; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static void fail_pending_writes(grpc_exec_ctx *exec_ctx, |
|
|
|
static void fail_pending_writes(grpc_exec_ctx *exec_ctx, |
|
|
|
grpc_chttp2_transport_global *transport_global, |
|
|
|
grpc_chttp2_transport_global *transport_global, |
|
|
|
grpc_chttp2_stream_global *stream_global, |
|
|
|
grpc_chttp2_stream_global *stream_global, |
|
|
|
grpc_error *error) { |
|
|
|
grpc_error *error) { |
|
|
|
|
|
|
|
error = removal_error(error, stream_global); |
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
exec_ctx, transport_global, stream_global, |
|
|
|
exec_ctx, transport_global, stream_global, |
|
|
|
&stream_global->send_initial_metadata_finished, GRPC_ERROR_REF(error)); |
|
|
|
&stream_global->send_initial_metadata_finished, GRPC_ERROR_REF(error)); |
|
|
@ -1492,14 +1692,17 @@ void grpc_chttp2_mark_stream_closed( |
|
|
|
} |
|
|
|
} |
|
|
|
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); |
|
|
|
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); |
|
|
|
if (close_reads && !stream_global->read_closed) { |
|
|
|
if (close_reads && !stream_global->read_closed) { |
|
|
|
|
|
|
|
stream_global->read_closed_error = GRPC_ERROR_REF(error); |
|
|
|
stream_global->read_closed = true; |
|
|
|
stream_global->read_closed = true; |
|
|
|
stream_global->published_initial_metadata = true; |
|
|
|
stream_global->published_initial_metadata = true; |
|
|
|
stream_global->published_trailing_metadata = true; |
|
|
|
stream_global->published_trailing_metadata = true; |
|
|
|
decrement_active_streams_locked(exec_ctx, transport_global, stream_global); |
|
|
|
decrement_active_streams_locked(exec_ctx, transport_global, stream_global); |
|
|
|
} |
|
|
|
} |
|
|
|
if (close_writes && !stream_global->write_closed) { |
|
|
|
if (close_writes && !stream_global->write_closed) { |
|
|
|
|
|
|
|
stream_global->write_closed_error = GRPC_ERROR_REF(error); |
|
|
|
stream_global->write_closed = true; |
|
|
|
stream_global->write_closed = true; |
|
|
|
if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.writing_active) { |
|
|
|
if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.write_state != |
|
|
|
|
|
|
|
GRPC_CHTTP2_WRITING_INACTIVE) { |
|
|
|
GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes"); |
|
|
|
GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes"); |
|
|
|
grpc_chttp2_list_add_closed_waiting_for_writing(transport_global, |
|
|
|
grpc_chttp2_list_add_closed_waiting_for_writing(transport_global, |
|
|
|
stream_global); |
|
|
|
stream_global); |
|
|
@ -1509,7 +1712,6 @@ void grpc_chttp2_mark_stream_closed( |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if (stream_global->read_closed && stream_global->write_closed) { |
|
|
|
if (stream_global->read_closed && stream_global->write_closed) { |
|
|
|
stream_global->removal_error = GRPC_ERROR_REF(error); |
|
|
|
|
|
|
|
if (stream_global->id != 0 && |
|
|
|
if (stream_global->id != 0 && |
|
|
|
TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) { |
|
|
|
TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) { |
|
|
|
grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, |
|
|
|
grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, |
|
|
@ -1517,7 +1719,8 @@ void grpc_chttp2_mark_stream_closed( |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
if (stream_global->id != 0) { |
|
|
|
if (stream_global->id != 0) { |
|
|
|
remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global), |
|
|
|
remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global), |
|
|
|
stream_global->id, GRPC_ERROR_REF(error)); |
|
|
|
stream_global->id, |
|
|
|
|
|
|
|
removal_error(GRPC_ERROR_REF(error), stream_global)); |
|
|
|
} |
|
|
|
} |
|
|
|
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); |
|
|
|
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); |
|
|
|
} |
|
|
|
} |
|
|
@ -1641,6 +1844,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
|
|
|
|
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, |
|
|
|
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, |
|
|
|
1, error); |
|
|
|
1, error); |
|
|
|
|
|
|
|
grpc_chttp2_initiate_write(exec_ctx, transport_global, false, |
|
|
|
|
|
|
|
"close_from_api"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
typedef struct { |
|
|
@ -1670,8 +1875,14 @@ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** update window from a settings change */ |
|
|
|
/** update window from a settings change */ |
|
|
|
|
|
|
|
typedef struct { |
|
|
|
|
|
|
|
grpc_chttp2_transport *t; |
|
|
|
|
|
|
|
grpc_exec_ctx *exec_ctx; |
|
|
|
|
|
|
|
} update_global_window_args; |
|
|
|
|
|
|
|
|
|
|
|
static void update_global_window(void *args, uint32_t id, void *stream) { |
|
|
|
static void update_global_window(void *args, uint32_t id, void *stream) { |
|
|
|
grpc_chttp2_transport *t = args; |
|
|
|
update_global_window_args *a = args; |
|
|
|
|
|
|
|
grpc_chttp2_transport *t = a->t; |
|
|
|
grpc_chttp2_stream *s = stream; |
|
|
|
grpc_chttp2_stream *s = stream; |
|
|
|
grpc_chttp2_transport_global *transport_global = &t->global; |
|
|
|
grpc_chttp2_transport_global *transport_global = &t->global; |
|
|
|
grpc_chttp2_stream_global *stream_global = &s->global; |
|
|
|
grpc_chttp2_stream_global *stream_global = &s->global; |
|
|
@ -1685,7 +1896,8 @@ static void update_global_window(void *args, uint32_t id, void *stream) { |
|
|
|
is_zero = stream_global->outgoing_window <= 0; |
|
|
|
is_zero = stream_global->outgoing_window <= 0; |
|
|
|
|
|
|
|
|
|
|
|
if (was_zero && !is_zero) { |
|
|
|
if (was_zero && !is_zero) { |
|
|
|
grpc_chttp2_become_writable(transport_global, stream_global); |
|
|
|
grpc_chttp2_become_writable(a->exec_ctx, transport_global, stream_global, |
|
|
|
|
|
|
|
true, "update_global_window"); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -1794,14 +2006,19 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
grpc_chttp2_transport_global *transport_global = &t->global; |
|
|
|
grpc_chttp2_transport_global *transport_global = &t->global; |
|
|
|
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; |
|
|
|
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; |
|
|
|
/* copy parsing qbuf to global qbuf */ |
|
|
|
/* copy parsing qbuf to global qbuf */ |
|
|
|
gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf); |
|
|
|
if (t->parsing.qbuf.count > 0) { |
|
|
|
|
|
|
|
gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf); |
|
|
|
|
|
|
|
grpc_chttp2_initiate_write(exec_ctx, transport_global, false, |
|
|
|
|
|
|
|
"parsing_qbuf"); |
|
|
|
|
|
|
|
} |
|
|
|
/* merge stream lists */ |
|
|
|
/* merge stream lists */ |
|
|
|
grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); |
|
|
|
grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); |
|
|
|
transport_global->concurrent_stream_count = |
|
|
|
transport_global->concurrent_stream_count = |
|
|
|
(uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map); |
|
|
|
(uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map); |
|
|
|
if (transport_parsing->initial_window_update != 0) { |
|
|
|
if (transport_parsing->initial_window_update != 0) { |
|
|
|
|
|
|
|
update_global_window_args args = {t, exec_ctx}; |
|
|
|
grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, |
|
|
|
grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, |
|
|
|
update_global_window, t); |
|
|
|
update_global_window, &args); |
|
|
|
transport_parsing->initial_window_update = 0; |
|
|
|
transport_parsing->initial_window_update = 0; |
|
|
|
} |
|
|
|
} |
|
|
|
/* handle higher level things */ |
|
|
|
/* handle higher level things */ |
|
|
@ -1824,7 +2041,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
GPR_ASSERT(stream_global->write_closed); |
|
|
|
GPR_ASSERT(stream_global->write_closed); |
|
|
|
GPR_ASSERT(stream_global->read_closed); |
|
|
|
GPR_ASSERT(stream_global->read_closed); |
|
|
|
remove_stream(exec_ctx, t, stream_global->id, |
|
|
|
remove_stream(exec_ctx, t, stream_global->id, |
|
|
|
GRPC_ERROR_REF(stream_global->removal_error)); |
|
|
|
removal_error(GRPC_ERROR_NONE, stream_global)); |
|
|
|
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); |
|
|
|
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -1847,11 +2064,12 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
} |
|
|
|
} |
|
|
|
drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); |
|
|
|
drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); |
|
|
|
t->endpoint_reading = 0; |
|
|
|
t->endpoint_reading = 0; |
|
|
|
if (!t->executor.writing_active && t->ep) { |
|
|
|
if (grpc_http_write_state_trace) { |
|
|
|
grpc_endpoint_destroy(exec_ctx, t->ep); |
|
|
|
gpr_log(GPR_DEBUG, "R:%p -> 0 ws=%s", t, |
|
|
|
t->ep = NULL; |
|
|
|
write_state_name(t->executor.write_state)); |
|
|
|
/* safe as we still have a ref for read */ |
|
|
|
} |
|
|
|
UNREF_TRANSPORT(exec_ctx, t, "disconnect"); |
|
|
|
if (t->executor.write_state == GRPC_CHTTP2_WRITING_INACTIVE && t->ep) { |
|
|
|
|
|
|
|
destroy_endpoint(exec_ctx, t); |
|
|
|
} |
|
|
|
} |
|
|
|
} else if (!t->closed) { |
|
|
|
} else if (!t->closed) { |
|
|
|
keep_reading = true; |
|
|
|
keep_reading = true; |
|
|
@ -1935,7 +2153,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static void incoming_byte_stream_update_flow_control( |
|
|
|
static void incoming_byte_stream_update_flow_control( |
|
|
|
grpc_chttp2_transport_global *transport_global, |
|
|
|
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, |
|
|
|
grpc_chttp2_stream_global *stream_global, size_t max_size_hint, |
|
|
|
grpc_chttp2_stream_global *stream_global, size_t max_size_hint, |
|
|
|
size_t have_already) { |
|
|
|
size_t have_already) { |
|
|
|
uint32_t max_recv_bytes; |
|
|
|
uint32_t max_recv_bytes; |
|
|
@ -1970,7 +2188,8 @@ static void incoming_byte_stream_update_flow_control( |
|
|
|
add_max_recv_bytes); |
|
|
|
add_max_recv_bytes); |
|
|
|
grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global, |
|
|
|
grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global, |
|
|
|
stream_global); |
|
|
|
stream_global); |
|
|
|
grpc_chttp2_become_writable(transport_global, stream_global); |
|
|
|
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, |
|
|
|
|
|
|
|
false, "read_incoming_stream"); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -1992,8 +2211,9 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
grpc_chttp2_stream_global *stream_global = &bs->stream->global; |
|
|
|
grpc_chttp2_stream_global *stream_global = &bs->stream->global; |
|
|
|
|
|
|
|
|
|
|
|
if (bs->is_tail) { |
|
|
|
if (bs->is_tail) { |
|
|
|
incoming_byte_stream_update_flow_control( |
|
|
|
incoming_byte_stream_update_flow_control(exec_ctx, transport_global, |
|
|
|
transport_global, stream_global, arg->max_size_hint, bs->slices.length); |
|
|
|
stream_global, arg->max_size_hint, |
|
|
|
|
|
|
|
bs->slices.length); |
|
|
|
} |
|
|
|
} |
|
|
|
if (bs->slices.count > 0) { |
|
|
|
if (bs->slices.count > 0) { |
|
|
|
*arg->slice = gpr_slice_buffer_take_first(&bs->slices); |
|
|
|
*arg->slice = gpr_slice_buffer_take_first(&bs->slices); |
|
|
@ -2177,7 +2397,7 @@ static char *format_flowctl_context_var(const char *context, const char *var, |
|
|
|
if (context == NULL) { |
|
|
|
if (context == NULL) { |
|
|
|
*scope = NULL; |
|
|
|
*scope = NULL; |
|
|
|
gpr_asprintf(&buf, "%s(%" PRId64 ")", var, val); |
|
|
|
gpr_asprintf(&buf, "%s(%" PRId64 ")", var, val); |
|
|
|
result = gpr_leftpad(buf, ' ', 40); |
|
|
|
result = gpr_leftpad(buf, ' ', 60); |
|
|
|
gpr_free(buf); |
|
|
|
gpr_free(buf); |
|
|
|
return result; |
|
|
|
return result; |
|
|
|
} |
|
|
|
} |
|
|
@ -2190,7 +2410,7 @@ static char *format_flowctl_context_var(const char *context, const char *var, |
|
|
|
gpr_free(tmp); |
|
|
|
gpr_free(tmp); |
|
|
|
} |
|
|
|
} |
|
|
|
gpr_asprintf(&buf, "%s.%s(%" PRId64 ")", underscore_pos + 1, var, val); |
|
|
|
gpr_asprintf(&buf, "%s.%s(%" PRId64 ")", underscore_pos + 1, var, val); |
|
|
|
result = gpr_leftpad(buf, ' ', 40); |
|
|
|
result = gpr_leftpad(buf, ' ', 60); |
|
|
|
gpr_free(buf); |
|
|
|
gpr_free(buf); |
|
|
|
return result; |
|
|
|
return result; |
|
|
|
} |
|
|
|
} |
|
|
@ -2223,7 +2443,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, |
|
|
|
|
|
|
|
|
|
|
|
tmp_phase = gpr_leftpad(phase, ' ', 8); |
|
|
|
tmp_phase = gpr_leftpad(phase, ' ', 8); |
|
|
|
tmp_scope1 = gpr_leftpad(scope1, ' ', 11); |
|
|
|
tmp_scope1 = gpr_leftpad(scope1, ' ', 11); |
|
|
|
gpr_asprintf(&prefix, "FLOW %s: %s %s ", phase, clisvr, scope1); |
|
|
|
gpr_asprintf(&prefix, "FLOW %s: %s %s ", tmp_phase, clisvr, scope1); |
|
|
|
gpr_free(tmp_phase); |
|
|
|
gpr_free(tmp_phase); |
|
|
|
gpr_free(tmp_scope1); |
|
|
|
gpr_free(tmp_scope1); |
|
|
|
|
|
|
|
|
|
|
|