diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 97ca3e4c2c4..53545748586 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -445,6 +445,12 @@ typedef struct {
     gRPC authorization check. */
 #define GRPC_ARG_AUTHORIZATION_POLICY_PROVIDER \
   "grpc.authorization_policy_provider"
+/** EXPERIMENTAL. Updates to a server's configuration from a config fetcher (for
+ * example, listener updates from xDS) cause all older connections to be
+ * gracefully shut down (i.e., "drained") with a grace period configured by this
+ * channel arg. Int valued, milliseconds. Defaults to 10 minutes. */
+#define GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS \
+  "grpc.experimental.server_config_change_drain_grace_time_ms"
 /** \} */
 
 /** Result of a grpc call. If the caller satisfies the prerequisites of a
diff --git a/include/grpcpp/xds_server_builder.h b/include/grpcpp/xds_server_builder.h
index b5663ee35e8..01ae0257454 100644
--- a/include/grpcpp/xds_server_builder.h
+++ b/include/grpcpp/xds_server_builder.h
@@ -46,6 +46,24 @@ class XdsServerServingStatusNotifierInterface {
 
 class XdsServerBuilder : public ::grpc::ServerBuilder {
  public:
+  // NOTE: class experimental_type is not part of the public API of this class.
+  // TODO(yashykt): Integrate into public API when this is no longer
+  // experimental.
+  class experimental_type : public ::grpc::ServerBuilder::experimental_type {
+   public:
+    explicit experimental_type(XdsServerBuilder* builder)
+        : ServerBuilder::experimental_type(builder), builder_(builder) {}
+
+    // EXPERIMENTAL: Sets the drain grace period in ms for older connections
+    // when updates to a Listener are received.
+    void set_drain_grace_time(int drain_grace_time_ms) {
+      builder_->drain_grace_time_ms_ = drain_grace_time_ms;
+    }
+
+   private:
+    XdsServerBuilder* builder_;
+  };
+
   // It is the responsibility of the application to make sure that \a notifier
   // outlasts the life of the server. Notifications will start being made
   // asynchronously once `BuildAndStart()` has been called. Note that it is
@@ -54,10 +72,19 @@ class XdsServerBuilder : public ::grpc::ServerBuilder {
     notifier_ = notifier;
   }
 
+  /// NOTE: The function experimental() is not stable public API. It is a view
+  /// to the experimental components of this class. It may be changed or removed
+  /// at any time.
+  experimental_type experimental() { return experimental_type(this); }
+
  private:
   // Called at the beginning of BuildAndStart().
   ChannelArguments BuildChannelArgs() override {
     ChannelArguments args = ServerBuilder::BuildChannelArgs();
+    if (drain_grace_time_ms_ >= 0) {
+      args.SetInt(GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS,
+                  drain_grace_time_ms_);
+    }
     grpc_channel_args c_channel_args = args.c_channel_args();
     grpc_server_config_fetcher* fetcher = grpc_server_config_fetcher_xds_create(
         {OnServingStatusUpdate, notifier_}, &c_channel_args);
@@ -76,6 +103,7 @@ class XdsServerBuilder : public ::grpc::ServerBuilder {
   }
 
   XdsServerServingStatusNotifierInterface* notifier_ = nullptr;
+  int drain_grace_time_ms_ = -1;
 };
 
 namespace experimental {
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc
index 01347000c93..1badf18ec2b 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.cc
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc
@@ -160,6 +160,7 @@ class Chttp2ServerListener : public Server::ListenerInterface {
    private:
     static void OnClose(void* arg, grpc_error_handle error);
+    static void OnDrainGraceTimeExpiry(void* arg, grpc_error_handle error);
 
     RefCountedPtr<Chttp2ServerListener> listener_;
     Mutex mu_ ABSL_ACQUIRED_AFTER(&listener_->mu_);
@@ -170,6 +171,9 @@ class Chttp2ServerListener : public Server::ListenerInterface {
     // created.
     grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
     grpc_closure on_close_;
+    grpc_timer drain_grace_timer_;
+    grpc_closure on_drain_grace_time_expiry_;
+    bool drain_grace_timer_expiry_callback_pending_ = false;
     bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
   };
 
@@ -221,10 +225,9 @@ class Chttp2ServerListener : public Server::ListenerInterface {
   Chttp2ServerArgsModifier const args_modifier_;
   ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
   grpc_channel_args* args_;
-  Mutex connection_manager_mu_;
-  RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
-      connection_manager_ ABSL_GUARDED_BY(connection_manager_mu_);
   Mutex mu_;
+  RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
+      connection_manager_ ABSL_GUARDED_BY(mu_);
   // Signals whether grpc_tcp_server_start() has been called.
   bool started_ ABSL_GUARDED_BY(mu_) = false;
   // Signals whether grpc_tcp_server_start() has completed.
@@ -250,13 +253,31 @@ void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager(
         connection_manager) {
   RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
       connection_manager_to_destroy;
+  class GracefulShutdownExistingConnections {
+   public:
+    ~GracefulShutdownExistingConnections() {
+      // Send GOAWAYs on the transports so that they get disconnected when
+      // existing RPCs finish, and so that no new RPC is started on them.
+      for (auto& connection : connections_) {
+        connection.first->SendGoAway();
+      }
+    }
+
+    void set_connections(
+        std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>>
+            connections) {
+      GPR_ASSERT(connections_.empty());
+      connections_ = std::move(connections);
+    }
+
+   private:
+    std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_;
+  } connections_to_shutdown;
   {
-    MutexLock lock(&listener_->connection_manager_mu_);
+    MutexLock lock(&listener_->mu_);
     connection_manager_to_destroy = listener_->connection_manager_;
     listener_->connection_manager_ = std::move(connection_manager);
-  }
-  {
-    MutexLock lock(&listener_->mu_);
+    connections_to_shutdown.set_connections(std::move(listener_->connections_));
     if (listener_->shutdown_) {
       return;
     }
@@ -532,6 +553,17 @@ void Chttp2ServerListener::ActiveConnection::SendGoAway() {
     op->goaway_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
         "Server is stopping to serve requests.");
     grpc_transport_perform_op(&transport->base, op);
+    Ref().release();  // Ref held by OnDrainGraceTimeExpiry
+    GRPC_CLOSURE_INIT(&on_drain_grace_time_expiry_, OnDrainGraceTimeExpiry,
+                      this, nullptr);
+    grpc_timer_init(&drain_grace_timer_,
+                    ExecCtx::Get()->Now() +
+                        grpc_channel_args_find_integer(
+                            listener_->args_,
+                            GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS,
+                            {10 * 60 * GPR_MS_PER_SEC, 0, INT_MAX}),
+                    &on_drain_grace_time_expiry_);
+    drain_grace_timer_expiry_callback_pending_ = true;
   }
 }
 
@@ -566,6 +598,29 @@ void Chttp2ServerListener::ActiveConnection::OnClose(
         self->listener_->connections_.erase(it);
       }
     }
+    // Cancel the drain_grace_timer_ if needed.
+    if (self->drain_grace_timer_expiry_callback_pending_) {
+      grpc_timer_cancel(&self->drain_grace_timer_);
+    }
+  }
+  self->Unref();
+}
+
+void Chttp2ServerListener::ActiveConnection::OnDrainGraceTimeExpiry(
+    void* arg, grpc_error_handle error) {
+  ActiveConnection* self = static_cast<ActiveConnection*>(arg);
+  // If the drain_grace_timer_ was not cancelled, disconnect the transport
+  // immediately.
+  if (error == GRPC_ERROR_NONE) {
+    grpc_chttp2_transport* transport = nullptr;
+    {
+      MutexLock lock(&self->mu_);
+      transport = self->transport_;
+    }
+    grpc_transport_op* op = grpc_make_transport_op(nullptr);
+    op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "Drain grace time expired. Closing connection immediately.");
+    grpc_transport_perform_op(&transport->base, op);
   }
   self->Unref();
 }
@@ -700,7 +755,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
   RefCountedPtr<grpc_server_config_fetcher::ConnectionManager> connection_manager;
   {
-    MutexLock lock(&self->connection_manager_mu_);
+    MutexLock lock(&self->mu_);
     connection_manager = self->connection_manager_;
   }
   auto endpoint_cleanup = [&](grpc_error_handle error) {
@@ -751,8 +806,10 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
   RefCountedPtr<Chttp2ServerListener> listener_ref;
   {
     MutexLock lock(&self->mu_);
-    // Shutdown the the connection if listener's stopped serving.
-    if (!self->shutdown_ && self->is_serving_) {
+    // Shut down the connection if the listener has stopped serving or if the
+    // connection manager has changed.
+    if (!self->shutdown_ && self->is_serving_ &&
+        connection_manager == self->connection_manager_) {
       // This ref needs to be taken in the critical region after having made
       // sure that the listener has not been Orphaned, so as to avoid
       // heap-use-after-free issues where `Ref()` is invoked when the ref of
diff --git a/src/core/ext/xds/xds_server_config_fetcher.cc b/src/core/ext/xds/xds_server_config_fetcher.cc
index 315efa0ff54..a67377ac02a 100644
--- a/src/core/ext/xds/xds_server_config_fetcher.cc
+++ b/src/core/ext/xds/xds_server_config_fetcher.cc
@@ -68,8 +68,8 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
  private:
   class ListenerWatcher;
 
-  RefCountedPtr xds_client_;
-  grpc_server_xds_status_notifier serving_status_notifier_;
+  const RefCountedPtr xds_client_;
+  const grpc_server_xds_status_notifier serving_status_notifier_;
   Mutex mu_;
   std::map listener_watchers_ ABSL_GUARDED_BY(mu_);
@@ -1235,7 +1235,10 @@ grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(
   args = grpc_core::CoreConfiguration::Get()
              .channel_args_preconditioning()
              .PreconditionChannelArgs(args);
-  GRPC_API_TRACE("grpc_server_config_fetcher_xds_create()", 0, ());
+  GRPC_API_TRACE(
+      "grpc_server_config_fetcher_xds_create(notifier={on_serving_status_"
+      "update=%p, user_data=%p}, args=%p)",
+      3, (notifier.on_serving_status_update, notifier.user_data, args));
   grpc_error_handle error = GRPC_ERROR_NONE;
   grpc_core::RefCountedPtr<grpc_core::XdsClient> xds_client =
       grpc_core::XdsClient::GetOrCreate(args, &error);
diff --git a/test/cpp/end2end/xds/xds_end2end_test.cc b/test/cpp/end2end/xds/xds_end2end_test.cc
index 3f968c75dea..ac934babbe4 100644
--- a/test/cpp/end2end/xds/xds_end2end_test.cc
+++ b/test/cpp/end2end/xds/xds_end2end_test.cc
@@ -1617,6 +1617,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam {
           absl::make_unique(test_obj_));
     }
     builder.set_status_notifier(&notifier_);
+    builder.experimental().set_drain_grace_time(
+        test_obj_->xds_drain_grace_time_ms_);
     builder.AddListeningPort(server_address.str(), Credentials());
     RegisterAllServices(&builder);
     server_ = builder.BuildAndStart();
@@ -1929,6 +1931,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam {
   RouteConfiguration default_server_route_config_;
   Cluster default_cluster_;
   bool use_xds_enabled_server_;
+  int xds_drain_grace_time_ms_ = 10 * 60 * 1000;  // 10 mins
   bool bootstrap_contents_from_env_var_;
 };
 
@@ -9277,6 +9280,60 @@ TEST_P(XdsEnabledServerStatusNotificationTest, ExistingRpcsOnResourceDeletion) {
   }
 }
 
+TEST_P(XdsEnabledServerStatusNotificationTest,
+       ExistingRpcsFailOnResourceUpdateAfterDrainGraceTimeExpires) {
+  constexpr int kDrainGraceTimeMs = 100;
+  xds_drain_grace_time_ms_ = kDrainGraceTimeMs;
+  FakeCertificateProvider::CertDataMap fake1_cert_map = {
+      {"", {root_cert_, identity_pair_}}};
+  g_fake1_cert_data_map = &fake1_cert_map;
+  // Send a valid LDS update to get the server to start listening.
+  SetValidLdsUpdate();
+  backends_[0]->Start();
+  backends_[0]->notifier()->WaitOnServingStatusChange(
+      absl::StrCat(ipv6_only_ ? "[::1]:" : "127.0.0.1:", backends_[0]->port()),
"[::1]:" : "127.0.0.1:", backends_[0]->port()), + grpc::StatusCode::OK); + constexpr int kNumChannels = 10; + struct StreamingRpc { + std::shared_ptr channel; + std::unique_ptr stub; + ClientContext context; + std::unique_ptr> stream; + } streaming_rpcs[kNumChannels]; + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + for (int i = 0; i < kNumChannels; i++) { + streaming_rpcs[i].channel = CreateInsecureChannel(); + streaming_rpcs[i].stub = + grpc::testing::EchoTestService::NewStub(streaming_rpcs[i].channel); + streaming_rpcs[i].context.set_wait_for_ready(true); + streaming_rpcs[i].stream = + streaming_rpcs[i].stub->BidiStream(&streaming_rpcs[i].context); + EXPECT_TRUE(streaming_rpcs[i].stream->Write(request)); + streaming_rpcs[i].stream->Read(&response); + EXPECT_EQ(request.message(), response.message()); + } + grpc_millis update_time = NowFromCycleCounter(); + // Update the resource. + SetLdsUpdate("", "", "fake_plugin1", "", false); + // Wait for the updated resource to take effect. + SendRpc([this]() { return CreateTlsChannel(); }, + server_authenticated_identity_, {}); + // After the drain grace time expires, the existing RPCs should all fail. + for (int i = 0; i < kNumChannels; i++) { + // Wait for the drain grace time to expire + EXPECT_FALSE(streaming_rpcs[i].stream->Read(&response)); + // Make sure that the drain grace interval is honored. + EXPECT_GE(NowFromCycleCounter() - update_time, kDrainGraceTimeMs); + auto status = streaming_rpcs[i].stream->Finish(); + EXPECT_EQ(status.error_code(), grpc::StatusCode::UNAVAILABLE) + << status.error_code() << ", " << status.error_message() << ", " + << status.error_details() << ", " + << streaming_rpcs[i].context.debug_error_string(); + } +} + using XdsServerFilterChainMatchTest = XdsServerSecurityTest; TEST_P(XdsServerFilterChainMatchTest,