From 7100ee3bfb88a846995818614b84747bd758b743 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 17 Jul 2020 22:20:12 -0700 Subject: [PATCH 1/7] Add keepalive throttling --- .../filters/client_channel/client_channel.cc | 36 +++++ .../ext/filters/client_channel/subchannel.cc | 33 +++++ .../ext/filters/client_channel/subchannel.h | 5 + .../chttp2/transport/chttp2_transport.cc | 4 +- src/core/lib/transport/transport.h | 7 + test/core/end2end/cq_verifier.cc | 6 +- test/core/end2end/cq_verifier.h | 4 + .../transport/chttp2/too_many_pings_test.cc | 128 ++++++++++++++++++ 8 files changed, 220 insertions(+), 3 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 1b48d6a4d37..db5358519fc 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -28,6 +28,7 @@ #include +#include "absl/strings/numbers.h" #include "absl/strings/string_view.h" #include @@ -330,6 +331,7 @@ class ChannelData { // applied in the data plane mutex when the picker is updated. std::map, RefCountedPtr> pending_subchannel_updates_; + int keepalive_time_ = -1; // // Fields accessed from both data plane mutex and control plane @@ -971,6 +973,10 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { void ResetBackoff() override { subchannel_->ResetBackoff(); } + void ThrottleKeepaliveTime(int new_keepalive_time) { + subchannel_->ThrottleKeepaliveTime(new_keepalive_time); + } + const grpc_channel_args* channel_args() override { return subchannel_->channel_args(); } @@ -1101,6 +1107,32 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { watcher_.get()); } ConnectivityStateChange state_change = PopConnectivityStateChange(); + absl::optional keepalive_throttling = + state_change.status.GetPayload(grpc_core::keepalive_throttling_key); + if (keepalive_throttling.has_value()) { + int new_keepalive_time = -1; + if (absl::SimpleAtoi(std::string(keepalive_throttling.value()), + &new_keepalive_time)) { + if (new_keepalive_time > parent_->chand_->keepalive_time_) { + parent_->chand_->keepalive_time_ = new_keepalive_time; + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: throttling keepalive time to %d", + parent_->chand_, parent_->chand_->keepalive_time_); + } + // Propagate the new keepalive time to all subchannels. This is so + // that new transports created by any subchannel (and not just the + // subchannel that received the GOAWAY), use the new keepalive time. + for (auto* subchannel_wrapper : + parent_->chand_->subchannel_wrappers_) { + subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time); + } + } + } else { + gpr_log(GPR_ERROR, "chand=%p: Illegal keepalive throttling value %s", + parent_->chand_, + std::string(keepalive_throttling.value()).c_str()); + } + } // Ignore update if the parent WatcherWrapper has been replaced // since this callback was scheduled. 
if (watcher_ != nullptr) { @@ -1347,6 +1379,7 @@ class ChannelData::ClientChannelControlHelper &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &arg, 1); Subchannel* subchannel = chand_->client_channel_factory_->CreateSubchannel(new_args); + subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_); grpc_channel_args_destroy(new_args); if (subchannel == nullptr) return nullptr; return MakeRefCounted( @@ -1682,6 +1715,9 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error) channel_args_ = new_args != nullptr ? new_args : grpc_channel_args_copy(args->channel_args); + keepalive_time_ = grpc_channel_args_find_integer( + channel_args_, GRPC_ARG_KEEPALIVE_TIME_MS, + {-1 /* default value, unset */, 1, INT_MAX}); if (!ResolverRegistry::IsValidTarget(target_uri_.get())) { *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("the target uri is not valid."); diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 26bc7f8549f..389d7edc02e 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -26,6 +26,7 @@ #include #include +#include "absl/strings/numbers.h" #include "absl/strings/str_format.h" #include @@ -330,6 +331,19 @@ class Subchannel::ConnectedSubchannelStateWatcher void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status& status) override { Subchannel* c = subchannel_; + absl::optional keepalive_throttling = + status.GetPayload(grpc_core::keepalive_throttling_key); + if (keepalive_throttling.has_value()) { + int new_keepalive_time = -1; + if (absl::SimpleAtoi(std::string(keepalive_throttling.value()), + &new_keepalive_time)) { + c->ThrottleKeepaliveTime(new_keepalive_time); + } else { + gpr_log(GPR_ERROR, + "Subchannel=%p: Illegal keepalive throttling value %s", c, + std::string(keepalive_throttling.value()).c_str()); + } + } MutexLock lock(&c->mu_); switch (new_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: @@ -743,6 +757,25 @@ Subchannel* Subchannel::Create(OrphanablePtr connector, return registered; } +void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) { + MutexLock lock(&mu_); + // Only update the value if the new keepalive time is larger. + if (new_keepalive_time > keepalive_time_) { + keepalive_time_ = new_keepalive_time; + if (grpc_trace_subchannel.enabled()) { + gpr_log(GPR_INFO, "Subchannel=%p: Throttling keepalive time to %d", this, + new_keepalive_time); + } + const grpc_arg arg_to_add = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), new_keepalive_time); + const char* arg_to_remove = GRPC_ARG_KEEPALIVE_TIME_MS; + grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( + args_, &arg_to_remove, 1, &arg_to_add, 1); + grpc_channel_args_destroy(args_); + args_ = new_args; + } +} + Subchannel* Subchannel::Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = RefMutate((1 << INTERNAL_REF_BITS), diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index e615c18c944..e945b4465e0 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -228,6 +228,9 @@ class Subchannel { static Subchannel* Create(OrphanablePtr connector, const grpc_channel_args* args); + // Throttle keepalive time + void ThrottleKeepaliveTime(int new_keepalive_time); + // Strong and weak refcounting. 
Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); @@ -422,6 +425,8 @@ class Subchannel { bool have_retry_alarm_ = false; // reset_backoff() was called while alarm was pending. bool retry_immediately_ = false; + // Keepalive time period (-1 for unset) + int keepalive_time_ = -1; // Channelz tracking. RefCountedPtr channelz_node_; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index bc45e4d69f6..e5f5e7cc5df 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1089,6 +1089,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t, gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string, goaway_error, grpc_error_string(t->goaway_error)); } + absl::Status status = grpc_error_to_absl_status(t->goaway_error); /* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug * data equal to "too_many_pings", it should log the occurrence at a log level * that is enabled by default and double the configured KEEPALIVE_TIME used @@ -1107,8 +1108,9 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t, ? GRPC_MILLIS_INF_FUTURE : static_cast(current_keepalive_time_ms * KEEPALIVE_TIME_BACKOFF_MULTIPLIER); + status.SetPayload(grpc_core::keepalive_throttling_key, + absl::Cord(std::to_string(t->keepalive_time))); } - absl::Status status = grpc_error_to_absl_status(t->goaway_error); /* lie: use transient failure from the transport to indicate goaway has been * received */ connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status, diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index cb5b7f7758c..46794d0e869 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -459,4 +459,11 @@ grpc_transport_op* grpc_make_transport_op(grpc_closure* on_consumed); grpc_transport_stream_op_batch* grpc_make_transport_stream_op( grpc_closure* on_consumed); +namespace grpc_core { +// This is the key to be used for loading/storing keepalive_throttling in the +// absl::Status object. 
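+// The value stored under this key is the stringified keepalive time (in
+// milliseconds) that the transport will use after throttling; it is attached
+// by the chttp2 transport when it receives a GOAWAY with "too_many_pings" and
+// read back by the client channel to throttle its subchannels.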
+constexpr const char* keepalive_throttling_key = + "grpc.internal.keepalive_throttling"; +} // namespace grpc_core + #endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */ diff --git a/test/core/end2end/cq_verifier.cc b/test/core/end2end/cq_verifier.cc index 6030b496226..e73a587dc67 100644 --- a/test/core/end2end/cq_verifier.cc +++ b/test/core/end2end/cq_verifier.cc @@ -237,8 +237,8 @@ static void verify_matches(expectation* e, grpc_event* ev) { } } -void cq_verify(cq_verifier* v) { - const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(10); +void cq_verify_custom_timeout(cq_verifier* v, int timeout_sec) { + const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(timeout_sec); while (v->first_expectation != nullptr) { grpc_event ev = grpc_completion_queue_next(v->cq, deadline, nullptr); if (ev.type == GRPC_QUEUE_TIMEOUT) { @@ -266,6 +266,8 @@ void cq_verify(cq_verifier* v) { } } +void cq_verify(cq_verifier* v) { cq_verify_custom_timeout(v, 10); } + void cq_verify_empty_timeout(cq_verifier* v, int timeout_sec) { gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), diff --git a/test/core/end2end/cq_verifier.h b/test/core/end2end/cq_verifier.h index f5f2a9f7a33..67382f7fd4d 100644 --- a/test/core/end2end/cq_verifier.h +++ b/test/core/end2end/cq_verifier.h @@ -37,6 +37,10 @@ void cq_verifier_destroy(cq_verifier* v); bound completion queue */ void cq_verify(cq_verifier* v); +/* ensure all expected events (and only those events) are present on the + bound completion queue withing \a timeout_sec */ +void cq_verify_custom_timeout(cq_verifier* v, int timeout_sec); + /* ensure that the completion queue is empty */ void cq_verify_empty(cq_verifier* v); diff --git a/test/core/transport/chttp2/too_many_pings_test.cc b/test/core/transport/chttp2/too_many_pings_test.cc index 308b7216e12..9b774c50689 100644 --- a/test/core/transport/chttp2/too_many_pings_test.cc +++ b/test/core/transport/chttp2/too_many_pings_test.cc @@ -172,6 +172,134 @@ TEST(TooManyPings, TestLotsOfServerCancelledRpcsDoesntGiveTooManyPings) { grpc_completion_queue_destroy(cq); } +// Perform a simple RPC where the client makes a request, and both the client +// and server continue reading so that gRPC can send and receive keepalive +// pings. 
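+// Callers expect the call to finish with UNAVAILABLE when the server sends a
+// GOAWAY with "too_many_pings", and with DEADLINE_EXCEEDED once the client's
+// keepalive time has been throttled above the server's minimum ping interval.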
+grpc_status_code PerformWaitingCall(grpc_channel* channel, grpc_server* server, + grpc_completion_queue* cq, + int expected_keepalive_time) { + grpc_call* c; + grpc_call* s; + cq_verifier* cqv = cq_verifier_create(cq); + grpc_op ops[6]; + grpc_op* op; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_status_code status; + grpc_call_error error; + grpc_slice details; + gpr_timespec deadline = grpc_timeout_seconds_to_deadline(30); + // Start a call + c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq, + grpc_slice_from_static_string("/foo"), nullptr, + deadline, nullptr); + GPR_ASSERT(c); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->flags = 0; + op->reserved = nullptr; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(1), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + // Request a call on the server + error = grpc_server_request_call(server, &s, &call_details, + &request_metadata_recv, cq, cq, tag(101)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(101), 1); + cq_verify(cqv); + // Since the server is configured to allow only a single ping strike, it would + // take 3 pings to trigger the GOAWAY frame with "too_many_pings" from the + // server. (The second ping from the client would be the first bad ping sent + // too quickly leading to a ping strike and the third ping would lead to the + // GOAWAY.) If the client settings match with the server's settings, there + // won't be a bad ping, and the call will end due to the deadline expiring + // instead. + CQ_EXPECT_COMPLETION(cqv, tag(1), 1); + cq_verify_custom_timeout(cqv, 60); + // The call will end after this + cq_verify(cqv); + // cleanup + grpc_slice_unref(details); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + grpc_call_unref(c); + grpc_call_unref(s); + cq_verifier_destroy(cqv); + return status; +} +TEST(TooManyPings, KeepaliveThrottling) { + grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); + // create the server with a ping interval of 10 seconds and a single ping + // strike. + grpc_arg server_args[2]; + server_args[0] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS), + 10 * 1000); + server_args[1] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1); + grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args), + server_args}; + grpc_server* server = grpc_server_create(&server_channel_args, nullptr); + std::string server_address = + grpc_core::JoinHostPort("localhost", grpc_pick_unused_port_or_die()); + grpc_server_register_completion_queue(server, cq, nullptr); + GPR_ASSERT( + grpc_server_add_insecure_http2_port(server, server_address.c_str())); + grpc_server_start(server); + // create the channel with a keepalive ping interval of 1 second. 
+ grpc_arg client_args[4]; + client_args[0] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0); + client_args[1] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS), + 0); + client_args[2] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000); + client_args[3] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_BDP_PROBE), 0); + grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args), + client_args}; + grpc_channel* channel = grpc_insecure_channel_create( + server_address.c_str(), &client_channel_args, nullptr); + int expected_keepalive_time_sec = 1; + // We need 4 GOAWAY frames to throttle the keepalive time from 1 second to 16 + // seconds (> 10sec). + for (int i = 0; i < 4; i++) { + EXPECT_EQ( + PerformWaitingCall(channel, server, cq, expected_keepalive_time_sec), + GRPC_STATUS_UNAVAILABLE); + expected_keepalive_time_sec *= 2; + } + EXPECT_EQ( + PerformWaitingCall(channel, server, cq, expected_keepalive_time_sec), + GRPC_STATUS_DEADLINE_EXCEEDED); + // shutdown and destroy the client and server + grpc_channel_destroy(channel); + grpc_server_shutdown_and_notify(server, cq, nullptr); + grpc_completion_queue_shutdown(cq); + while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), + nullptr) + .type != GRPC_QUEUE_SHUTDOWN) + ; + grpc_server_destroy(server); + grpc_completion_queue_destroy(cq); +} + } // namespace int main(int argc, char** argv) { From d51601d165afcadb5abb3f0bc4b489bbeb0e6ed2 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 22 Jul 2020 20:02:16 -0700 Subject: [PATCH 2/7] Clang format --- src/core/ext/transport/chttp2/transport/chttp2_transport.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index e5f5e7cc5df..3cf2bf45a74 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1089,7 +1089,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t, gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string, goaway_error, grpc_error_string(t->goaway_error)); } - absl::Status status = grpc_error_to_absl_status(t->goaway_error); + absl::Status status = grpc_error_to_absl_status(t->goaway_error); /* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug * data equal to "too_many_pings", it should log the occurrence at a log level * that is enabled by default and double the configured KEEPALIVE_TIME used From aeeb0d711762450dd40ed1f8fd723e82cfcb446e Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 22 Jul 2020 20:16:21 -0700 Subject: [PATCH 3/7] Remove unnecessary parsing of status payload in subchannel.cc --- src/core/ext/filters/client_channel/subchannel.cc | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 389d7edc02e..9bae661c7e5 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -331,19 +331,6 @@ class Subchannel::ConnectedSubchannelStateWatcher void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status& status) override { Subchannel* c = subchannel_; - absl::optional keepalive_throttling = - 
status.GetPayload(grpc_core::keepalive_throttling_key); - if (keepalive_throttling.has_value()) { - int new_keepalive_time = -1; - if (absl::SimpleAtoi(std::string(keepalive_throttling.value()), - &new_keepalive_time)) { - c->ThrottleKeepaliveTime(new_keepalive_time); - } else { - gpr_log(GPR_ERROR, - "Subchannel=%p: Illegal keepalive throttling value %s", c, - std::string(keepalive_throttling.value()).c_str()); - } - } MutexLock lock(&c->mu_); switch (new_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: From cf0b46c4d678793d63ab7b6be511c539da93deb0 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 5 Aug 2020 07:37:54 -0700 Subject: [PATCH 4/7] Reviewer comments --- .../filters/client_channel/client_channel.cc | 2 +- .../ext/filters/client_channel/subchannel.cc | 1 - .../ext/filters/client_channel/subchannel.h | 4 +- .../chttp2/transport/chttp2_transport.cc | 2 +- src/core/lib/transport/transport.h | 2 +- test/core/end2end/cq_verifier.cc | 4 +- test/core/end2end/cq_verifier.h | 8 +- .../transport/chttp2/too_many_pings_test.cc | 196 +++++++++++++++--- 8 files changed, 171 insertions(+), 48 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index db5358519fc..fd201ee1825 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1108,7 +1108,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { } ConnectivityStateChange state_change = PopConnectivityStateChange(); absl::optional keepalive_throttling = - state_change.status.GetPayload(grpc_core::keepalive_throttling_key); + state_change.status.GetPayload(grpc_core::kKeepaliveThrottlingKey); if (keepalive_throttling.has_value()) { int new_keepalive_time = -1; if (absl::SimpleAtoi(std::string(keepalive_throttling.value()), diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 9bae661c7e5..c872e3ed7e4 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -26,7 +26,6 @@ #include #include -#include "absl/strings/numbers.h" #include "absl/strings/str_format.h" #include diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 53a3924da99..f220db2fa27 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -228,7 +228,9 @@ class Subchannel { static Subchannel* Create(OrphanablePtr connector, const grpc_channel_args* args); - // Throttle keepalive time + // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time + // is larger than the subchannel's current keepalive time. The updated value + // would have an affect when the subchannel creates a new ConnectedSubchannel. void ThrottleKeepaliveTime(int new_keepalive_time); // Strong and weak refcounting. diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 2cfc1cb6959..4cb4afa8a0e 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1103,7 +1103,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t, ? 
GRPC_MILLIS_INF_FUTURE : static_cast(current_keepalive_time_ms * KEEPALIVE_TIME_BACKOFF_MULTIPLIER); - status.SetPayload(grpc_core::keepalive_throttling_key, + status.SetPayload(grpc_core::kKeepaliveThrottlingKey, absl::Cord(std::to_string(t->keepalive_time))); } // lie: use transient failure from the transport to indicate goaway has been diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 46794d0e869..cbb23b6cf0d 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -462,7 +462,7 @@ grpc_transport_stream_op_batch* grpc_make_transport_stream_op( namespace grpc_core { // This is the key to be used for loading/storing keepalive_throttling in the // absl::Status object. -constexpr const char* keepalive_throttling_key = +constexpr const char* kKeepaliveThrottlingKey = "grpc.internal.keepalive_throttling"; } // namespace grpc_core diff --git a/test/core/end2end/cq_verifier.cc b/test/core/end2end/cq_verifier.cc index e73a587dc67..959315d6e69 100644 --- a/test/core/end2end/cq_verifier.cc +++ b/test/core/end2end/cq_verifier.cc @@ -237,7 +237,7 @@ static void verify_matches(expectation* e, grpc_event* ev) { } } -void cq_verify_custom_timeout(cq_verifier* v, int timeout_sec) { +void cq_verify(cq_verifier* v, int timeout_sec) { const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(timeout_sec); while (v->first_expectation != nullptr) { grpc_event ev = grpc_completion_queue_next(v->cq, deadline, nullptr); @@ -266,8 +266,6 @@ void cq_verify_custom_timeout(cq_verifier* v, int timeout_sec) { } } -void cq_verify(cq_verifier* v) { cq_verify_custom_timeout(v, 10); } - void cq_verify_empty_timeout(cq_verifier* v, int timeout_sec) { gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), diff --git a/test/core/end2end/cq_verifier.h b/test/core/end2end/cq_verifier.h index 67382f7fd4d..fd0a7380f91 100644 --- a/test/core/end2end/cq_verifier.h +++ b/test/core/end2end/cq_verifier.h @@ -34,12 +34,8 @@ cq_verifier* cq_verifier_create(grpc_completion_queue* cq); void cq_verifier_destroy(cq_verifier* v); /* ensure all expected events (and only those events) are present on the - bound completion queue */ -void cq_verify(cq_verifier* v); - -/* ensure all expected events (and only those events) are present on the - bound completion queue withing \a timeout_sec */ -void cq_verify_custom_timeout(cq_verifier* v, int timeout_sec); + bound completion queue within \a timeout_sec */ +void cq_verify(cq_verifier* v, int timeout_sec = 10); /* ensure that the completion queue is empty */ void cq_verify_empty(cq_verifier* v); diff --git a/test/core/transport/chttp2/too_many_pings_test.cc b/test/core/transport/chttp2/too_many_pings_test.cc index 9b774c50689..9b06ef41cb3 100644 --- a/test/core/transport/chttp2/too_many_pings_test.cc +++ b/test/core/transport/chttp2/too_many_pings_test.cc @@ -25,6 +25,8 @@ #include #include +#include "absl/strings/str_cat.h" + #include #include #include @@ -37,6 +39,8 @@ #include #include +#include "src/core/ext/filters/client_channel/parse_address.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/gprpp/thd.h" @@ -176,8 +180,7 @@ TEST(TooManyPings, TestLotsOfServerCancelledRpcsDoesntGiveTooManyPings) { // and server continue reading so that gRPC can send and receive keepalive // pings. 
grpc_status_code PerformWaitingCall(grpc_channel* channel, grpc_server* server, - grpc_completion_queue* cq, - int expected_keepalive_time) { + grpc_completion_queue* cq) { grpc_call* c; grpc_call* s; cq_verifier* cqv = cq_verifier_create(cq); @@ -189,7 +192,7 @@ grpc_status_code PerformWaitingCall(grpc_channel* channel, grpc_server* server, grpc_status_code status; grpc_call_error error; grpc_slice details; - gpr_timespec deadline = grpc_timeout_seconds_to_deadline(30); + gpr_timespec deadline = grpc_timeout_seconds_to_deadline(15); // Start a call c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq, grpc_slice_from_static_string("/foo"), nullptr, @@ -229,9 +232,8 @@ grpc_status_code PerformWaitingCall(grpc_channel* channel, grpc_server* server, // won't be a bad ping, and the call will end due to the deadline expiring // instead. CQ_EXPECT_COMPLETION(cqv, tag(1), 1); - cq_verify_custom_timeout(cqv, 60); // The call will end after this - cq_verify(cqv); + cq_verify(cqv, 60); // cleanup grpc_slice_unref(details); grpc_metadata_array_destroy(&trailing_metadata_recv); @@ -242,54 +244,68 @@ grpc_status_code PerformWaitingCall(grpc_channel* channel, grpc_server* server, cq_verifier_destroy(cqv); return status; } -TEST(TooManyPings, KeepaliveThrottling) { + +TEST(TooManyPings, KeepaliveThrottlingMultipleChannels) { grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); - // create the server with a ping interval of 10 seconds and a single ping + // create the server with a ping interval of 5 seconds and a single ping // strike. - grpc_arg server_args[2]; - server_args[0] = grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS), - 10 * 1000); - server_args[1] = grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1); + grpc_arg server_args[] = { + grpc_channel_arg_integer_create( + const_cast( + GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS), + 5 * 1000), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)}; grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args), server_args}; grpc_server* server = grpc_server_create(&server_channel_args, nullptr); std::string server_address = - grpc_core::JoinHostPort("localhost", grpc_pick_unused_port_or_die()); + grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die()); grpc_server_register_completion_queue(server, cq, nullptr); GPR_ASSERT( grpc_server_add_insecure_http2_port(server, server_address.c_str())); grpc_server_start(server); - // create the channel with a keepalive ping interval of 1 second. - grpc_arg client_args[4]; - client_args[0] = grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0); - client_args[1] = grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS), - 0); - client_args[2] = grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000); - client_args[3] = grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_HTTP2_BDP_PROBE), 0); + // create two channel with a keepalive ping interval of 1 second. 
+ grpc_arg client_args[]{ + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0), + grpc_channel_arg_integer_create( + const_cast( + GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS), + 0), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_BDP_PROBE), 0)}; grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args), client_args}; grpc_channel* channel = grpc_insecure_channel_create( server_address.c_str(), &client_channel_args, nullptr); + grpc_channel* channel_dup = grpc_insecure_channel_create( + server_address.c_str(), &client_channel_args, nullptr); int expected_keepalive_time_sec = 1; - // We need 4 GOAWAY frames to throttle the keepalive time from 1 second to 16 - // seconds (> 10sec). - for (int i = 0; i < 4; i++) { - EXPECT_EQ( - PerformWaitingCall(channel, server, cq, expected_keepalive_time_sec), - GRPC_STATUS_UNAVAILABLE); + // We need 3 GOAWAY frames to throttle the keepalive time from 1 second to 8 + // seconds (> 5sec). + for (int i = 0; i < 3; i++) { + gpr_log(GPR_INFO, "Expected keepalive time : %d", + expected_keepalive_time_sec); + EXPECT_EQ(PerformWaitingCall(channel, server, cq), GRPC_STATUS_UNAVAILABLE); expected_keepalive_time_sec *= 2; } - EXPECT_EQ( - PerformWaitingCall(channel, server, cq, expected_keepalive_time_sec), - GRPC_STATUS_DEADLINE_EXCEEDED); + gpr_log( + GPR_INFO, + "Client keepalive time %d should now be in sync with the server settings", + expected_keepalive_time_sec); + EXPECT_EQ(PerformWaitingCall(channel, server, cq), + GRPC_STATUS_DEADLINE_EXCEEDED); + // Since the subchannel is shared, the second channel should also have + // keepalive settings in sync with the server. + gpr_log(GPR_INFO, "Now testing second channel sharing the same subchannel"); + EXPECT_EQ(PerformWaitingCall(channel_dup, server, cq), + GRPC_STATUS_DEADLINE_EXCEEDED); // shutdown and destroy the client and server grpc_channel_destroy(channel); + grpc_channel_destroy(channel_dup); grpc_server_shutdown_and_notify(server, cq, nullptr); grpc_completion_queue_shutdown(cq); while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), @@ -300,6 +316,118 @@ TEST(TooManyPings, KeepaliveThrottling) { grpc_completion_queue_destroy(cq); } +grpc_core::Resolver::Result BuildResolverResult( + const std::vector& addresses) { + grpc_core::Resolver::Result result; + for (const auto& address_str : addresses) { + grpc_uri* uri = grpc_uri_parse(address_str.c_str(), true); + if (uri == nullptr) { + gpr_log(GPR_ERROR, "Failed to parse uri:%s", address_str.c_str()); + GPR_ASSERT(0); + } + grpc_resolved_address address; + GPR_ASSERT(grpc_parse_uri(uri, &address)); + result.addresses.emplace_back(address.addr, address.len, nullptr); + grpc_uri_destroy(uri); + } + return result; +} + +TEST(TooManyPings, KeepaliveThrottlingMultipleSubchannels) { + grpc_core::ExecCtx exec_ctx; + grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); + // create two servers with a ping interval of 5 seconds and a single ping + // strike. 
+ grpc_arg server_args[] = { + grpc_channel_arg_integer_create( + const_cast( + GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS), + 5 * 1000), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)}; + grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args), + server_args}; + grpc_server* server1 = grpc_server_create(&server_channel_args, nullptr); + std::string server_address1 = + grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die()); + grpc_server_register_completion_queue(server1, cq, nullptr); + GPR_ASSERT( + grpc_server_add_insecure_http2_port(server1, server_address1.c_str())); + grpc_server_start(server1); + grpc_server* server2 = grpc_server_create(&server_channel_args, nullptr); + std::string server_address2 = + grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die()); + grpc_server_register_completion_queue(server2, cq, nullptr); + GPR_ASSERT( + grpc_server_add_insecure_http2_port(server2, server_address2.c_str())); + grpc_server_start(server2); + // create a single channel with multiple subchannels with a keepalive ping + // interval of 1 second. To get finer control on subchannel connection times, + // we are using pick_first instead of round_robin and using the fake resolver + // response generator to switch between the two. + auto response_generator = + grpc_core::MakeRefCounted(); + grpc_arg client_args[] = { + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0), + grpc_channel_arg_integer_create( + const_cast( + GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS), + 0), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 0), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_BDP_PROBE), 0), + grpc_core::FakeResolverResponseGenerator::MakeChannelArg( + response_generator.get())}; + grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args), + client_args}; + grpc_channel* channel = + grpc_insecure_channel_create("fake:///", &client_channel_args, nullptr); + // For a single subchannel 3 GOAWAYs would be sufficient to increase the + // keepalive time from 1 second to beyond 5 seconds. Even though we are + // alternating between two subchannels, 3 GOAWAYs should still be enough since + // the channel should start all new transports with the new keepalive value + // (even those from a different subchannel). + int expected_keepalive_time_sec = 1; + for (int i = 0; i < 3; i++) { + gpr_log(GPR_INFO, "Expected keepalive time : %d", + expected_keepalive_time_sec); + response_generator->SetResponse(BuildResolverResult({absl::StrCat( + "ipv4:", i % 2 == 0 ? server_address1 : server_address2)})); + // ExecCtx::Flush() might not be enough to make sure that the resolver + // result has been propagated, so sleep for a bit. + grpc_core::ExecCtx::Get()->Flush(); + gpr_sleep_until(grpc_timeout_seconds_to_deadline(1)); + EXPECT_EQ(PerformWaitingCall(channel, i % 2 == 0 ? 
server1 : server2, cq), + GRPC_STATUS_UNAVAILABLE); + } + gpr_log( + GPR_INFO, + "Client keepalive time %d should now be in sync with the server settings", + expected_keepalive_time_sec); + response_generator->SetResponse( + BuildResolverResult({absl::StrCat("ipv4:", server_address2)})); + grpc_core::ExecCtx::Get()->Flush(); + gpr_sleep_until(grpc_timeout_seconds_to_deadline(1)); + EXPECT_EQ(PerformWaitingCall(channel, server2, cq), + GRPC_STATUS_DEADLINE_EXCEEDED); + // shutdown and destroy the client and server + grpc_channel_destroy(channel); + grpc_server_shutdown_and_notify(server1, cq, nullptr); + grpc_server_shutdown_and_notify(server2, cq, nullptr); + grpc_completion_queue_shutdown(cq); + while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), + nullptr) + .type != GRPC_QUEUE_SHUTDOWN) + ; + grpc_server_destroy(server1); + grpc_server_destroy(server2); + grpc_completion_queue_destroy(cq); +} + } // namespace int main(int argc, char** argv) { From 1618d2179646a6379dc190d439816fc8b31e3718 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Mon, 10 Aug 2020 14:05:27 -0700 Subject: [PATCH 5/7] Add roundrobin test and reviewer comments --- .../health/health_check_client.cc | 3 +- .../ext/filters/client_channel/subchannel.h | 2 +- .../transport/chttp2/too_many_pings_test.cc | 191 +++++++++++++----- 3 files changed, 148 insertions(+), 48 deletions(-) diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc index 5d65293af29..60a2ebb4bfa 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.cc +++ b/src/core/ext/filters/client_channel/health/health_check_client.cc @@ -91,11 +91,12 @@ void HealthCheckClient::SetHealthStatusLocked(grpc_connectivity_state state, gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s", this, ConnectivityStateName(state), reason); } - if (watcher_ != nullptr) + if (watcher_ != nullptr) { watcher_->Notify(state, state == GRPC_CHANNEL_TRANSIENT_FAILURE ? absl::Status(absl::StatusCode::kUnavailable, reason) : absl::Status()); + } } void HealthCheckClient::Orphan() { diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index f220db2fa27..f3de5c80747 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -230,7 +230,7 @@ class Subchannel { // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time // is larger than the subchannel's current keepalive time. The updated value - // would have an affect when the subchannel creates a new ConnectedSubchannel. + // will have an affect when the subchannel creates a new ConnectedSubchannel. void ThrottleKeepaliveTime(int new_keepalive_time); // Strong and weak refcounting. diff --git a/test/core/transport/chttp2/too_many_pings_test.cc b/test/core/transport/chttp2/too_many_pings_test.cc index 9b06ef41cb3..c53db6ef229 100644 --- a/test/core/transport/chttp2/too_many_pings_test.cc +++ b/test/core/transport/chttp2/too_many_pings_test.cc @@ -245,28 +245,76 @@ grpc_status_code PerformWaitingCall(grpc_channel* channel, grpc_server* server, return status; } -TEST(TooManyPings, KeepaliveThrottlingMultipleChannels) { +class KeepaliveThrottlingTest : public ::testing::Test { + protected: + // Starts the server and makes sure that the channel is able to get connected. 
+ grpc_server* ServerStart(const char* addr, grpc_completion_queue* cq) { + // Set up server channel args to expect pings at an interval of 5 seconds + // and use a single ping strike + grpc_arg server_args[] = { + grpc_channel_arg_integer_create( + const_cast( + GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS), + 5 * 1000), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)}; + grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args), + server_args}; + // Create server + grpc_server* server = grpc_server_create(&server_channel_args, nullptr); + grpc_server_register_completion_queue(server, cq, nullptr); + GPR_ASSERT(grpc_server_add_insecure_http2_port(server, addr)); + grpc_server_start(server); + return server; + } + + // Shuts down and destroys the server. Also, makes sure that the channel + // receives the disconnection event. + void ServerShutdownAndDestroy(grpc_server* server, + grpc_completion_queue* cq) { + // Shutdown and destroy server + grpc_server_shutdown_and_notify(server, cq, (void*)(1000)); + while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), + nullptr) + .tag != (void*)(1000)) + ; + grpc_server_destroy(server); + } + + void VerifyChannelReady(grpc_channel* channel, grpc_completion_queue* cq) { + grpc_connectivity_state state = + grpc_channel_check_connectivity_state(channel, 1 /* try_to_connect */); + while (state != GRPC_CHANNEL_READY) { + grpc_channel_watch_connectivity_state( + channel, state, grpc_timeout_seconds_to_deadline(5), cq, nullptr); + grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(5), + nullptr); + state = grpc_channel_check_connectivity_state(channel, 0); + } + } + + void VerifyChannelDisconnected(grpc_channel* channel, + grpc_completion_queue* cq) { + // Verify channel gets disconnected. Use a ping to make sure that clients + // tries sending/receiving bytes if the channel is connected. + grpc_channel_ping(channel, cq, (void*)(2000), nullptr); + grpc_event ev = grpc_completion_queue_next( + cq, grpc_timeout_seconds_to_deadline(5), nullptr); + GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); + GPR_ASSERT(ev.tag == (void*)(2000)); + GPR_ASSERT(ev.success == 0); + GPR_ASSERT(grpc_channel_check_connectivity_state(channel, 0) != + GRPC_CHANNEL_READY); + } +}; + +TEST_F(KeepaliveThrottlingTest, KeepaliveThrottlingMultipleChannels) { grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); - // create the server with a ping interval of 5 seconds and a single ping - // strike. - grpc_arg server_args[] = { - grpc_channel_arg_integer_create( - const_cast( - GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS), - 5 * 1000), - grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)}; - grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args), - server_args}; - grpc_server* server = grpc_server_create(&server_channel_args, nullptr); std::string server_address = grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die()); - grpc_server_register_completion_queue(server, cq, nullptr); - GPR_ASSERT( - grpc_server_add_insecure_http2_port(server, server_address.c_str())); - grpc_server_start(server); + grpc_server* server = ServerStart(server_address.c_str(), cq); // create two channel with a keepalive ping interval of 1 second. 
- grpc_arg client_args[]{ + grpc_arg client_args[] = { grpc_channel_arg_integer_create( const_cast(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0), grpc_channel_arg_integer_create( @@ -306,13 +354,12 @@ TEST(TooManyPings, KeepaliveThrottlingMultipleChannels) { // shutdown and destroy the client and server grpc_channel_destroy(channel); grpc_channel_destroy(channel_dup); - grpc_server_shutdown_and_notify(server, cq, nullptr); + ServerShutdownAndDestroy(server, cq); grpc_completion_queue_shutdown(cq); while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr) .type != GRPC_QUEUE_SHUTDOWN) ; - grpc_server_destroy(server); grpc_completion_queue_destroy(cq); } @@ -333,34 +380,17 @@ grpc_core::Resolver::Result BuildResolverResult( return result; } -TEST(TooManyPings, KeepaliveThrottlingMultipleSubchannels) { +// Tests that when new subchannels are created due to a change in resolved +// addresses, the new subchannels use the updated keepalive time. +TEST_F(KeepaliveThrottlingTest, KeepaliveThrottlingMultipleSubchannels1) { grpc_core::ExecCtx exec_ctx; grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); - // create two servers with a ping interval of 5 seconds and a single ping - // strike. - grpc_arg server_args[] = { - grpc_channel_arg_integer_create( - const_cast( - GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS), - 5 * 1000), - grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)}; - grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args), - server_args}; - grpc_server* server1 = grpc_server_create(&server_channel_args, nullptr); std::string server_address1 = grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die()); - grpc_server_register_completion_queue(server1, cq, nullptr); - GPR_ASSERT( - grpc_server_add_insecure_http2_port(server1, server_address1.c_str())); - grpc_server_start(server1); - grpc_server* server2 = grpc_server_create(&server_channel_args, nullptr); std::string server_address2 = grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die()); - grpc_server_register_completion_queue(server2, cq, nullptr); - GPR_ASSERT( - grpc_server_add_insecure_http2_port(server2, server_address2.c_str())); - grpc_server_start(server2); + grpc_server* server1 = ServerStart(server_address1.c_str(), cq); + grpc_server* server2 = ServerStart(server_address2.c_str(), cq); // create a single channel with multiple subchannels with a keepalive ping // interval of 1 second. To get finer control on subchannel connection times, // we are using pick_first instead of round_robin and using the fake resolver @@ -403,6 +433,7 @@ TEST(TooManyPings, KeepaliveThrottlingMultipleSubchannels) { gpr_sleep_until(grpc_timeout_seconds_to_deadline(1)); EXPECT_EQ(PerformWaitingCall(channel, i % 2 == 0 ? 
server1 : server2, cq), GRPC_STATUS_UNAVAILABLE); + expected_keepalive_time_sec *= 2; } gpr_log( GPR_INFO, @@ -416,15 +447,83 @@ TEST(TooManyPings, KeepaliveThrottlingMultipleSubchannels) { GRPC_STATUS_DEADLINE_EXCEEDED); // shutdown and destroy the client and server grpc_channel_destroy(channel); - grpc_server_shutdown_and_notify(server1, cq, nullptr); - grpc_server_shutdown_and_notify(server2, cq, nullptr); + ServerShutdownAndDestroy(server1, cq); + ServerShutdownAndDestroy(server2, cq); + grpc_completion_queue_shutdown(cq); + while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), + nullptr) + .type != GRPC_QUEUE_SHUTDOWN) + ; + grpc_completion_queue_destroy(cq); +} + +// Tests that when a channel has multiple subchannels and receives a GOAWAY with +// "too_many_pings" on one of them, all subchannels start any new transports +// with an updated keepalive time. +TEST_F(KeepaliveThrottlingTest, KeepaliveThrottlingMultipleSubchannels2) { + grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); + std::string server_address1 = + grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die()); + std::string server_address2 = + grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die()); + // create a single channel with round robin load balancing policy. + auto response_generator = + grpc_core::MakeRefCounted(); + grpc_arg client_args[] = { + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0), + grpc_channel_arg_integer_create( + const_cast( + GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS), + 0), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 0), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_BDP_PROBE), 0), + grpc_core::FakeResolverResponseGenerator::MakeChannelArg( + response_generator.get())}; + grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args), + client_args}; + grpc_channel* channel = + grpc_insecure_channel_create("fake:///", &client_channel_args, nullptr); + response_generator->SetResponse( + BuildResolverResult({absl::StrCat("ipv4:", server_address1), + absl::StrCat("ipv4:", server_address2)})); + // For a single subchannel 3 GOAWAYs would be sufficient to increase the + // keepalive time from 1 second to beyond 5 seconds. Even though we are + // alternating between two subchannels, 3 GOAWAYs should still be enough since + // the channel should start all new transports with the new keepalive value + // (even those from a different subchannel). + int expected_keepalive_time_sec = 1; + for (int i = 0; i < 3; i++) { + gpr_log(GPR_ERROR, "Expected keepalive time : %d", + expected_keepalive_time_sec); + grpc_server* server = ServerStart( + i % 2 == 0 ? 
server_address1.c_str() : server_address2.c_str(), cq); + VerifyChannelReady(channel, cq); + EXPECT_EQ(PerformWaitingCall(channel, server, cq), GRPC_STATUS_UNAVAILABLE); + ServerShutdownAndDestroy(server, cq); + VerifyChannelDisconnected(channel, cq); + expected_keepalive_time_sec *= 2; + } + gpr_log( + GPR_INFO, + "Client keepalive time %d should now be in sync with the server settings", + expected_keepalive_time_sec); + grpc_server* server = ServerStart(server_address1.c_str(), cq); + VerifyChannelReady(channel, cq); + EXPECT_EQ(PerformWaitingCall(channel, server, cq), + GRPC_STATUS_DEADLINE_EXCEEDED); + ServerShutdownAndDestroy(server, cq); + // shutdown and destroy the client + grpc_channel_destroy(channel); grpc_completion_queue_shutdown(cq); while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr) .type != GRPC_QUEUE_SHUTDOWN) ; - grpc_server_destroy(server1); - grpc_server_destroy(server2); grpc_completion_queue_destroy(cq); } From c62d2b483c53aaab3a6c278cf2d4602c838b7afd Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 13 Aug 2020 17:12:57 -0700 Subject: [PATCH 6/7] Test renaming --- test/core/transport/chttp2/too_many_pings_test.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/core/transport/chttp2/too_many_pings_test.cc b/test/core/transport/chttp2/too_many_pings_test.cc index c53db6ef229..1195a47bd31 100644 --- a/test/core/transport/chttp2/too_many_pings_test.cc +++ b/test/core/transport/chttp2/too_many_pings_test.cc @@ -382,7 +382,7 @@ grpc_core::Resolver::Result BuildResolverResult( // Tests that when new subchannels are created due to a change in resolved // addresses, the new subchannels use the updated keepalive time. -TEST_F(KeepaliveThrottlingTest, KeepaliveThrottlingMultipleSubchannels1) { +TEST_F(KeepaliveThrottlingTest, NewSubchannelsUseUpdatedKeepaliveTime) { grpc_core::ExecCtx exec_ctx; grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); std::string server_address1 = @@ -460,7 +460,8 @@ TEST_F(KeepaliveThrottlingTest, KeepaliveThrottlingMultipleSubchannels1) { // Tests that when a channel has multiple subchannels and receives a GOAWAY with // "too_many_pings" on one of them, all subchannels start any new transports // with an updated keepalive time. 
-TEST_F(KeepaliveThrottlingTest, KeepaliveThrottlingMultipleSubchannels2) { +TEST_F(KeepaliveThrottlingTest, + ExistingSubchannelsUseNewKeepaliveTimeWhenReconnecting) { grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); std::string server_address1 = grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die()); From 371d55a1af744886a587c136f4440333ad139527 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 14 Aug 2020 00:43:07 -0700 Subject: [PATCH 7/7] Fix SegFault issue in case of bad subchannel --- src/core/ext/filters/client_channel/client_channel.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index f9116b1cf51..15a39b23884 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1380,9 +1380,9 @@ class ChannelData::ClientChannelControlHelper &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &arg, 1); Subchannel* subchannel = chand_->client_channel_factory_->CreateSubchannel(new_args); - subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_); grpc_channel_args_destroy(new_args); if (subchannel == nullptr) return nullptr; + subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_); return MakeRefCounted( chand_, subchannel, std::move(health_check_service_name)); }
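Taken together, the series wires keepalive throttling through three layers: the chttp2 transport doubles its keepalive time when it receives a GOAWAY with ENHANCE_YOUR_CALM and "too_many_pings" debug data and publishes the new value as a payload on the TRANSIENT_FAILURE status; the client channel reads that payload in its subchannel watcher and propagates it to every subchannel wrapper; and each subchannel rewrites GRPC_ARG_KEEPALIVE_TIME_MS in its channel args so that new transports start with the throttled value. The sketch below captures just the status-payload round trip in isolation; MakeThrottledStatus and ExtractThrottledKeepaliveTime are illustrative names rather than functions from the patch.

#include <cstdint>
#include <string>

#include "absl/status/status.h"
#include "absl/strings/cord.h"
#include "absl/strings/numbers.h"
#include "absl/types/optional.h"

// Same key the patch defines in src/core/lib/transport/transport.h.
constexpr const char* kKeepaliveThrottlingKey =
    "grpc.internal.keepalive_throttling";

// Producer side (chttp2 transport): report the GOAWAY as TRANSIENT_FAILURE and
// attach the throttled keepalive time as a status payload.
absl::Status MakeThrottledStatus(int64_t throttled_keepalive_time_ms) {
  absl::Status status =
      absl::UnavailableError("GOAWAY received: too_many_pings");
  status.SetPayload(kKeepaliveThrottlingKey,
                    absl::Cord(std::to_string(throttled_keepalive_time_ms)));
  return status;
}

// Consumer side (client channel / subchannel): recover the value, ignore
// malformed payloads, and only ever ratchet the keepalive time upward.
int ExtractThrottledKeepaliveTime(const absl::Status& status,
                                  int current_keepalive_time_ms) {
  absl::optional<absl::Cord> payload =
      status.GetPayload(kKeepaliveThrottlingKey);
  int new_keepalive_time_ms = -1;
  if (payload.has_value() &&
      absl::SimpleAtoi(std::string(*payload), &new_keepalive_time_ms) &&
      new_keepalive_time_ms > current_keepalive_time_ms) {
    return new_keepalive_time_ms;
  }
  return current_keepalive_time_ms;
}

In the patch, the value fed into the payload is t->keepalive_time after it has been multiplied by KEEPALIVE_TIME_BACKOFF_MULTIPLIER, and on the channel side the recovered value is what ThrottleKeepaliveTime writes back into GRPC_ARG_KEEPALIVE_TIME_MS for future connection attempts.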