From cf0b46c4d678793d63ab7b6be511c539da93deb0 Mon Sep 17 00:00:00 2001
From: Yash Tibrewal
Date: Wed, 5 Aug 2020 07:37:54 -0700
Subject: [PATCH] Reviewer comments

---
 .../filters/client_channel/client_channel.cc |   2 +-
 .../ext/filters/client_channel/subchannel.cc |   1 -
 .../ext/filters/client_channel/subchannel.h  |   4 +-
 .../chttp2/transport/chttp2_transport.cc     |   2 +-
 src/core/lib/transport/transport.h           |   2 +-
 test/core/end2end/cq_verifier.cc             |   4 +-
 test/core/end2end/cq_verifier.h              |   8 +-
 .../transport/chttp2/too_many_pings_test.cc  | 196 +++++++++++++++---
 8 files changed, 171 insertions(+), 48 deletions(-)

diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index db5358519fc..fd201ee1825 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -1108,7 +1108,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
     }
     ConnectivityStateChange state_change = PopConnectivityStateChange();
     absl::optional keepalive_throttling =
-        state_change.status.GetPayload(grpc_core::keepalive_throttling_key);
+        state_change.status.GetPayload(grpc_core::kKeepaliveThrottlingKey);
     if (keepalive_throttling.has_value()) {
       int new_keepalive_time = -1;
       if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 9bae661c7e5..c872e3ed7e4 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -26,7 +26,6 @@
 #include
 #include

-#include "absl/strings/numbers.h"
 #include "absl/strings/str_format.h"

 #include
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index 53a3924da99..f220db2fa27 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -228,7 +228,9 @@ class Subchannel {
   static Subchannel* Create(OrphanablePtr connector,
                             const grpc_channel_args* args);

-  // Throttle keepalive time
+  // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time
+  // is larger than the subchannel's current keepalive time. The updated value
+  // will take effect when the subchannel creates a new ConnectedSubchannel.
   void ThrottleKeepaliveTime(int new_keepalive_time);

   // Strong and weak refcounting.
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 2cfc1cb6959..4cb4afa8a0e 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1103,7 +1103,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
             ? GRPC_MILLIS_INF_FUTURE
             : static_cast(current_keepalive_time_ms *
                           KEEPALIVE_TIME_BACKOFF_MULTIPLIER);
-    status.SetPayload(grpc_core::keepalive_throttling_key,
+    status.SetPayload(grpc_core::kKeepaliveThrottlingKey,
                       absl::Cord(std::to_string(t->keepalive_time)));
   }
   // lie: use transient failure from the transport to indicate goaway has been
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 46794d0e869..cbb23b6cf0d 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -462,7 +462,7 @@ grpc_transport_stream_op_batch* grpc_make_transport_stream_op(

 namespace grpc_core {
 // This is the key to be used for loading/storing keepalive_throttling in the
 // absl::Status object.
-constexpr const char* keepalive_throttling_key =
+constexpr const char* kKeepaliveThrottlingKey =
     "grpc.internal.keepalive_throttling";
 }  // namespace grpc_core
diff --git a/test/core/end2end/cq_verifier.cc b/test/core/end2end/cq_verifier.cc
index e73a587dc67..959315d6e69 100644
--- a/test/core/end2end/cq_verifier.cc
+++ b/test/core/end2end/cq_verifier.cc
@@ -237,7 +237,7 @@ static void verify_matches(expectation* e, grpc_event* ev) {
   }
 }

-void cq_verify_custom_timeout(cq_verifier* v, int timeout_sec) {
+void cq_verify(cq_verifier* v, int timeout_sec) {
   const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(timeout_sec);
   while (v->first_expectation != nullptr) {
     grpc_event ev = grpc_completion_queue_next(v->cq, deadline, nullptr);
@@ -266,8 +266,6 @@ void cq_verify_custom_timeout(cq_verifier* v, int timeout_sec) {
   }
 }

-void cq_verify(cq_verifier* v) { cq_verify_custom_timeout(v, 10); }
-
 void cq_verify_empty_timeout(cq_verifier* v, int timeout_sec) {
   gpr_timespec deadline =
       gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
diff --git a/test/core/end2end/cq_verifier.h b/test/core/end2end/cq_verifier.h
index 67382f7fd4d..fd0a7380f91 100644
--- a/test/core/end2end/cq_verifier.h
+++ b/test/core/end2end/cq_verifier.h
@@ -34,12 +34,8 @@ cq_verifier* cq_verifier_create(grpc_completion_queue* cq);
 void cq_verifier_destroy(cq_verifier* v);

 /* ensure all expected events (and only those events) are present on the
-   bound completion queue */
-void cq_verify(cq_verifier* v);
-
-/* ensure all expected events (and only those events) are present on the
-   bound completion queue withing \a timeout_sec */
-void cq_verify_custom_timeout(cq_verifier* v, int timeout_sec);
+   bound completion queue within \a timeout_sec */
+void cq_verify(cq_verifier* v, int timeout_sec = 10);

 /* ensure that the completion queue is empty */
 void cq_verify_empty(cq_verifier* v);
diff --git a/test/core/transport/chttp2/too_many_pings_test.cc b/test/core/transport/chttp2/too_many_pings_test.cc
index 9b774c50689..9b06ef41cb3 100644
--- a/test/core/transport/chttp2/too_many_pings_test.cc
+++ b/test/core/transport/chttp2/too_many_pings_test.cc
@@ -25,6 +25,8 @@
 #include
 #include

+#include "absl/strings/str_cat.h"
+
 #include
 #include
 #include
@@ -37,6 +39,8 @@
 #include
 #include

+#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
 #include "src/core/lib/gpr/useful.h"
 #include "src/core/lib/gprpp/host_port.h"
 #include "src/core/lib/gprpp/thd.h"
@@ -176,8 +180,7 @@ TEST(TooManyPings, TestLotsOfServerCancelledRpcsDoesntGiveTooManyPings) {
 // and server continue reading so that gRPC can send and receive keepalive
 // pings.
 grpc_status_code PerformWaitingCall(grpc_channel* channel, grpc_server* server,
-                                    grpc_completion_queue* cq,
-                                    int expected_keepalive_time) {
+                                    grpc_completion_queue* cq) {
   grpc_call* c;
   grpc_call* s;
   cq_verifier* cqv = cq_verifier_create(cq);
@@ -189,7 +192,7 @@ grpc_status_code PerformWaitingCall(grpc_channel* channel, grpc_server* server,
   grpc_status_code status;
   grpc_call_error error;
   grpc_slice details;
-  gpr_timespec deadline = grpc_timeout_seconds_to_deadline(30);
+  gpr_timespec deadline = grpc_timeout_seconds_to_deadline(15);
   // Start a call
   c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
                                grpc_slice_from_static_string("/foo"), nullptr,
@@ -229,9 +232,8 @@ grpc_status_code PerformWaitingCall(grpc_channel* channel, grpc_server* server,
   // won't be a bad ping, and the call will end due to the deadline expiring
   // instead.
   CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
-  cq_verify_custom_timeout(cqv, 60);
   // The call will end after this
-  cq_verify(cqv);
+  cq_verify(cqv, 60);
   // cleanup
   grpc_slice_unref(details);
   grpc_metadata_array_destroy(&trailing_metadata_recv);
@@ -242,54 +244,68 @@ grpc_status_code PerformWaitingCall(grpc_channel* channel, grpc_server* server,
   cq_verifier_destroy(cqv);
   return status;
 }
-TEST(TooManyPings, KeepaliveThrottling) {
+
+TEST(TooManyPings, KeepaliveThrottlingMultipleChannels) {
   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
-  // create the server with a ping interval of 10 seconds and a single ping
+  // create the server with a ping interval of 5 seconds and a single ping
   // strike.
-  grpc_arg server_args[2];
-  server_args[0] = grpc_channel_arg_integer_create(
-      const_cast(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
-      10 * 1000);
-  server_args[1] = grpc_channel_arg_integer_create(
-      const_cast(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1);
+  grpc_arg server_args[] = {
+      grpc_channel_arg_integer_create(
+          const_cast(
+              GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
+          5 * 1000),
+      grpc_channel_arg_integer_create(
+          const_cast(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)};
   grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args),
                                            server_args};
   grpc_server* server = grpc_server_create(&server_channel_args, nullptr);
   std::string server_address =
-      grpc_core::JoinHostPort("localhost", grpc_pick_unused_port_or_die());
+      grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die());
   grpc_server_register_completion_queue(server, cq, nullptr);
   GPR_ASSERT(
       grpc_server_add_insecure_http2_port(server, server_address.c_str()));
   grpc_server_start(server);
-  // create the channel with a keepalive ping interval of 1 second.
-  grpc_arg client_args[4];
-  client_args[0] = grpc_channel_arg_integer_create(
-      const_cast(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0);
-  client_args[1] = grpc_channel_arg_integer_create(
-      const_cast(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS),
-      0);
-  client_args[2] = grpc_channel_arg_integer_create(
-      const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000);
-  client_args[3] = grpc_channel_arg_integer_create(
-      const_cast(GRPC_ARG_HTTP2_BDP_PROBE), 0);
+  // create two channels with a keepalive ping interval of 1 second.
+  grpc_arg client_args[]{
+      grpc_channel_arg_integer_create(
+          const_cast(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
+      grpc_channel_arg_integer_create(
+          const_cast(
+              GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS),
+          0),
+      grpc_channel_arg_integer_create(
+          const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000),
+      grpc_channel_arg_integer_create(
+          const_cast(GRPC_ARG_HTTP2_BDP_PROBE), 0)};
   grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
                                            client_args};
   grpc_channel* channel = grpc_insecure_channel_create(
       server_address.c_str(), &client_channel_args, nullptr);
+  grpc_channel* channel_dup = grpc_insecure_channel_create(
+      server_address.c_str(), &client_channel_args, nullptr);
   int expected_keepalive_time_sec = 1;
-  // We need 4 GOAWAY frames to throttle the keepalive time from 1 second to 16
-  // seconds (> 10sec).
-  for (int i = 0; i < 4; i++) {
-    EXPECT_EQ(
-        PerformWaitingCall(channel, server, cq, expected_keepalive_time_sec),
-        GRPC_STATUS_UNAVAILABLE);
+  // We need 3 GOAWAY frames to throttle the keepalive time from 1 second to 8
+  // seconds (> 5sec).
+  for (int i = 0; i < 3; i++) {
+    gpr_log(GPR_INFO, "Expected keepalive time : %d",
+            expected_keepalive_time_sec);
+    EXPECT_EQ(PerformWaitingCall(channel, server, cq), GRPC_STATUS_UNAVAILABLE);
     expected_keepalive_time_sec *= 2;
   }
-  EXPECT_EQ(
-      PerformWaitingCall(channel, server, cq, expected_keepalive_time_sec),
-      GRPC_STATUS_DEADLINE_EXCEEDED);
+  gpr_log(
+      GPR_INFO,
+      "Client keepalive time %d should now be in sync with the server settings",
+      expected_keepalive_time_sec);
+  EXPECT_EQ(PerformWaitingCall(channel, server, cq),
+            GRPC_STATUS_DEADLINE_EXCEEDED);
+  // Since the subchannel is shared, the second channel should also have
+  // keepalive settings in sync with the server.
+  gpr_log(GPR_INFO, "Now testing second channel sharing the same subchannel");
+  EXPECT_EQ(PerformWaitingCall(channel_dup, server, cq),
+            GRPC_STATUS_DEADLINE_EXCEEDED);
   // shutdown and destroy the client and server
   grpc_channel_destroy(channel);
+  grpc_channel_destroy(channel_dup);
   grpc_server_shutdown_and_notify(server, cq, nullptr);
   grpc_completion_queue_shutdown(cq);
   while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
@@ -300,6 +316,118 @@ TEST(TooManyPings, KeepaliveThrottling) {
   grpc_completion_queue_destroy(cq);
 }

+grpc_core::Resolver::Result BuildResolverResult(
+    const std::vector& addresses) {
+  grpc_core::Resolver::Result result;
+  for (const auto& address_str : addresses) {
+    grpc_uri* uri = grpc_uri_parse(address_str.c_str(), true);
+    if (uri == nullptr) {
+      gpr_log(GPR_ERROR, "Failed to parse uri:%s", address_str.c_str());
+      GPR_ASSERT(0);
+    }
+    grpc_resolved_address address;
+    GPR_ASSERT(grpc_parse_uri(uri, &address));
+    result.addresses.emplace_back(address.addr, address.len, nullptr);
+    grpc_uri_destroy(uri);
+  }
+  return result;
+}
+
+TEST(TooManyPings, KeepaliveThrottlingMultipleSubchannels) {
+  grpc_core::ExecCtx exec_ctx;
+  grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
+  // create two servers with a ping interval of 5 seconds and a single ping
+  // strike.
+  grpc_arg server_args[] = {
+      grpc_channel_arg_integer_create(
+          const_cast(
+              GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
+          5 * 1000),
+      grpc_channel_arg_integer_create(
+          const_cast(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)};
+  grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args),
+                                           server_args};
+  grpc_server* server1 = grpc_server_create(&server_channel_args, nullptr);
+  std::string server_address1 =
+      grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die());
+  grpc_server_register_completion_queue(server1, cq, nullptr);
+  GPR_ASSERT(
+      grpc_server_add_insecure_http2_port(server1, server_address1.c_str()));
+  grpc_server_start(server1);
+  grpc_server* server2 = grpc_server_create(&server_channel_args, nullptr);
+  std::string server_address2 =
+      grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die());
+  grpc_server_register_completion_queue(server2, cq, nullptr);
+  GPR_ASSERT(
+      grpc_server_add_insecure_http2_port(server2, server_address2.c_str()));
+  grpc_server_start(server2);
+  // create a single channel with multiple subchannels with a keepalive ping
+  // interval of 1 second. To get finer control on subchannel connection times,
+  // we are using pick_first instead of round_robin and using the fake resolver
+  // response generator to switch between the two.
+  auto response_generator =
+      grpc_core::MakeRefCounted();
+  grpc_arg client_args[] = {
+      grpc_channel_arg_integer_create(
+          const_cast(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
+      grpc_channel_arg_integer_create(
+          const_cast(
+              GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS),
+          0),
+      grpc_channel_arg_integer_create(
+          const_cast(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 0),
+      grpc_channel_arg_integer_create(
+          const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000),
+      grpc_channel_arg_integer_create(
+          const_cast(GRPC_ARG_HTTP2_BDP_PROBE), 0),
+      grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
+          response_generator.get())};
+  grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
+                                           client_args};
+  grpc_channel* channel =
+      grpc_insecure_channel_create("fake:///", &client_channel_args, nullptr);
+  // For a single subchannel 3 GOAWAYs would be sufficient to increase the
+  // keepalive time from 1 second to beyond 5 seconds. Even though we are
+  // alternating between two subchannels, 3 GOAWAYs should still be enough since
+  // the channel should start all new transports with the new keepalive value
+  // (even those from a different subchannel).
+  int expected_keepalive_time_sec = 1;
+  for (int i = 0; i < 3; i++) {
+    gpr_log(GPR_INFO, "Expected keepalive time : %d",
+            expected_keepalive_time_sec);
+    response_generator->SetResponse(BuildResolverResult({absl::StrCat(
+        "ipv4:", i % 2 == 0 ? server_address1 : server_address2)}));
+    // ExecCtx::Flush() might not be enough to make sure that the resolver
+    // result has been propagated, so sleep for a bit.
+    grpc_core::ExecCtx::Get()->Flush();
+    gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
+    EXPECT_EQ(PerformWaitingCall(channel, i % 2 == 0 ? server1 : server2, cq),
+              GRPC_STATUS_UNAVAILABLE);
+  }
+  gpr_log(
+      GPR_INFO,
+      "Client keepalive time %d should now be in sync with the server settings",
+      expected_keepalive_time_sec);
+  response_generator->SetResponse(
+      BuildResolverResult({absl::StrCat("ipv4:", server_address2)}));
+  grpc_core::ExecCtx::Get()->Flush();
+  gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
+  EXPECT_EQ(PerformWaitingCall(channel, server2, cq),
+            GRPC_STATUS_DEADLINE_EXCEEDED);
+  // shutdown and destroy the client and servers
+  grpc_channel_destroy(channel);
+  grpc_server_shutdown_and_notify(server1, cq, nullptr);
+  grpc_server_shutdown_and_notify(server2, cq, nullptr);
+  grpc_completion_queue_shutdown(cq);
+  while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
+                                    nullptr)
+             .type != GRPC_QUEUE_SHUTDOWN)
+    ;
+  grpc_server_destroy(server1);
+  grpc_server_destroy(server2);
+  grpc_completion_queue_destroy(cq);
+}
+
 }  // namespace

 int main(int argc, char** argv) {