diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index ecd80ac16e8..15a39b23884 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -28,6 +28,7 @@
 #include

+#include "absl/strings/numbers.h"
 #include "absl/strings/str_cat.h"
 #include "absl/strings/string_view.h"

@@ -331,6 +332,7 @@ class ChannelData {
   // applied in the data plane mutex when the picker is updated.
   std::map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>>
       pending_subchannel_updates_;
+  int keepalive_time_ = -1;

   //
   // Fields accessed from both data plane mutex and control plane
@@ -972,6 +974,10 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {

   void ResetBackoff() override { subchannel_->ResetBackoff(); }

+  void ThrottleKeepaliveTime(int new_keepalive_time) {
+    subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
+  }
+
   const grpc_channel_args* channel_args() override {
     return subchannel_->channel_args();
   }
@@ -1102,6 +1108,32 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
                 watcher_.get());
       }
       ConnectivityStateChange state_change = PopConnectivityStateChange();
+      absl::optional<absl::Cord> keepalive_throttling =
+          state_change.status.GetPayload(grpc_core::kKeepaliveThrottlingKey);
+      if (keepalive_throttling.has_value()) {
+        int new_keepalive_time = -1;
+        if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
+                             &new_keepalive_time)) {
+          if (new_keepalive_time > parent_->chand_->keepalive_time_) {
+            parent_->chand_->keepalive_time_ = new_keepalive_time;
+            if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+              gpr_log(GPR_INFO, "chand=%p: throttling keepalive time to %d",
+                      parent_->chand_, parent_->chand_->keepalive_time_);
+            }
+            // Propagate the new keepalive time to all subchannels. This is so
+            // that new transports created by any subchannel (and not just the
+            // subchannel that received the GOAWAY) use the new keepalive time.
+            for (auto* subchannel_wrapper :
+                 parent_->chand_->subchannel_wrappers_) {
+              subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
+            }
+          }
+        } else {
+          gpr_log(GPR_ERROR, "chand=%p: Illegal keepalive throttling value %s",
+                  parent_->chand_,
+                  std::string(keepalive_throttling.value()).c_str());
+        }
+      }
       // Ignore update if the parent WatcherWrapper has been replaced
       // since this callback was scheduled.
       if (watcher_ != nullptr) {
@@ -1350,6 +1382,7 @@ class ChannelData::ClientChannelControlHelper
         chand_->client_channel_factory_->CreateSubchannel(new_args);
     grpc_channel_args_destroy(new_args);
     if (subchannel == nullptr) return nullptr;
+    subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_);
     return MakeRefCounted<SubchannelWrapper>(
         chand_, subchannel, std::move(health_check_service_name));
   }
@@ -1683,6 +1716,9 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
   channel_args_ = new_args != nullptr
                       ? new_args
                       : grpc_channel_args_copy(args->channel_args);
+  keepalive_time_ = grpc_channel_args_find_integer(
+      channel_args_, GRPC_ARG_KEEPALIVE_TIME_MS,
+      {-1 /* default value, unset */, 1, INT_MAX});
   if (!ResolverRegistry::IsValidTarget(target_uri_.get())) {
     std::string error_message =
         absl::StrCat("the target uri is not valid: ", target_uri_.get());
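A note on the `{-1, 1, INT_MAX}` triple above: `grpc_channel_args_find_integer` takes a bounds struct of `{default_value, min_value, max_value}`, so -1 doubles as a detectable "unset" sentinel while configured values are clamped to [1, INT_MAX]. A minimal standalone sketch of the lookup (the helper name is illustrative, not part of the patch):

#include <limits.h>

#include <grpc/grpc.h>

#include "src/core/lib/channel/channel_args.h"

// Reads GRPC_ARG_KEEPALIVE_TIME_MS from a channel-args list, clamped to
// [1, INT_MAX]; returns -1 when the arg is absent so "unset" stays detectable.
int ReadKeepaliveTime(const grpc_channel_args* args) {
  return grpc_channel_args_find_integer(
      args, GRPC_ARG_KEEPALIVE_TIME_MS,
      {-1 /* default: unset */, 1 /* min */, INT_MAX /* max */});
}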
diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc
index 5d65293af29..60a2ebb4bfa 100644
--- a/src/core/ext/filters/client_channel/health/health_check_client.cc
+++ b/src/core/ext/filters/client_channel/health/health_check_client.cc
@@ -91,11 +91,12 @@ void HealthCheckClient::SetHealthStatusLocked(grpc_connectivity_state state,
     gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s", this,
             ConnectivityStateName(state), reason);
   }
-  if (watcher_ != nullptr)
+  if (watcher_ != nullptr) {
     watcher_->Notify(state,
                      state == GRPC_CHANNEL_TRANSIENT_FAILURE
                          ? absl::Status(absl::StatusCode::kUnavailable, reason)
                          : absl::Status());
+  }
 }

 void HealthCheckClient::Orphan() {
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 26bc7f8549f..c872e3ed7e4 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -743,6 +743,25 @@ Subchannel* Subchannel::Create(OrphanablePtr<SubchannelConnector> connector,
   return registered;
 }

+void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) {
+  MutexLock lock(&mu_);
+  // Only update the value if the new keepalive time is larger.
+  if (new_keepalive_time > keepalive_time_) {
+    keepalive_time_ = new_keepalive_time;
+    if (grpc_trace_subchannel.enabled()) {
+      gpr_log(GPR_INFO, "Subchannel=%p: Throttling keepalive time to %d", this,
+              new_keepalive_time);
+    }
+    const grpc_arg arg_to_add = grpc_channel_arg_integer_create(
+        const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), new_keepalive_time);
+    const char* arg_to_remove = GRPC_ARG_KEEPALIVE_TIME_MS;
+    grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
+        args_, &arg_to_remove, 1, &arg_to_add, 1);
+    grpc_channel_args_destroy(args_);
+    args_ = new_args;
+  }
+}
+
 Subchannel* Subchannel::Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
   gpr_atm old_refs;
   old_refs = RefMutate((1 << INTERNAL_REF_BITS),
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index ce890bde139..f3de5c80747 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -228,6 +228,11 @@ class Subchannel {
   static Subchannel* Create(OrphanablePtr<SubchannelConnector> connector,
                             const grpc_channel_args* args);

+  // Throttles keepalive time to \a new_keepalive_time iff
+  // \a new_keepalive_time is larger than the subchannel's current keepalive
+  // time. The updated value takes effect the next time the subchannel creates
+  // a new ConnectedSubchannel.
+  void ThrottleKeepaliveTime(int new_keepalive_time);
+
   // Strong and weak refcounting.
   Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
   void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
@@ -422,6 +427,8 @@ class Subchannel {
   bool have_retry_alarm_ = false;
   // reset_backoff() was called while alarm was pending.
   bool retry_immediately_ = false;
+  // Keepalive time period (-1 for unset)
+  int keepalive_time_ = -1;

   // Channelz tracking.
   RefCountedPtr<channelz::SubchannelNode> channelz_node_;
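The arg swap in `ThrottleKeepaliveTime` follows the usual copy/add/remove idiom for immutable `grpc_channel_args` lists. A sketch of the idiom in isolation, with a hypothetical helper name; note that the copy leaves the old list intact, which is why the method above destroys `args_` immediately afterward:

#include <grpc/grpc.h>

#include "src/core/lib/channel/channel_args.h"

// Builds a new args list identical to `old_args` except that
// GRPC_ARG_KEEPALIVE_TIME_MS is replaced with `keepalive_time_ms`.
// The caller still owns `old_args` and must destroy it separately.
grpc_channel_args* ReplaceKeepaliveTime(const grpc_channel_args* old_args,
                                        int keepalive_time_ms) {
  const grpc_arg arg_to_add = grpc_channel_arg_integer_create(
      const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), keepalive_time_ms);
  const char* arg_to_remove = GRPC_ARG_KEEPALIVE_TIME_MS;
  // Copies every arg except `arg_to_remove`, then appends `arg_to_add`.
  return grpc_channel_args_copy_and_add_and_remove(old_args, &arg_to_remove, 1,
                                                   &arg_to_add, 1);
}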
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 3ea224e0f9b..4cb4afa8a0e 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1084,6 +1084,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
     gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string.c_str(),
             goaway_error, grpc_error_string(t->goaway_error));
   }
+  absl::Status status = grpc_error_to_absl_status(t->goaway_error);
   // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and
   // debug data equal to "too_many_pings", it should log the occurrence at a
   // log level that is enabled by default and double the configured
   // KEEPALIVE_TIME used
@@ -1102,8 +1103,9 @@
             ? GRPC_MILLIS_INF_FUTURE
             : static_cast<grpc_millis>(current_keepalive_time_ms *
                                        KEEPALIVE_TIME_BACKOFF_MULTIPLIER);
+    status.SetPayload(grpc_core::kKeepaliveThrottlingKey,
+                      absl::Cord(std::to_string(t->keepalive_time)));
   }
-  absl::Status status = grpc_error_to_absl_status(t->goaway_error);
   // lie: use transient failure from the transport to indicate goaway has been
   // received.
   connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index cb5b7f7758c..cbb23b6cf0d 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -459,4 +459,11 @@ grpc_transport_op* grpc_make_transport_op(grpc_closure* on_consumed);
 grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
     grpc_closure* on_consumed);

+namespace grpc_core {
+// This is the key to be used for loading/storing keepalive_throttling in the
+// absl::Status object.
+constexpr const char* kKeepaliveThrottlingKey =
+    "grpc.internal.keepalive_throttling";
+}  // namespace grpc_core
+
 #endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */
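The transport and the client channel share nothing but this payload: the transport stores the doubled keepalive time as a stringified integer under the key, and the channel parses it back out. A round-trip sketch of that scheme (illustrative values, not part of the patch):

#include "absl/status/status.h"
#include "absl/strings/cord.h"
#include "absl/strings/numbers.h"
#include "src/core/lib/transport/transport.h"  // grpc_core::kKeepaliveThrottlingKey

void PayloadRoundTripExample() {
  // Transport side: attach the throttled time to the status.
  absl::Status status = absl::UnavailableError("GOAWAY received");
  status.SetPayload(grpc_core::kKeepaliveThrottlingKey,
                    absl::Cord(std::to_string(2000)));
  // Client channel side: read it back and parse the integer.
  absl::optional<absl::Cord> payload =
      status.GetPayload(grpc_core::kKeepaliveThrottlingKey);
  int keepalive_time_ms = -1;
  if (payload.has_value() &&
      absl::SimpleAtoi(std::string(payload.value()), &keepalive_time_ms)) {
    // keepalive_time_ms == 2000 here.
  }
}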
diff --git a/test/core/end2end/cq_verifier.cc b/test/core/end2end/cq_verifier.cc
index 6030b496226..959315d6e69 100644
--- a/test/core/end2end/cq_verifier.cc
+++ b/test/core/end2end/cq_verifier.cc
@@ -237,8 +237,8 @@ static void verify_matches(expectation* e, grpc_event* ev) {
   }
 }

-void cq_verify(cq_verifier* v) {
-  const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(10);
+void cq_verify(cq_verifier* v, int timeout_sec) {
+  const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(timeout_sec);
   while (v->first_expectation != nullptr) {
     grpc_event ev = grpc_completion_queue_next(v->cq, deadline, nullptr);
     if (ev.type == GRPC_QUEUE_TIMEOUT) {
diff --git a/test/core/end2end/cq_verifier.h b/test/core/end2end/cq_verifier.h
index f5f2a9f7a33..fd0a7380f91 100644
--- a/test/core/end2end/cq_verifier.h
+++ b/test/core/end2end/cq_verifier.h
@@ -34,8 +34,8 @@ cq_verifier* cq_verifier_create(grpc_completion_queue* cq);
 void cq_verifier_destroy(cq_verifier* v);

 /* ensure all expected events (and only those events) are present on the
-   bound completion queue */
-void cq_verify(cq_verifier* v);
+   bound completion queue within \a timeout_sec */
+void cq_verify(cq_verifier* v, int timeout_sec = 10);

 /* ensure that the completion queue is empty */
 void cq_verify_empty(cq_verifier* v);
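The defaulted parameter keeps every existing call site on the old 10-second deadline while letting keepalive tests wait out several ping intervals. The two call shapes, assuming a `cqv` set up as in the tests below:

// With expectations queued on `cqv` via CQ_EXPECT_COMPLETION:
cq_verify(cqv);      // as before: waits up to 10 seconds for the events
cq_verify(cqv, 60);  // new: waits up to 60 seconds (for later expectations)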
diff --git a/test/core/transport/chttp2/too_many_pings_test.cc b/test/core/transport/chttp2/too_many_pings_test.cc
index 308b7216e12..1195a47bd31 100644
--- a/test/core/transport/chttp2/too_many_pings_test.cc
+++ b/test/core/transport/chttp2/too_many_pings_test.cc
@@ -25,6 +25,8 @@
 #include
 #include

+#include "absl/strings/str_cat.h"
+
 #include
 #include
 #include
@@ -37,6 +39,8 @@
 #include
 #include

+#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
 #include "src/core/lib/gpr/useful.h"
 #include "src/core/lib/gprpp/host_port.h"
 #include "src/core/lib/gprpp/thd.h"
@@ -172,6 +176,358 @@ TEST(TooManyPings, TestLotsOfServerCancelledRpcsDoesntGiveTooManyPings) {
   grpc_completion_queue_destroy(cq);
 }

+// Perform a simple RPC where the client makes a request, and both the client
+// and server continue reading so that gRPC can send and receive keepalive
+// pings.
+grpc_status_code PerformWaitingCall(grpc_channel* channel, grpc_server* server,
+                                    grpc_completion_queue* cq) {
+  grpc_call* c;
+  grpc_call* s;
+  cq_verifier* cqv = cq_verifier_create(cq);
+  grpc_op ops[6];
+  grpc_op* op;
+  grpc_metadata_array trailing_metadata_recv;
+  grpc_metadata_array request_metadata_recv;
+  grpc_call_details call_details;
+  grpc_status_code status;
+  grpc_call_error error;
+  grpc_slice details;
+  gpr_timespec deadline = grpc_timeout_seconds_to_deadline(15);
+  // Start a call
+  c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
+                               grpc_slice_from_static_string("/foo"), nullptr,
+                               deadline, nullptr);
+  GPR_ASSERT(c);
+  grpc_metadata_array_init(&trailing_metadata_recv);
+  grpc_metadata_array_init(&request_metadata_recv);
+  grpc_call_details_init(&call_details);
+  memset(ops, 0, sizeof(ops));
+  op = ops;
+  op->op = GRPC_OP_SEND_INITIAL_METADATA;
+  op->data.send_initial_metadata.count = 0;
+  op->flags = 0;
+  op->reserved = nullptr;
+  op++;
+  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+  op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+  op->data.recv_status_on_client.status = &status;
+  op->data.recv_status_on_client.status_details = &details;
+  op->flags = 0;
+  op->reserved = nullptr;
+  op++;
+  error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
+                                nullptr);
+  GPR_ASSERT(GRPC_CALL_OK == error);
+  // Request a call on the server
+  error = grpc_server_request_call(server, &s, &call_details,
+                                   &request_metadata_recv, cq, cq, tag(101));
+  GPR_ASSERT(GRPC_CALL_OK == error);
+  CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
+  cq_verify(cqv);
+  // Since the server is configured to allow only a single ping strike, it
+  // would take 3 pings to trigger the GOAWAY frame with "too_many_pings" from
+  // the server. (The second ping from the client would be the first bad ping
+  // sent too quickly, leading to a ping strike, and the third ping would lead
+  // to the GOAWAY.) If the client's settings match the server's settings,
+  // there won't be a bad ping, and the call will end when the deadline
+  // expires instead.
+  CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
+  // The call will end after this.
+  cq_verify(cqv, 60);
+  // cleanup
+  grpc_slice_unref(details);
+  grpc_metadata_array_destroy(&trailing_metadata_recv);
+  grpc_metadata_array_destroy(&request_metadata_recv);
+  grpc_call_details_destroy(&call_details);
+  grpc_call_unref(c);
+  grpc_call_unref(s);
+  cq_verifier_destroy(cqv);
+  return status;
+}
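The strike arithmetic in that comment, worked through once: with the server requiring at least 5000 ms between pings and the client starting at 1000 ms, each "too_many_pings" GOAWAY doubles the client's keepalive time, so three GOAWAYs are needed before pings stop drawing strikes. A sketch of the calculation, assuming the transport's doubling behavior (KEEPALIVE_TIME_BACKOFF_MULTIPLIER of 2):

// How many "too_many_pings" GOAWAYs until the client's keepalive time clears
// the server's minimum ping interval, given that each GOAWAY doubles it.
int GoawaysUntilCompliant(int keepalive_ms, int server_min_ping_interval_ms) {
  int goaways = 0;
  while (keepalive_ms <= server_min_ping_interval_ms) {
    keepalive_ms *= 2;  // 1000 -> 2000 -> 4000 -> 8000 for the tests below
    ++goaways;
  }
  return goaways;  // == 3 for keepalive_ms = 1000, min interval = 5000
}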
+class KeepaliveThrottlingTest : public ::testing::Test {
+ protected:
+  // Starts a server on \a addr, bound to \a cq.
+  grpc_server* ServerStart(const char* addr, grpc_completion_queue* cq) {
+    // Set up server channel args to expect pings at an interval of at least
+    // 5 seconds and to allow only a single ping strike.
+    grpc_arg server_args[] = {
+        grpc_channel_arg_integer_create(
+            const_cast<char*>(
+                GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
+            5 * 1000),
+        grpc_channel_arg_integer_create(
+            const_cast<char*>(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)};
+    grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args),
+                                             server_args};
+    // Create server
+    grpc_server* server = grpc_server_create(&server_channel_args, nullptr);
+    grpc_server_register_completion_queue(server, cq, nullptr);
+    GPR_ASSERT(grpc_server_add_insecure_http2_port(server, addr));
+    grpc_server_start(server);
+    return server;
+  }
+
+  // Shuts down and destroys the server, draining \a cq until the shutdown
+  // notification arrives.
+  void ServerShutdownAndDestroy(grpc_server* server,
+                                grpc_completion_queue* cq) {
+    // Shutdown and destroy server
+    grpc_server_shutdown_and_notify(server, cq, (void*)(1000));
+    while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
+                                      nullptr)
+               .tag != (void*)(1000))
+      ;
+    grpc_server_destroy(server);
+  }
+
+  void VerifyChannelReady(grpc_channel* channel, grpc_completion_queue* cq) {
+    grpc_connectivity_state state =
+        grpc_channel_check_connectivity_state(channel, 1 /* try_to_connect */);
+    while (state != GRPC_CHANNEL_READY) {
+      grpc_channel_watch_connectivity_state(
+          channel, state, grpc_timeout_seconds_to_deadline(5), cq, nullptr);
+      grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(5),
+                                 nullptr);
+      state = grpc_channel_check_connectivity_state(channel, 0);
+    }
+  }
+
+  void VerifyChannelDisconnected(grpc_channel* channel,
+                                 grpc_completion_queue* cq) {
+    // Verify that the channel gets disconnected. Use a ping to make sure that
+    // the client tries sending/receiving bytes if the channel is connected.
+    grpc_channel_ping(channel, cq, (void*)(2000), nullptr);
+    grpc_event ev = grpc_completion_queue_next(
+        cq, grpc_timeout_seconds_to_deadline(5), nullptr);
+    GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
+    GPR_ASSERT(ev.tag == (void*)(2000));
+    GPR_ASSERT(ev.success == 0);
+    GPR_ASSERT(grpc_channel_check_connectivity_state(channel, 0) !=
+               GRPC_CHANNEL_READY);
+  }
+};
+
+TEST_F(KeepaliveThrottlingTest, KeepaliveThrottlingMultipleChannels) {
+  grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
+  std::string server_address =
+      grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die());
+  grpc_server* server = ServerStart(server_address.c_str(), cq);
+  // Create two channels with a keepalive ping interval of 1 second.
+  grpc_arg client_args[] = {
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(
+              GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS),
+          0),
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000),
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0)};
+  grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
+                                           client_args};
+  grpc_channel* channel = grpc_insecure_channel_create(
+      server_address.c_str(), &client_channel_args, nullptr);
+  grpc_channel* channel_dup = grpc_insecure_channel_create(
+      server_address.c_str(), &client_channel_args, nullptr);
+  int expected_keepalive_time_sec = 1;
+  // We need 3 GOAWAY frames to throttle the keepalive time from 1 second to
+  // 8 seconds (> 5 seconds).
+  for (int i = 0; i < 3; i++) {
+    gpr_log(GPR_INFO, "Expected keepalive time : %d",
+            expected_keepalive_time_sec);
+    EXPECT_EQ(PerformWaitingCall(channel, server, cq), GRPC_STATUS_UNAVAILABLE);
+    expected_keepalive_time_sec *= 2;
+  }
+  gpr_log(GPR_INFO,
+          "Client keepalive time %d should now be in sync with the server "
+          "settings",
+          expected_keepalive_time_sec);
+  EXPECT_EQ(PerformWaitingCall(channel, server, cq),
+            GRPC_STATUS_DEADLINE_EXCEEDED);
+  // Since the subchannel is shared, the second channel should also have
+  // keepalive settings in sync with the server.
+  gpr_log(GPR_INFO, "Now testing second channel sharing the same subchannel");
+  EXPECT_EQ(PerformWaitingCall(channel_dup, server, cq),
+            GRPC_STATUS_DEADLINE_EXCEEDED);
+  // Shutdown and destroy the client and server.
+  grpc_channel_destroy(channel);
+  grpc_channel_destroy(channel_dup);
+  ServerShutdownAndDestroy(server, cq);
+  grpc_completion_queue_shutdown(cq);
+  while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
+                                    nullptr)
+             .type != GRPC_QUEUE_SHUTDOWN)
+    ;
+  grpc_completion_queue_destroy(cq);
+}
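The second channel inherits the throttled value because channels created with identical args share subchannels through the global subchannel pool. For contrast, a hypothetical variation that opts a channel out of sharing via the existing GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL channel arg (the target address is illustrative):

// A channel with its own local subchannel pool does NOT share subchannels,
// so it would not inherit throttled keepalive state from other channels
// created with identical args.
grpc_arg arg = grpc_channel_arg_integer_create(
    const_cast<char*>(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), 1);
grpc_channel_args args = {1, &arg};
grpc_channel* isolated =
    grpc_insecure_channel_create("localhost:50051", &args, nullptr);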
+ gpr_log(GPR_INFO, "Now testing second channel sharing the same subchannel"); + EXPECT_EQ(PerformWaitingCall(channel_dup, server, cq), + GRPC_STATUS_DEADLINE_EXCEEDED); + // shutdown and destroy the client and server + grpc_channel_destroy(channel); + grpc_channel_destroy(channel_dup); + ServerShutdownAndDestroy(server, cq); + grpc_completion_queue_shutdown(cq); + while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), + nullptr) + .type != GRPC_QUEUE_SHUTDOWN) + ; + grpc_completion_queue_destroy(cq); +} + +grpc_core::Resolver::Result BuildResolverResult( + const std::vector& addresses) { + grpc_core::Resolver::Result result; + for (const auto& address_str : addresses) { + grpc_uri* uri = grpc_uri_parse(address_str.c_str(), true); + if (uri == nullptr) { + gpr_log(GPR_ERROR, "Failed to parse uri:%s", address_str.c_str()); + GPR_ASSERT(0); + } + grpc_resolved_address address; + GPR_ASSERT(grpc_parse_uri(uri, &address)); + result.addresses.emplace_back(address.addr, address.len, nullptr); + grpc_uri_destroy(uri); + } + return result; +} + +// Tests that when new subchannels are created due to a change in resolved +// addresses, the new subchannels use the updated keepalive time. +TEST_F(KeepaliveThrottlingTest, NewSubchannelsUseUpdatedKeepaliveTime) { + grpc_core::ExecCtx exec_ctx; + grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); + std::string server_address1 = + grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die()); + std::string server_address2 = + grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die()); + grpc_server* server1 = ServerStart(server_address1.c_str(), cq); + grpc_server* server2 = ServerStart(server_address2.c_str(), cq); + // create a single channel with multiple subchannels with a keepalive ping + // interval of 1 second. To get finer control on subchannel connection times, + // we are using pick_first instead of round_robin and using the fake resolver + // response generator to switch between the two. + auto response_generator = + grpc_core::MakeRefCounted(); + grpc_arg client_args[] = { + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0), + grpc_channel_arg_integer_create( + const_cast( + GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS), + 0), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 0), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_BDP_PROBE), 0), + grpc_core::FakeResolverResponseGenerator::MakeChannelArg( + response_generator.get())}; + grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args), + client_args}; + grpc_channel* channel = + grpc_insecure_channel_create("fake:///", &client_channel_args, nullptr); + // For a single subchannel 3 GOAWAYs would be sufficient to increase the + // keepalive time from 1 second to beyond 5 seconds. Even though we are + // alternating between two subchannels, 3 GOAWAYs should still be enough since + // the channel should start all new transports with the new keepalive value + // (even those from a different subchannel). + int expected_keepalive_time_sec = 1; + for (int i = 0; i < 3; i++) { + gpr_log(GPR_INFO, "Expected keepalive time : %d", + expected_keepalive_time_sec); + response_generator->SetResponse(BuildResolverResult({absl::StrCat( + "ipv4:", i % 2 == 0 ? 
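Both fake-resolver tests follow the same steering pattern, condensed here as a sketch (the port is illustrative, and SetResponse may require an active ExecCtx on the calling thread, as in the test above):

// Attach the generator as a channel arg at creation time, then push address
// lists at will; with pick_first, the channel reconnects to the new address.
auto gen =
    grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
grpc_arg arg =
    grpc_core::FakeResolverResponseGenerator::MakeChannelArg(gen.get());
grpc_channel_args args = {1, &arg};
grpc_channel* ch = grpc_insecure_channel_create("fake:///", &args, nullptr);
// Later: re-point the channel at a new backend.
gen->SetResponse(BuildResolverResult({"ipv4:127.0.0.1:1234"}));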
+// Tests that when a channel has multiple subchannels and receives a GOAWAY
+// with "too_many_pings" on one of them, all subchannels start any new
+// transports with an updated keepalive time.
+TEST_F(KeepaliveThrottlingTest,
+       ExistingSubchannelsUseNewKeepaliveTimeWhenReconnecting) {
+  grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
+  std::string server_address1 =
+      grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die());
+  std::string server_address2 =
+      grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die());
+  // Create a single channel with the round robin load balancing policy.
+  auto response_generator =
+      grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
+  grpc_arg client_args[] = {
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(
+              GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS),
+          0),
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 0),
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000),
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0),
+      grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
+          response_generator.get())};
+  grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
+                                           client_args};
+  grpc_channel* channel =
+      grpc_insecure_channel_create("fake:///", &client_channel_args, nullptr);
+  response_generator->SetResponse(
+      BuildResolverResult({absl::StrCat("ipv4:", server_address1),
+                           absl::StrCat("ipv4:", server_address2)}));
+  // For a single subchannel, 3 GOAWAYs would be sufficient to increase the
+  // keepalive time from 1 second to beyond 5 seconds. Even though we are
+  // alternating between two subchannels, 3 GOAWAYs should still be enough,
+  // since the channel should start all new transports with the new keepalive
+  // value (even those from a different subchannel).
+  int expected_keepalive_time_sec = 1;
+  for (int i = 0; i < 3; i++) {
+    gpr_log(GPR_ERROR, "Expected keepalive time : %d",
+            expected_keepalive_time_sec);
+    grpc_server* server = ServerStart(
+        i % 2 == 0 ? server_address1.c_str() : server_address2.c_str(), cq);
+    VerifyChannelReady(channel, cq);
+    EXPECT_EQ(PerformWaitingCall(channel, server, cq),
+              GRPC_STATUS_UNAVAILABLE);
+    ServerShutdownAndDestroy(server, cq);
+    VerifyChannelDisconnected(channel, cq);
+    expected_keepalive_time_sec *= 2;
+  }
+  gpr_log(GPR_INFO,
+          "Client keepalive time %d should now be in sync with the server "
+          "settings",
+          expected_keepalive_time_sec);
+  grpc_server* server = ServerStart(server_address1.c_str(), cq);
+  VerifyChannelReady(channel, cq);
+  EXPECT_EQ(PerformWaitingCall(channel, server, cq),
+            GRPC_STATUS_DEADLINE_EXCEEDED);
+  ServerShutdownAndDestroy(server, cq);
+  // Shutdown and destroy the client.
+  grpc_channel_destroy(channel);
+  grpc_completion_queue_shutdown(cq);
+  while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
+                                    nullptr)
+             .type != GRPC_QUEUE_SHUTDOWN)
+    ;
+  grpc_completion_queue_destroy(cq);
+}
+
 }  // namespace

 int main(int argc, char** argv) {