From 7100ee3bfb88a846995818614b84747bd758b743 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 17 Jul 2020 22:20:12 -0700 Subject: [PATCH] Add keepalive throttling --- .../filters/client_channel/client_channel.cc | 36 +++++ .../ext/filters/client_channel/subchannel.cc | 33 +++++ .../ext/filters/client_channel/subchannel.h | 5 + .../chttp2/transport/chttp2_transport.cc | 4 +- src/core/lib/transport/transport.h | 7 + test/core/end2end/cq_verifier.cc | 6 +- test/core/end2end/cq_verifier.h | 4 + .../transport/chttp2/too_many_pings_test.cc | 128 ++++++++++++++++++ 8 files changed, 220 insertions(+), 3 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 1b48d6a4d37..db5358519fc 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -28,6 +28,7 @@ #include +#include "absl/strings/numbers.h" #include "absl/strings/string_view.h" #include @@ -330,6 +331,7 @@ class ChannelData { // applied in the data plane mutex when the picker is updated. std::map, RefCountedPtr> pending_subchannel_updates_; + int keepalive_time_ = -1; // // Fields accessed from both data plane mutex and control plane @@ -971,6 +973,10 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { void ResetBackoff() override { subchannel_->ResetBackoff(); } + void ThrottleKeepaliveTime(int new_keepalive_time) { + subchannel_->ThrottleKeepaliveTime(new_keepalive_time); + } + const grpc_channel_args* channel_args() override { return subchannel_->channel_args(); } @@ -1101,6 +1107,32 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { watcher_.get()); } ConnectivityStateChange state_change = PopConnectivityStateChange(); + absl::optional keepalive_throttling = + state_change.status.GetPayload(grpc_core::keepalive_throttling_key); + if (keepalive_throttling.has_value()) { + int new_keepalive_time = -1; + if (absl::SimpleAtoi(std::string(keepalive_throttling.value()), + &new_keepalive_time)) { + if (new_keepalive_time > parent_->chand_->keepalive_time_) { + parent_->chand_->keepalive_time_ = new_keepalive_time; + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: throttling keepalive time to %d", + parent_->chand_, parent_->chand_->keepalive_time_); + } + // Propagate the new keepalive time to all subchannels. This is so + // that new transports created by any subchannel (and not just the + // subchannel that received the GOAWAY), use the new keepalive time. + for (auto* subchannel_wrapper : + parent_->chand_->subchannel_wrappers_) { + subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time); + } + } + } else { + gpr_log(GPR_ERROR, "chand=%p: Illegal keepalive throttling value %s", + parent_->chand_, + std::string(keepalive_throttling.value()).c_str()); + } + } // Ignore update if the parent WatcherWrapper has been replaced // since this callback was scheduled. if (watcher_ != nullptr) { @@ -1347,6 +1379,7 @@ class ChannelData::ClientChannelControlHelper &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &arg, 1); Subchannel* subchannel = chand_->client_channel_factory_->CreateSubchannel(new_args); + subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_); grpc_channel_args_destroy(new_args); if (subchannel == nullptr) return nullptr; return MakeRefCounted( @@ -1682,6 +1715,9 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error) channel_args_ = new_args != nullptr ? new_args : grpc_channel_args_copy(args->channel_args); + keepalive_time_ = grpc_channel_args_find_integer( + channel_args_, GRPC_ARG_KEEPALIVE_TIME_MS, + {-1 /* default value, unset */, 1, INT_MAX}); if (!ResolverRegistry::IsValidTarget(target_uri_.get())) { *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("the target uri is not valid."); diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 26bc7f8549f..389d7edc02e 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -26,6 +26,7 @@ #include #include +#include "absl/strings/numbers.h" #include "absl/strings/str_format.h" #include @@ -330,6 +331,19 @@ class Subchannel::ConnectedSubchannelStateWatcher void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status& status) override { Subchannel* c = subchannel_; + absl::optional keepalive_throttling = + status.GetPayload(grpc_core::keepalive_throttling_key); + if (keepalive_throttling.has_value()) { + int new_keepalive_time = -1; + if (absl::SimpleAtoi(std::string(keepalive_throttling.value()), + &new_keepalive_time)) { + c->ThrottleKeepaliveTime(new_keepalive_time); + } else { + gpr_log(GPR_ERROR, + "Subchannel=%p: Illegal keepalive throttling value %s", c, + std::string(keepalive_throttling.value()).c_str()); + } + } MutexLock lock(&c->mu_); switch (new_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: @@ -743,6 +757,25 @@ Subchannel* Subchannel::Create(OrphanablePtr connector, return registered; } +void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) { + MutexLock lock(&mu_); + // Only update the value if the new keepalive time is larger. + if (new_keepalive_time > keepalive_time_) { + keepalive_time_ = new_keepalive_time; + if (grpc_trace_subchannel.enabled()) { + gpr_log(GPR_INFO, "Subchannel=%p: Throttling keepalive time to %d", this, + new_keepalive_time); + } + const grpc_arg arg_to_add = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), new_keepalive_time); + const char* arg_to_remove = GRPC_ARG_KEEPALIVE_TIME_MS; + grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( + args_, &arg_to_remove, 1, &arg_to_add, 1); + grpc_channel_args_destroy(args_); + args_ = new_args; + } +} + Subchannel* Subchannel::Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = RefMutate((1 << INTERNAL_REF_BITS), diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index e615c18c944..e945b4465e0 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -228,6 +228,9 @@ class Subchannel { static Subchannel* Create(OrphanablePtr connector, const grpc_channel_args* args); + // Throttle keepalive time + void ThrottleKeepaliveTime(int new_keepalive_time); + // Strong and weak refcounting. Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); @@ -422,6 +425,8 @@ class Subchannel { bool have_retry_alarm_ = false; // reset_backoff() was called while alarm was pending. bool retry_immediately_ = false; + // Keepalive time period (-1 for unset) + int keepalive_time_ = -1; // Channelz tracking. RefCountedPtr channelz_node_; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index bc45e4d69f6..e5f5e7cc5df 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1089,6 +1089,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t, gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string, goaway_error, grpc_error_string(t->goaway_error)); } + absl::Status status = grpc_error_to_absl_status(t->goaway_error); /* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug * data equal to "too_many_pings", it should log the occurrence at a log level * that is enabled by default and double the configured KEEPALIVE_TIME used @@ -1107,8 +1108,9 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t, ? GRPC_MILLIS_INF_FUTURE : static_cast(current_keepalive_time_ms * KEEPALIVE_TIME_BACKOFF_MULTIPLIER); + status.SetPayload(grpc_core::keepalive_throttling_key, + absl::Cord(std::to_string(t->keepalive_time))); } - absl::Status status = grpc_error_to_absl_status(t->goaway_error); /* lie: use transient failure from the transport to indicate goaway has been * received */ connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status, diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index cb5b7f7758c..46794d0e869 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -459,4 +459,11 @@ grpc_transport_op* grpc_make_transport_op(grpc_closure* on_consumed); grpc_transport_stream_op_batch* grpc_make_transport_stream_op( grpc_closure* on_consumed); +namespace grpc_core { +// This is the key to be used for loading/storing keepalive_throttling in the +// absl::Status object. +constexpr const char* keepalive_throttling_key = + "grpc.internal.keepalive_throttling"; +} // namespace grpc_core + #endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */ diff --git a/test/core/end2end/cq_verifier.cc b/test/core/end2end/cq_verifier.cc index 6030b496226..e73a587dc67 100644 --- a/test/core/end2end/cq_verifier.cc +++ b/test/core/end2end/cq_verifier.cc @@ -237,8 +237,8 @@ static void verify_matches(expectation* e, grpc_event* ev) { } } -void cq_verify(cq_verifier* v) { - const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(10); +void cq_verify_custom_timeout(cq_verifier* v, int timeout_sec) { + const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(timeout_sec); while (v->first_expectation != nullptr) { grpc_event ev = grpc_completion_queue_next(v->cq, deadline, nullptr); if (ev.type == GRPC_QUEUE_TIMEOUT) { @@ -266,6 +266,8 @@ void cq_verify(cq_verifier* v) { } } +void cq_verify(cq_verifier* v) { cq_verify_custom_timeout(v, 10); } + void cq_verify_empty_timeout(cq_verifier* v, int timeout_sec) { gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), diff --git a/test/core/end2end/cq_verifier.h b/test/core/end2end/cq_verifier.h index f5f2a9f7a33..67382f7fd4d 100644 --- a/test/core/end2end/cq_verifier.h +++ b/test/core/end2end/cq_verifier.h @@ -37,6 +37,10 @@ void cq_verifier_destroy(cq_verifier* v); bound completion queue */ void cq_verify(cq_verifier* v); +/* ensure all expected events (and only those events) are present on the + bound completion queue withing \a timeout_sec */ +void cq_verify_custom_timeout(cq_verifier* v, int timeout_sec); + /* ensure that the completion queue is empty */ void cq_verify_empty(cq_verifier* v); diff --git a/test/core/transport/chttp2/too_many_pings_test.cc b/test/core/transport/chttp2/too_many_pings_test.cc index 308b7216e12..9b774c50689 100644 --- a/test/core/transport/chttp2/too_many_pings_test.cc +++ b/test/core/transport/chttp2/too_many_pings_test.cc @@ -172,6 +172,134 @@ TEST(TooManyPings, TestLotsOfServerCancelledRpcsDoesntGiveTooManyPings) { grpc_completion_queue_destroy(cq); } +// Perform a simple RPC where the client makes a request, and both the client +// and server continue reading so that gRPC can send and receive keepalive +// pings. +grpc_status_code PerformWaitingCall(grpc_channel* channel, grpc_server* server, + grpc_completion_queue* cq, + int expected_keepalive_time) { + grpc_call* c; + grpc_call* s; + cq_verifier* cqv = cq_verifier_create(cq); + grpc_op ops[6]; + grpc_op* op; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_status_code status; + grpc_call_error error; + grpc_slice details; + gpr_timespec deadline = grpc_timeout_seconds_to_deadline(30); + // Start a call + c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq, + grpc_slice_from_static_string("/foo"), nullptr, + deadline, nullptr); + GPR_ASSERT(c); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->flags = 0; + op->reserved = nullptr; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(1), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + // Request a call on the server + error = grpc_server_request_call(server, &s, &call_details, + &request_metadata_recv, cq, cq, tag(101)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(101), 1); + cq_verify(cqv); + // Since the server is configured to allow only a single ping strike, it would + // take 3 pings to trigger the GOAWAY frame with "too_many_pings" from the + // server. (The second ping from the client would be the first bad ping sent + // too quickly leading to a ping strike and the third ping would lead to the + // GOAWAY.) If the client settings match with the server's settings, there + // won't be a bad ping, and the call will end due to the deadline expiring + // instead. + CQ_EXPECT_COMPLETION(cqv, tag(1), 1); + cq_verify_custom_timeout(cqv, 60); + // The call will end after this + cq_verify(cqv); + // cleanup + grpc_slice_unref(details); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + grpc_call_unref(c); + grpc_call_unref(s); + cq_verifier_destroy(cqv); + return status; +} +TEST(TooManyPings, KeepaliveThrottling) { + grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); + // create the server with a ping interval of 10 seconds and a single ping + // strike. + grpc_arg server_args[2]; + server_args[0] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS), + 10 * 1000); + server_args[1] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1); + grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args), + server_args}; + grpc_server* server = grpc_server_create(&server_channel_args, nullptr); + std::string server_address = + grpc_core::JoinHostPort("localhost", grpc_pick_unused_port_or_die()); + grpc_server_register_completion_queue(server, cq, nullptr); + GPR_ASSERT( + grpc_server_add_insecure_http2_port(server, server_address.c_str())); + grpc_server_start(server); + // create the channel with a keepalive ping interval of 1 second. + grpc_arg client_args[4]; + client_args[0] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0); + client_args[1] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS), + 0); + client_args[2] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000); + client_args[3] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_BDP_PROBE), 0); + grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args), + client_args}; + grpc_channel* channel = grpc_insecure_channel_create( + server_address.c_str(), &client_channel_args, nullptr); + int expected_keepalive_time_sec = 1; + // We need 4 GOAWAY frames to throttle the keepalive time from 1 second to 16 + // seconds (> 10sec). + for (int i = 0; i < 4; i++) { + EXPECT_EQ( + PerformWaitingCall(channel, server, cq, expected_keepalive_time_sec), + GRPC_STATUS_UNAVAILABLE); + expected_keepalive_time_sec *= 2; + } + EXPECT_EQ( + PerformWaitingCall(channel, server, cq, expected_keepalive_time_sec), + GRPC_STATUS_DEADLINE_EXCEEDED); + // shutdown and destroy the client and server + grpc_channel_destroy(channel); + grpc_server_shutdown_and_notify(server, cq, nullptr); + grpc_completion_queue_shutdown(cq); + while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), + nullptr) + .type != GRPC_QUEUE_SHUTDOWN) + ; + grpc_server_destroy(server); + grpc_completion_queue_destroy(cq); +} + } // namespace int main(int argc, char** argv) {