From 1618d2179646a6379dc190d439816fc8b31e3718 Mon Sep 17 00:00:00 2001
From: Yash Tibrewal <yashykt@google.com>
Date: Mon, 10 Aug 2020 14:05:27 -0700
Subject: [PATCH] Add roundrobin test and reviewer comments

---
 .../health/health_check_client.cc           |   3 +-
 .../ext/filters/client_channel/subchannel.h |   2 +-
 .../transport/chttp2/too_many_pings_test.cc | 191 +++++++++++++-----
 3 files changed, 148 insertions(+), 48 deletions(-)

diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc
index 5d65293af29..60a2ebb4bfa 100644
--- a/src/core/ext/filters/client_channel/health/health_check_client.cc
+++ b/src/core/ext/filters/client_channel/health/health_check_client.cc
@@ -91,11 +91,12 @@ void HealthCheckClient::SetHealthStatusLocked(grpc_connectivity_state state,
     gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s", this,
             ConnectivityStateName(state), reason);
   }
-  if (watcher_ != nullptr)
+  if (watcher_ != nullptr) {
     watcher_->Notify(state,
                      state == GRPC_CHANNEL_TRANSIENT_FAILURE
                          ? absl::Status(absl::StatusCode::kUnavailable, reason)
                          : absl::Status());
+  }
 }
 
 void HealthCheckClient::Orphan() {
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index f220db2fa27..f3de5c80747 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -230,7 +230,7 @@ class Subchannel {
 
   // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time
   // is larger than the subchannel's current keepalive time. The updated value
-  // would have an affect when the subchannel creates a new ConnectedSubchannel.
+  // will have an effect when the subchannel creates a new ConnectedSubchannel.
   void ThrottleKeepaliveTime(int new_keepalive_time);
 
   // Strong and weak refcounting.
diff --git a/test/core/transport/chttp2/too_many_pings_test.cc b/test/core/transport/chttp2/too_many_pings_test.cc
index 9b06ef41cb3..c53db6ef229 100644
--- a/test/core/transport/chttp2/too_many_pings_test.cc
+++ b/test/core/transport/chttp2/too_many_pings_test.cc
@@ -245,28 +245,76 @@ grpc_status_code PerformWaitingCall(grpc_channel* channel, grpc_server* server,
   return status;
 }
 
-TEST(TooManyPings, KeepaliveThrottlingMultipleChannels) {
+class KeepaliveThrottlingTest : public ::testing::Test {
+ protected:
+  // Creates and starts a server listening on \a addr.
+  grpc_server* ServerStart(const char* addr, grpc_completion_queue* cq) {
+    // Set up server channel args to expect pings at an interval of 5 seconds
+    // and use a single ping strike.
+    grpc_arg server_args[] = {
+        grpc_channel_arg_integer_create(
+            const_cast<char*>(
+                GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
+            5 * 1000),
+        grpc_channel_arg_integer_create(
+            const_cast<char*>(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)};
+    grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args),
+                                             server_args};
+    // Create server
+    grpc_server* server = grpc_server_create(&server_channel_args, nullptr);
+    grpc_server_register_completion_queue(server, cq, nullptr);
+    GPR_ASSERT(grpc_server_add_insecure_http2_port(server, addr));
+    grpc_server_start(server);
+    return server;
+  }
+
+  // Shuts down and destroys the server, draining the completion queue until
+  // the shutdown notification is received.
+  void ServerShutdownAndDestroy(grpc_server* server,
+                                grpc_completion_queue* cq) {
+    // Shutdown and destroy server
+    grpc_server_shutdown_and_notify(server, cq, (void*)(1000));
+    while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
+                                      nullptr)
+               .tag != (void*)(1000))
+      ;
+    grpc_server_destroy(server);
+  }
+
+  void VerifyChannelReady(grpc_channel* channel, grpc_completion_queue* cq) {
+    grpc_connectivity_state state =
+        grpc_channel_check_connectivity_state(channel, 1 /* try_to_connect */);
+    while (state != GRPC_CHANNEL_READY) {
+      grpc_channel_watch_connectivity_state(
+          channel, state, grpc_timeout_seconds_to_deadline(5), cq, nullptr);
+      grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(5),
+                                 nullptr);
+      state = grpc_channel_check_connectivity_state(channel, 0);
+    }
+  }
+
+  void VerifyChannelDisconnected(grpc_channel* channel,
+                                 grpc_completion_queue* cq) {
+    // Verify the channel gets disconnected. Use a ping to make sure that the
+    // client tries sending/receiving bytes if the channel is connected.
+    grpc_channel_ping(channel, cq, (void*)(2000), nullptr);
+    grpc_event ev = grpc_completion_queue_next(
+        cq, grpc_timeout_seconds_to_deadline(5), nullptr);
+    GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
+    GPR_ASSERT(ev.tag == (void*)(2000));
+    GPR_ASSERT(ev.success == 0);
+    GPR_ASSERT(grpc_channel_check_connectivity_state(channel, 0) !=
+               GRPC_CHANNEL_READY);
+  }
+};
+
+TEST_F(KeepaliveThrottlingTest, KeepaliveThrottlingMultipleChannels) {
   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
-  // create the server with a ping interval of 5 seconds and a single ping
-  // strike.
-  grpc_arg server_args[] = {
-      grpc_channel_arg_integer_create(
-          const_cast<char*>(
-              GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
-          5 * 1000),
-      grpc_channel_arg_integer_create(
-          const_cast<char*>(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)};
-  grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args),
-                                           server_args};
-  grpc_server* server = grpc_server_create(&server_channel_args, nullptr);
   std::string server_address =
       grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die());
-  grpc_server_register_completion_queue(server, cq, nullptr);
-  GPR_ASSERT(
-      grpc_server_add_insecure_http2_port(server, server_address.c_str()));
-  grpc_server_start(server);
+  grpc_server* server = ServerStart(server_address.c_str(), cq);
   // create two channel with a keepalive ping interval of 1 second.
-  grpc_arg client_args[]{
+  grpc_arg client_args[] = {
       grpc_channel_arg_integer_create(
           const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
       grpc_channel_arg_integer_create(
@@ -306,13 +354,12 @@ TEST(TooManyPings, KeepaliveThrottlingMultipleChannels) {
   // shutdown and destroy the client and server
   grpc_channel_destroy(channel);
   grpc_channel_destroy(channel_dup);
-  grpc_server_shutdown_and_notify(server, cq, nullptr);
+  ServerShutdownAndDestroy(server, cq);
   grpc_completion_queue_shutdown(cq);
   while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
                                     nullptr)
              .type != GRPC_QUEUE_SHUTDOWN)
     ;
-  grpc_server_destroy(server);
   grpc_completion_queue_destroy(cq);
 }
 
@@ -333,34 +380,17 @@ grpc_core::Resolver::Result BuildResolverResult(
   return result;
 }
 
-TEST(TooManyPings, KeepaliveThrottlingMultipleSubchannels) {
+// Tests that when new subchannels are created due to a change in resolved
+// addresses, the new subchannels use the updated keepalive time.
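+// The client starts with a keepalive time of 1 second, and each
+// "too_many_pings" GOAWAY from a server is expected to double the keepalive
+// time used for new transports (1s -> 2s -> 4s -> 8s) until it exceeds the
+// servers' 5 second minimum ping interval, at which point the final call is
+// expected to fail with DEADLINE_EXCEEDED rather than UNAVAILABLE.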
+TEST_F(KeepaliveThrottlingTest, KeepaliveThrottlingMultipleSubchannels1) {
   grpc_core::ExecCtx exec_ctx;
   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
-  // create two servers with a ping interval of 5 seconds and a single ping
-  // strike.
-  grpc_arg server_args[] = {
-      grpc_channel_arg_integer_create(
-          const_cast<char*>(
-              GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
-          5 * 1000),
-      grpc_channel_arg_integer_create(
-          const_cast<char*>(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)};
-  grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args),
-                                           server_args};
-  grpc_server* server1 = grpc_server_create(&server_channel_args, nullptr);
   std::string server_address1 =
       grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die());
-  grpc_server_register_completion_queue(server1, cq, nullptr);
-  GPR_ASSERT(
-      grpc_server_add_insecure_http2_port(server1, server_address1.c_str()));
-  grpc_server_start(server1);
-  grpc_server* server2 = grpc_server_create(&server_channel_args, nullptr);
   std::string server_address2 =
       grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die());
-  grpc_server_register_completion_queue(server2, cq, nullptr);
-  GPR_ASSERT(
-      grpc_server_add_insecure_http2_port(server2, server_address2.c_str()));
-  grpc_server_start(server2);
+  grpc_server* server1 = ServerStart(server_address1.c_str(), cq);
+  grpc_server* server2 = ServerStart(server_address2.c_str(), cq);
   // create a single channel with multiple subchannels with a keepalive ping
   // interval of 1 second. To get finer control on subchannel connection times,
   // we are using pick_first instead of round_robin and using the fake resolver
@@ -403,6 +433,7 @@ TEST(TooManyPings, KeepaliveThrottlingMultipleSubchannels) {
     gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
     EXPECT_EQ(PerformWaitingCall(channel, i % 2 == 0 ? server1 : server2, cq),
               GRPC_STATUS_UNAVAILABLE);
+    expected_keepalive_time_sec *= 2;
   }
   gpr_log(
       GPR_INFO,
@@ -416,15 +447,83 @@ TEST(TooManyPings, KeepaliveThrottlingMultipleSubchannels) {
             GRPC_STATUS_DEADLINE_EXCEEDED);
   // shutdown and destroy the client and server
   grpc_channel_destroy(channel);
-  grpc_server_shutdown_and_notify(server1, cq, nullptr);
-  grpc_server_shutdown_and_notify(server2, cq, nullptr);
+  ServerShutdownAndDestroy(server1, cq);
+  ServerShutdownAndDestroy(server2, cq);
+  grpc_completion_queue_shutdown(cq);
+  while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
+                                    nullptr)
+             .type != GRPC_QUEUE_SHUTDOWN)
+    ;
+  grpc_completion_queue_destroy(cq);
+}
+
+// Tests that when a channel has multiple subchannels and receives a GOAWAY with
+// "too_many_pings" on one of them, all subchannels start any new transports
+// with an updated keepalive time.
+TEST_F(KeepaliveThrottlingTest, KeepaliveThrottlingMultipleSubchannels2) {
+  grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
+  std::string server_address1 =
+      grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die());
+  std::string server_address2 =
+      grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die());
+  // create a single channel with the round_robin load balancing policy.
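+  // Both server addresses are handed to the fake resolver up front; the test
+  // then starts and stops one server at a time, so each iteration exercises a
+  // different subchannel while the other stays disconnected.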
+  auto response_generator =
+      grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
+  grpc_arg client_args[] = {
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(
+              GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS),
+          0),
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 0),
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000),
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0),
+      grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
+          response_generator.get())};
+  grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
+                                           client_args};
+  grpc_channel* channel =
+      grpc_insecure_channel_create("fake:///", &client_channel_args, nullptr);
+  response_generator->SetResponse(
+      BuildResolverResult({absl::StrCat("ipv4:", server_address1),
+                           absl::StrCat("ipv4:", server_address2)}));
+  // For a single subchannel 3 GOAWAYs would be sufficient to increase the
+  // keepalive time from 1 second to beyond 5 seconds. Even though we are
+  // alternating between two subchannels, 3 GOAWAYs should still be enough since
+  // the channel should start all new transports with the new keepalive value
+  // (even those from a different subchannel).
+  int expected_keepalive_time_sec = 1;
+  for (int i = 0; i < 3; i++) {
+    gpr_log(GPR_ERROR, "Expected keepalive time : %d",
+            expected_keepalive_time_sec);
+    grpc_server* server = ServerStart(
+        i % 2 == 0 ? server_address1.c_str() : server_address2.c_str(), cq);
+    VerifyChannelReady(channel, cq);
+    EXPECT_EQ(PerformWaitingCall(channel, server, cq), GRPC_STATUS_UNAVAILABLE);
+    ServerShutdownAndDestroy(server, cq);
+    VerifyChannelDisconnected(channel, cq);
+    expected_keepalive_time_sec *= 2;
+  }
+  gpr_log(
+      GPR_INFO,
+      "Client keepalive time %d should now be in sync with the server settings",
+      expected_keepalive_time_sec);
+  grpc_server* server = ServerStart(server_address1.c_str(), cq);
+  VerifyChannelReady(channel, cq);
+  EXPECT_EQ(PerformWaitingCall(channel, server, cq),
+            GRPC_STATUS_DEADLINE_EXCEEDED);
+  ServerShutdownAndDestroy(server, cq);
+  // shutdown and destroy the client
+  grpc_channel_destroy(channel);
   grpc_completion_queue_shutdown(cq);
   while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
                                     nullptr)
              .type != GRPC_QUEUE_SHUTDOWN)
     ;
-  grpc_server_destroy(server1);
-  grpc_server_destroy(server2);
   grpc_completion_queue_destroy(cq);
 }