Merge pull request #23575 from yashykt/chttp2transportcomment

Comment formatting in chttp2_transport
reviewable/pr23489/r5^2
Yash Tibrewal 4 years ago committed by GitHub
commit 8bd03cb9d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 310
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc

@ -1,20 +1,18 @@
/* //
* // Copyright 2018 gRPC authors.
* Copyright 2018 gRPC authors. //
* // Licensed under the Apache License, Version 2.0 (the "License");
* Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License.
* you may not use this file except in compliance with the License. // You may obtain a copy of the License at
* You may obtain a copy of the License at //
* // http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0 //
* // Unless required by applicable law or agreed to in writing, software
* Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS,
* distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and
* See the License for the specific language governing permissions and // limitations under the License.
* limitations under the License. //
*
*/
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
@ -104,7 +102,7 @@ grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false, grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
"chttp2_refcount"); "chttp2_refcount");
/* forward declarations of various callbacks that we'll build closures around */ // forward declarations of various callbacks that we'll build closures around
static void write_action_begin_locked(void* t, grpc_error* error); static void write_action_begin_locked(void* t, grpc_error* error);
static void write_action(void* t, grpc_error* error); static void write_action(void* t, grpc_error* error);
static void write_action_end(void* t, grpc_error* error); static void write_action_end(void* t, grpc_error* error);
@ -116,14 +114,14 @@ static void continue_read_action_locked(grpc_chttp2_transport* t);
static void complete_fetch(void* gs, grpc_error* error); static void complete_fetch(void* gs, grpc_error* error);
static void complete_fetch_locked(void* gs, grpc_error* error); static void complete_fetch_locked(void* gs, grpc_error* error);
/** Set a transport level setting, and push it to our peer */ // Set a transport level setting, and push it to our peer
static void queue_setting_update(grpc_chttp2_transport* t, static void queue_setting_update(grpc_chttp2_transport* t,
grpc_chttp2_setting_id id, uint32_t value); grpc_chttp2_setting_id id, uint32_t value);
static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s, static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error* error); grpc_error* error);
/** Start new streams that have been created if we can */ // Start new streams that have been created if we can
static void maybe_start_some_streams(grpc_chttp2_transport* t); static void maybe_start_some_streams(grpc_chttp2_transport* t);
static void connectivity_state_set(grpc_chttp2_transport* t, static void connectivity_state_set(grpc_chttp2_transport* t,
@ -156,7 +154,7 @@ static void send_ping_locked(grpc_chttp2_transport* t,
grpc_closure* on_complete); grpc_closure* on_complete);
static void retry_initiate_ping_locked(void* tp, grpc_error* error); static void retry_initiate_ping_locked(void* tp, grpc_error* error);
/** keepalive-relevant functions */ // keepalive-relevant functions
static void init_keepalive_ping(void* arg, grpc_error* error); static void init_keepalive_ping(void* arg, grpc_error* error);
static void init_keepalive_ping_locked(void* arg, grpc_error* error); static void init_keepalive_ping_locked(void* arg, grpc_error* error);
static void start_keepalive_ping(void* arg, grpc_error* error); static void start_keepalive_ping(void* arg, grpc_error* error);
@ -172,9 +170,9 @@ static void reset_byte_stream(void* arg, grpc_error* error);
// GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL // GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL
bool g_flow_control_enabled = true; bool g_flow_control_enabled = true;
/******************************************************************************* //
* CONSTRUCTION/DESTRUCTION/REFCOUNTING // CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/ //
grpc_chttp2_transport::~grpc_chttp2_transport() { grpc_chttp2_transport::~grpc_chttp2_transport() {
size_t i; size_t i;
@ -233,7 +231,7 @@ grpc_chttp2_transport::~grpc_chttp2_transport() {
static const grpc_transport_vtable* get_vtable(void); static const grpc_transport_vtable* get_vtable(void);
/* Returns whether bdp is enabled */ // Returns whether bdp is enabled
static bool read_channel_args(grpc_chttp2_transport* t, static bool read_channel_args(grpc_chttp2_transport* t,
const grpc_channel_args* channel_args, const grpc_channel_args* channel_args,
bool is_client) { bool is_client) {
@ -431,8 +429,8 @@ static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
&t->init_keepalive_ping_locked); &t->init_keepalive_ping_locked);
} else { } else {
/* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no // Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
inflight keeaplive timers */ // inflight keeaplive timers
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
} }
} }
@ -453,11 +451,11 @@ grpc_chttp2_transport::grpc_chttp2_transport(
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
base.vtable = get_vtable(); base.vtable = get_vtable();
/* 8 is a random stab in the dark as to a good initial size: it's small enough // 8 is a random stab in the dark as to a good initial size: it's small enough
that it shouldn't waste memory for infrequently used connections, yet // that it shouldn't waste memory for infrequently used connections, yet
large enough that the exponential growth should happen nicely when it's // large enough that the exponential growth should happen nicely when it's
needed. // needed.
TODO(ctiller): tune this */ // TODO(ctiller): tune this
grpc_chttp2_stream_map_init(&stream_map, 8); grpc_chttp2_stream_map_init(&stream_map, 8);
grpc_slice_buffer_init(&read_buffer); grpc_slice_buffer_init(&read_buffer);
@ -468,7 +466,7 @@ grpc_chttp2_transport::grpc_chttp2_transport(
} }
grpc_chttp2_hpack_compressor_init(&hpack_compressor); grpc_chttp2_hpack_compressor_init(&hpack_compressor);
grpc_slice_buffer_init(&qbuf); grpc_slice_buffer_init(&qbuf);
/* copy in initial settings to all setting sets */ // copy in initial settings to all setting sets
size_t i; size_t i;
int j; int j;
for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) { for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
@ -479,7 +477,7 @@ grpc_chttp2_transport::grpc_chttp2_transport(
grpc_chttp2_hpack_parser_init(&hpack_parser); grpc_chttp2_hpack_parser_init(&hpack_parser);
grpc_chttp2_goaway_parser_init(&goaway_parser); grpc_chttp2_goaway_parser_init(&goaway_parser);
/* configure http2 the way we like it */ // configure http2 the way we like it
if (is_client) { if (is_client) {
queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
@ -505,7 +503,7 @@ grpc_chttp2_transport::grpc_chttp2_transport(
enable_bdp = false; enable_bdp = false;
} }
/* No pings allowed before receiving a header or data frame. */ // No pings allowed before receiving a header or data frame.
ping_state.pings_before_data_required = 0; ping_state.pings_before_data_required = 0;
ping_state.is_delayed_ping_timer_set = false; ping_state.is_delayed_ping_timer_set = false;
ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST; ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST;
@ -582,11 +580,11 @@ static void close_transport_locked(grpc_chttp2_transport* t,
break; break;
case GRPC_CHTTP2_KEEPALIVE_STATE_DYING: case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED: case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
/* keepalive timers are not set in these two states */ // keepalive timers are not set in these two states
break; break;
} }
/* flush writable stream list to avoid dangling references */ // flush writable stream list to avoid dangling references
grpc_chttp2_stream* s; grpc_chttp2_stream* s;
while (grpc_chttp2_list_pop_writable_stream(t, &s)) { while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close"); GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
@ -619,9 +617,9 @@ void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
#endif #endif
grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) { grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) {
/* We reserve one 'active stream' that's dropped when the stream is // We reserve one 'active stream' that's dropped when the stream is
read-closed. The others are for Chttp2IncomingByteStreams that are // read-closed. The others are for Chttp2IncomingByteStreams that are
actively reading */ // actively reading
GRPC_CHTTP2_STREAM_REF(s, "chttp2"); GRPC_CHTTP2_STREAM_REF(s, "chttp2");
GRPC_CHTTP2_REF_TRANSPORT(s->t, "stream"); GRPC_CHTTP2_REF_TRANSPORT(s->t, "stream");
} }
@ -777,9 +775,9 @@ grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
return accepting; return accepting;
} }
/******************************************************************************* //
* OUTPUT PROCESSING // OUTPUT PROCESSING
*/ //
static const char* write_state_name(grpc_chttp2_write_state st) { static const char* write_state_name(grpc_chttp2_write_state st) {
switch (st) { switch (st) {
@ -800,12 +798,12 @@ static void set_write_state(grpc_chttp2_transport* t,
t->is_client ? "CLIENT" : "SERVER", t->peer_string, t->is_client ? "CLIENT" : "SERVER", t->peer_string,
write_state_name(t->write_state), write_state_name(st), reason)); write_state_name(t->write_state), write_state_name(st), reason));
t->write_state = st; t->write_state = st;
/* If the state is being reset back to idle, it means a write was just // If the state is being reset back to idle, it means a write was just
* finished. Make sure all the run_after_write closures are scheduled. // finished. Make sure all the run_after_write closures are scheduled.
* //
* This is also our chance to close the transport if the transport was marked // This is also our chance to close the transport if the transport was marked
* to be closed after all writes finish (for example, if we received a go-away // to be closed after all writes finish (for example, if we received a go-away
* from peer while we had some pending writes) */ // from peer while we had some pending writes)
if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) { if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write); grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
if (t->close_transport_on_writes_finished != nullptr) { if (t->close_transport_on_writes_finished != nullptr) {
@ -892,22 +890,22 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
grpc_chttp2_initiate_write_reason_string(reason)); grpc_chttp2_initiate_write_reason_string(reason));
GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
/* Note that the 'write_action_begin_locked' closure is being scheduled // Note that the 'write_action_begin_locked' closure is being scheduled
* on the 'finally_scheduler' of t->combiner. This means that // on the 'finally_scheduler' of t->combiner. This means that
* 'write_action_begin_locked' is called only *after* all the other // 'write_action_begin_locked' is called only *after* all the other
* closures (some of which are potentially initiating more writes on the // closures (some of which are potentially initiating more writes on the
* transport) are executed on the t->combiner. // transport) are executed on the t->combiner.
* //
* The reason for scheduling on finally_scheduler is to make sure we batch // The reason for scheduling on finally_scheduler is to make sure we batch
* as many writes as possible. 'write_action_begin_locked' is the function // as many writes as possible. 'write_action_begin_locked' is the function
* that gathers all the relevant bytes (which are at various places in the // that gathers all the relevant bytes (which are at various places in the
* grpc_chttp2_transport structure) and append them to 'outbuf' field in // grpc_chttp2_transport structure) and append them to 'outbuf' field in
* grpc_chttp2_transport thereby batching what would have been potentially // grpc_chttp2_transport thereby batching what would have been potentially
* multiple write operations. // multiple write operations.
* //
* Also, 'write_action_begin_locked' only gathers the bytes into outbuf. // Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
* It does not call the endpoint to write the bytes. That is done by the // It does not call the endpoint to write the bytes. That is done by the
* 'write_action' (which is scheduled by 'write_action_begin_locked') */ // 'write_action' (which is scheduled by 'write_action_begin_locked')
t->combiner->FinallyRun( t->combiner->FinallyRun(
GRPC_CLOSURE_INIT(&t->write_action_begin_locked, GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
write_action_begin_locked, t, nullptr), write_action_begin_locked, t, nullptr),
@ -959,9 +957,9 @@ static void write_action_begin_locked(void* gt, grpc_error* /*error_ignored*/) {
write_action(t, GRPC_ERROR_NONE); write_action(t, GRPC_ERROR_NONE);
if (t->reading_paused_on_pending_induced_frames) { if (t->reading_paused_on_pending_induced_frames) {
GPR_ASSERT(t->num_pending_induced_frames == 0); GPR_ASSERT(t->num_pending_induced_frames == 0);
/* We had paused reading, because we had many induced frames (SETTINGS // We had paused reading, because we had many induced frames (SETTINGS
* ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have // ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
* been able to flush qbuf, we can resume reading. */ // been able to flush qbuf, we can resume reading.
GRPC_CHTTP2_IF_TRACING(gpr_log( GRPC_CHTTP2_IF_TRACING(gpr_log(
GPR_INFO, GPR_INFO,
"transport %p : Resuming reading after being paused due to too " "transport %p : Resuming reading after being paused due to too "
@ -996,8 +994,8 @@ static void write_action_end(void* tp, grpc_error* error) {
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
} }
/* Callback from the grpc_endpoint after bytes have been written by calling // Callback from the grpc_endpoint after bytes have been written by calling
* sendmsg */ // sendmsg
static void write_action_end_locked(void* tp, grpc_error* error) { static void write_action_end_locked(void* tp, grpc_error* error) {
GPR_TIMER_SCOPE("terminate_writing_with_lock", 0); GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp); grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
@ -1083,16 +1081,16 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
GRPC_CHTTP2_IF_TRACING( GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t, gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t,
last_stream_id)); last_stream_id));
/* We want to log this irrespective of whether http tracing is enabled if we // We want to log this irrespective of whether http tracing is enabled if we
* received a GOAWAY with a non NO_ERROR code. */ // received a GOAWAY with a non NO_ERROR code.
if (goaway_error != GRPC_HTTP2_NO_ERROR) { if (goaway_error != GRPC_HTTP2_NO_ERROR) {
gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string, gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string,
goaway_error, grpc_error_string(t->goaway_error)); goaway_error, grpc_error_string(t->goaway_error));
} }
/* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
* data equal to "too_many_pings", it should log the occurrence at a log level // data equal to "too_many_pings", it should log the occurrence at a log level
* that is enabled by default and double the configured KEEPALIVE_TIME used // that is enabled by default and double the configured KEEPALIVE_TIME used
* for new connections on that channel. */ // for new connections on that channel.
if (GPR_UNLIKELY(t->is_client && if (GPR_UNLIKELY(t->is_client &&
goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM && goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0)) { grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0)) {
@ -1109,15 +1107,15 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
KEEPALIVE_TIME_BACKOFF_MULTIPLIER); KEEPALIVE_TIME_BACKOFF_MULTIPLIER);
} }
absl::Status status = grpc_error_to_absl_status(t->goaway_error); absl::Status status = grpc_error_to_absl_status(t->goaway_error);
/* lie: use transient failure from the transport to indicate goaway has been // lie: use transient failure from the transport to indicate goaway has been
* received */ // received.
connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status, connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
"got_goaway"); "got_goaway");
} }
static void maybe_start_some_streams(grpc_chttp2_transport* t) { static void maybe_start_some_streams(grpc_chttp2_transport* t) {
grpc_chttp2_stream* s; grpc_chttp2_stream* s;
/* cancel out streams that haven't yet started if we have received a GOAWAY */ // cancel out streams that haven't yet started if we have received a GOAWAY
if (t->goaway_error != GRPC_ERROR_NONE) { if (t->goaway_error != GRPC_ERROR_NONE) {
while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) { while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
grpc_chttp2_cancel_stream( grpc_chttp2_cancel_stream(
@ -1128,14 +1126,14 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
} }
return; return;
} }
/* start streams where we have free grpc_chttp2_stream ids and free // start streams where we have free grpc_chttp2_stream ids and free
* concurrency */ // * concurrency
while (t->next_stream_id <= MAX_CLIENT_STREAM_ID && while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
grpc_chttp2_stream_map_size(&t->stream_map) < grpc_chttp2_stream_map_size(&t->stream_map) <
t->settings[GRPC_PEER_SETTINGS] t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] && [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) { grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
/* safe since we can't (legally) be parsing this stream yet */ // safe since we can't (legally) be parsing this stream yet
GRPC_CHTTP2_IF_TRACING(gpr_log( GRPC_CHTTP2_IF_TRACING(gpr_log(
GPR_INFO, GPR_INFO,
"HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d", "HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d",
@ -1157,7 +1155,7 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
grpc_chttp2_mark_stream_writable(t, s); grpc_chttp2_mark_stream_writable(t, s);
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
} }
/* cancel out streams that will never be started */ // cancel out streams that will never be started
if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) { if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) { while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
grpc_chttp2_cancel_stream( grpc_chttp2_cancel_stream(
@ -1169,12 +1167,12 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
} }
} }
/* Flag that this closure barrier may be covering a write in a pollset, and so // Flag that this closure barrier may be covering a write in a pollset, and so
we should not complete this closure until we can prove that the write got // we should not complete this closure until we can prove that the write got
scheduled */ // scheduled
#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0) #define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
/* First bit of the reference count, stored in the high order bits (with the low // First bit of the reference count, stored in the high order bits (with the low
bits being used for flags defined above) */ // bits being used for flags defined above)
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16) #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
static grpc_closure* add_closure_barrier(grpc_closure* closure) { static grpc_closure* add_closure_barrier(grpc_closure* closure) {
@ -1266,7 +1264,7 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) { grpc_chttp2_stream* s) {
for (;;) { for (;;) {
if (s->fetching_send_message == nullptr) { if (s->fetching_send_message == nullptr) {
/* Stream was cancelled before message fetch completed */ // Stream was cancelled before message fetch completed
abort(); /* TODO(ctiller): what cleanup here? */ abort(); /* TODO(ctiller): what cleanup here? */
return; /* early out */ return; /* early out */
} }
@ -1396,7 +1394,7 @@ static void perform_stream_op_locked(void* stream_op,
GPR_ASSERT(s->send_initial_metadata_finished == nullptr); GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
/* Identify stream compression */ // Identify stream compression
if (op_payload->send_initial_metadata.send_initial_metadata->idx.named if (op_payload->send_initial_metadata.send_initial_metadata->idx.named
.content_encoding == nullptr || .content_encoding == nullptr ||
grpc_stream_compression_method_parse( grpc_stream_compression_method_parse(
@ -1569,8 +1567,8 @@ static void perform_stream_op_locked(void* stream_op,
"stream was closed"), "stream was closed"),
"send_trailing_metadata_finished"); "send_trailing_metadata_finished");
} else if (s->id != 0) { } else if (s->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding // TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */ // bytes before going writable
grpc_chttp2_mark_stream_writable(t, s); grpc_chttp2_mark_stream_writable(t, s);
grpc_chttp2_initiate_write( grpc_chttp2_initiate_write(
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA); t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
@ -1672,8 +1670,8 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
} }
static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
/* callback remaining pings: they're not allowed to call into the transport, // callback remaining pings: they're not allowed to call into the transport,
and maybe they hold resources that need to be freed */ // and maybe they hold resources that need to be freed
grpc_chttp2_ping_queue* pq = &t->ping_queue; grpc_chttp2_ping_queue* pq = &t->ping_queue;
GPR_ASSERT(error != GRPC_ERROR_NONE); GPR_ASSERT(error != GRPC_ERROR_NONE);
for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
@ -1699,11 +1697,9 @@ static void send_ping_locked(grpc_chttp2_transport* t,
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
/* // Specialized form of send_ping_locked for keepalive ping. If there is already
* Specialized form of send_ping_locked for keepalive ping. If there is already // a ping in progress, the keepalive ping would piggyback onto that ping,
* a ping in progress, the keepalive ping would piggyback onto that ping, // instead of waiting for that ping to complete and then starting a new ping.
* instead of waiting for that ping to complete and then starting a new ping.
*/
static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
if (t->closed_with_error != GRPC_ERROR_NONE) { if (t->closed_with_error != GRPC_ERROR_NONE) {
t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
@ -1717,7 +1713,7 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
} }
grpc_chttp2_ping_queue* pq = &t->ping_queue; grpc_chttp2_ping_queue* pq = &t->ping_queue;
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) { if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
/* There is a ping in flight. Add yourself to the inflight closure list. */ // There is a ping in flight. Add yourself to the inflight closure list.
t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
start_keepalive_ping_locked, t, nullptr), start_keepalive_ping_locked, t, nullptr),
GRPC_ERROR_REF(t->closed_with_error)); GRPC_ERROR_REF(t->closed_with_error));
@ -1772,7 +1768,7 @@ void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
} }
static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) { static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) {
/* We want to log this irrespective of whether http tracing is enabled */ // We want to log this irrespective of whether http tracing is enabled
gpr_log(GPR_INFO, "%s: Sending goaway err=%s", t->peer_string, gpr_log(GPR_INFO, "%s: Sending goaway err=%s", t->peer_string,
grpc_error_string(error)); grpc_error_string(error));
t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED; t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
@ -1794,7 +1790,7 @@ void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
grpc_error_set_int( grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"), GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"),
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
/*The transport will be closed after the write is done */ // The transport will be closed after the write is done
close_transport_locked( close_transport_locked(
t, grpc_error_set_int( t, grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"), GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
@ -1869,9 +1865,9 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
/******************************************************************************* //
* INPUT PROCESSING - GENERAL // INPUT PROCESSING - GENERAL
*/ //
void grpc_chttp2_maybe_complete_recv_initial_metadata( void grpc_chttp2_maybe_complete_recv_initial_metadata(
grpc_chttp2_transport* /*t*/, grpc_chttp2_stream* s) { grpc_chttp2_transport* /*t*/, grpc_chttp2_stream* s) {
@ -1991,8 +1987,8 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
s->unprocessed_incoming_frames_buffer.length > 0; s->unprocessed_incoming_frames_buffer.length > 0;
if (s->read_closed && s->frame_storage.length > 0 && !pending_data && if (s->read_closed && s->frame_storage.length > 0 && !pending_data &&
!s->seen_error && s->recv_trailing_metadata_finished != nullptr) { !s->seen_error && s->recv_trailing_metadata_finished != nullptr) {
/* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and // Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
* maybe decompress the next 5 bytes in the stream. */ // maybe decompress the next 5 bytes in the stream.
if (s->stream_decompression_method == if (s->stream_decompression_method ==
GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
grpc_slice_buffer_move_first( grpc_slice_buffer_move_first(
@ -2114,12 +2110,12 @@ void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
if (status != GRPC_STATUS_OK) { if (status != GRPC_STATUS_OK) {
s->seen_error = true; s->seen_error = true;
} }
/* stream_global->recv_trailing_metadata_finished gives us a // stream_global->recv_trailing_metadata_finished gives us a
last chance replacement: we've received trailing metadata, // last chance replacement: we've received trailing metadata,
but something more important has become available to signal // but something more important has become available to signal
to the upper layers - drop what we've got, and then publish // to the upper layers - drop what we've got, and then publish
what we want - which is safe because we haven't told anyone // what we want - which is safe because we haven't told anyone
about the metadata yet */ // about the metadata yet
if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED || if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
s->recv_trailing_metadata_finished != nullptr) { s->recv_trailing_metadata_finished != nullptr) {
char status_string[GPR_LTOA_MIN_BUFSIZE]; char status_string[GPR_LTOA_MIN_BUFSIZE];
@ -2211,7 +2207,7 @@ void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
grpc_chttp2_stream* s, int close_reads, grpc_chttp2_stream* s, int close_reads,
int close_writes, grpc_error* error) { int close_writes, grpc_error* error) {
if (s->read_closed && s->write_closed) { if (s->read_closed && s->write_closed) {
/* already closed, but we should still fake the status if needed. */ // already closed, but we should still fake the status if needed.
grpc_error* overall_error = removal_error(error, s, "Stream removed"); grpc_error* overall_error = removal_error(error, s, "Stream removed");
if (overall_error != GRPC_ERROR_NONE) { if (overall_error != GRPC_ERROR_NONE) {
grpc_chttp2_fake_status(t, s, overall_error); grpc_chttp2_fake_status(t, s, overall_error);
@ -2238,7 +2234,7 @@ void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
if (s->id != 0) { if (s->id != 0) {
remove_stream(t, s->id, GRPC_ERROR_REF(overall_error)); remove_stream(t, s->id, GRPC_ERROR_REF(overall_error));
} else { } else {
/* Purge streams waiting on concurrency still waiting for id assignment */ // Purge streams waiting on concurrency still waiting for id assignment
grpc_chttp2_list_remove_waiting_for_concurrency(t, s); grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
} }
if (overall_error != GRPC_ERROR_NONE) { if (overall_error != GRPC_ERROR_NONE) {
@ -2277,12 +2273,12 @@ static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100); GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
/* Hand roll a header block. // Hand roll a header block.
This is unnecessarily ugly - at some point we should find a more // This is unnecessarily ugly - at some point we should find a more
elegant solution. // elegant solution.
It's complicated by the fact that our send machinery would be dead by // It's complicated by the fact that our send machinery would be dead by
the time we got around to sending this, so instead we ignore HPACK // the time we got around to sending this, so instead we ignore HPACK
compression and just write the uncompressed bytes onto the wire. */ // compression and just write the uncompressed bytes onto the wire.
if (!s->sent_initial_metadata) { if (!s->sent_initial_metadata) {
http_status_hdr = GRPC_SLICE_MALLOC(13); http_status_hdr = GRPC_SLICE_MALLOC(13);
p = GRPC_SLICE_START_PTR(http_status_hdr); p = GRPC_SLICE_START_PTR(http_status_hdr);
@ -2443,9 +2439,9 @@ static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error) {
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
/******************************************************************************* //
* INPUT PROCESSING - PARSING // INPUT PROCESSING - PARSING
*/ //
template <class F> template <class F>
static void WithUrgency(grpc_chttp2_transport* t, static void WithUrgency(grpc_chttp2_transport* t,
@ -2580,8 +2576,8 @@ static void read_action_locked(void* tp, grpc_error* error) {
"Transport closed", &t->closed_with_error, 1); "Transport closed", &t->closed_with_error, 1);
} }
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
/* If a goaway frame was received, this might be the reason why the read // If a goaway frame was received, this might be the reason why the read
* failed. Add this info to the error */ // failed. Add this info to the error
if (t->goaway_error != GRPC_ERROR_NONE) { if (t->goaway_error != GRPC_ERROR_NONE) {
error = grpc_error_add_child(error, GRPC_ERROR_REF(t->goaway_error)); error = grpc_error_add_child(error, GRPC_ERROR_REF(t->goaway_error));
} }
@ -2590,7 +2586,7 @@ static void read_action_locked(void* tp, grpc_error* error) {
t->endpoint_reading = 0; t->endpoint_reading = 0;
} else if (t->closed_with_error == GRPC_ERROR_NONE) { } else if (t->closed_with_error == GRPC_ERROR_NONE) {
keep_reading = true; keep_reading = true;
/* Since we have read a byte, reset the keepalive timer */ // Since we have read a byte, reset the keepalive timer
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(&t->keepalive_ping_timer); grpc_timer_cancel(&t->keepalive_ping_timer);
} }
@ -2651,7 +2647,7 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) {
if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
return; return;
} }
/* Reset the keepalive ping timer */ // Reset the keepalive ping timer
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(&t->keepalive_ping_timer); grpc_timer_cancel(&t->keepalive_ping_timer);
} }
@ -2677,8 +2673,8 @@ static void finish_bdp_ping_locked(void* tp, grpc_error* error) {
return; return;
} }
if (!t->bdp_ping_started) { if (!t->bdp_ping_started) {
/* start_bdp_ping_locked has not been run yet. Schedule // start_bdp_ping_locked has not been run yet. Schedule
* finish_bdp_ping_locked to be run later. */ // finish_bdp_ping_locked to be run later.
t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
finish_bdp_ping_locked, t, nullptr), finish_bdp_ping_locked, t, nullptr),
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
@ -2811,7 +2807,7 @@ static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
&t->init_keepalive_ping_locked); &t->init_keepalive_ping_locked);
} }
} else if (error == GRPC_ERROR_CANCELLED) { } else if (error == GRPC_ERROR_CANCELLED) {
/* The keepalive ping timer may be cancelled by bdp */ // The keepalive ping timer may be cancelled by bdp
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t, GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
@ -2866,8 +2862,8 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string); gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string);
} }
if (!t->keepalive_ping_started) { if (!t->keepalive_ping_started) {
/* start_keepalive_ping_locked has not run yet. Reschedule // start_keepalive_ping_locked has not run yet. Reschedule
* finish_keepalive_ping_locked for it to be run later. */ // finish_keepalive_ping_locked for it to be run later.
t->combiner->Run( t->combiner->Run(
GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
finish_keepalive_ping_locked, t, nullptr), finish_keepalive_ping_locked, t, nullptr),
@ -2910,8 +2906,8 @@ static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
GRPC_STATUS_UNAVAILABLE)); GRPC_STATUS_UNAVAILABLE));
} }
} else { } else {
/* The watchdog timer should have been cancelled by // The watchdog timer should have been cancelled by
* finish_keepalive_ping_locked. */ // finish_keepalive_ping_locked.
if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) { if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) {
gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
@ -2920,9 +2916,9 @@ static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
} }
/******************************************************************************* //
* CALLBACK LOOP // CALLBACK LOOP
*/ //
static void connectivity_state_set(grpc_chttp2_transport* t, static void connectivity_state_set(grpc_chttp2_transport* t,
grpc_connectivity_state state, grpc_connectivity_state state,
@ -2933,9 +2929,9 @@ static void connectivity_state_set(grpc_chttp2_transport* t,
t->state_tracker.SetState(state, status, reason); t->state_tracker.SetState(state, status, reason);
} }
/******************************************************************************* //
* POLLSET STUFF // POLLSET STUFF
*/ //
static void set_pollset(grpc_transport* gt, grpc_stream* /*gs*/, static void set_pollset(grpc_transport* gt, grpc_stream* /*gs*/,
grpc_pollset* pollset) { grpc_pollset* pollset) {
@ -2949,9 +2945,9 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* /*gs*/,
grpc_endpoint_add_to_pollset_set(t->ep, pollset_set); grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
} }
/******************************************************************************* //
* BYTE STREAM // BYTE STREAM
*/ //
static void reset_byte_stream(void* arg, grpc_error* error) { static void reset_byte_stream(void* arg, grpc_error* error) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg); grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
@ -3039,7 +3035,7 @@ void Chttp2IncomingByteStream::NextLocked(void* arg,
s->data_parser.parsing_frame = nullptr; s->data_parser.parsing_frame = nullptr;
} }
} else { } else {
/* Should never reach here. */ // Should never reach here.
GPR_ASSERT(false); GPR_ASSERT(false);
} }
} else { } else {
@ -3168,9 +3164,9 @@ void Chttp2IncomingByteStream::Shutdown(grpc_error* error) {
} // namespace grpc_core } // namespace grpc_core
/******************************************************************************* //
* RESOURCE QUOTAS // RESOURCE QUOTAS
*/ //
static void post_benign_reclaimer(grpc_chttp2_transport* t) { static void post_benign_reclaimer(grpc_chttp2_transport* t) {
if (!t->benign_reclaimer_registered) { if (!t->benign_reclaimer_registered) {
@ -3205,8 +3201,8 @@ static void benign_reclaimer_locked(void* arg, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg); grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
if (error == GRPC_ERROR_NONE && if (error == GRPC_ERROR_NONE &&
grpc_chttp2_stream_map_size(&t->stream_map) == 0) { grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
/* Channel with no active streams: send a goaway to try and make it // Channel with no active streams: send a goaway to try and make it
* disconnect cleanly */ // disconnect cleanly
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory", gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
t->peer_string); t->peer_string);
@ -3254,10 +3250,10 @@ static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_ERROR_INT_HTTP2_ERROR,
GRPC_HTTP2_ENHANCE_YOUR_CALM)); GRPC_HTTP2_ENHANCE_YOUR_CALM));
if (n > 1) { if (n > 1) {
/* Since we cancel one stream per destructive reclamation, if // Since we cancel one stream per destructive reclamation, if
there are more streams left, we can immediately post a new // there are more streams left, we can immediately post a new
reclaimer in case the resource quota needs to free more // reclaimer in case the resource quota needs to free more
memory */ // memory
post_destructive_reclaimer(t); post_destructive_reclaimer(t);
} }
} }
@ -3268,9 +3264,9 @@ static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer"); GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
} }
/******************************************************************************* //
* MONITORING // MONITORING
*/ //
const char* grpc_chttp2_initiate_write_reason_string( const char* grpc_chttp2_initiate_write_reason_string(
grpc_chttp2_initiate_write_reason reason) { grpc_chttp2_initiate_write_reason reason) {

Loading…
Cancel
Save