From 522e0c4d45119849aff845772f58cc824d6cf48d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 10 May 2024 15:58:18 -0700 Subject: [PATCH] x --- src/core/lib/transport/transport.h | 7 +++++++ src/core/server/server.cc | 12 +++++++++--- .../chaotic_good/chaotic_good_server_test.cc | 6 +++++- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 5ea8acce2f8..ac2cc186cd8 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -559,6 +559,13 @@ class Transport : public InternallyRefCounted { PerformOp(op); } + void DisconnectWithError(grpc_error_handle error) { + CHECK(!error.ok()) << error; + grpc_transport_op* op = grpc_make_transport_op(nullptr); + op->disconnect_with_error = error; + PerformOp(op); + } + // implementation of grpc_transport_get_endpoint virtual grpc_endpoint* GetEndpoint() = 0; }; diff --git a/src/core/server/server.cc b/src/core/server/server.cc index cd0a15d3653..40c87eda174 100644 --- a/src/core/server/server.cc +++ b/src/core/server/server.cc @@ -755,6 +755,8 @@ class Server::TransportConnectivityWatcher // Shut down channel. MutexLock lock(&server_->mu_global_); server_->connections_.erase(transport_.get()); + --server_->connections_open_; + server_->MaybeFinishShutdown(); } RefCountedPtr transport_; @@ -972,9 +974,13 @@ grpc_error_handle Server::SetupTransport( } // TODO(ctiller): add channelz node t->SetCallDestination(std::move(*destination)); + MutexLock lock(&mu_global_); + if (ShutdownCalled()) { + t->DisconnectWithError(GRPC_ERROR_CREATE("Server shutdown")); + } t->StartConnectivityWatch(MakeOrphanable( t->RefAsSubclass(), Ref())); - MutexLock lock(&mu_global_); + gpr_log(GPR_INFO, "Adding connection"); connections_.emplace(std::move(t)); ++connections_open_; } else { @@ -1090,7 +1096,7 @@ void Server::MaybeFinishShutdown() { MutexLock lock(&mu_call_); KillPendingWorkLocked(GRPC_ERROR_CREATE("Server Shutdown")); } - if (!channels_.empty() || !connections_.empty() || + if (!channels_.empty() || connections_open_ > 0 || listeners_destroyed_ < listeners_.size()) { if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_shutdown_message_time_), @@ -1100,7 +1106,7 @@ void Server::MaybeFinishShutdown() { "Waiting for %" PRIuPTR " channels %" PRIuPTR " connections and %" PRIuPTR "/%" PRIuPTR " listeners to be destroyed before shutting down server", - channels_.size(), connections_.size(), + channels_.size(), connections_open_, listeners_.size() - listeners_destroyed_, listeners_.size()); } return; diff --git a/test/core/transport/chaotic_good/chaotic_good_server_test.cc b/test/core/transport/chaotic_good/chaotic_good_server_test.cc index b5b058b43ee..c7ff33038cc 100644 --- a/test/core/transport/chaotic_good/chaotic_good_server_test.cc +++ b/test/core/transport/chaotic_good/chaotic_good_server_test.cc @@ -41,6 +41,7 @@ #include "test/core/event_engine/event_engine_test_utils.h" #include "test/core/test_util/port.h" #include "test/core/test_util/test_config.h" +#include "test/core/test_util/build.h" namespace grpc_core { namespace chaotic_good { @@ -67,7 +68,10 @@ class ChaoticGoodServerTest : public ::testing::Test { auto ev = grpc_completion_queue_pluck( shutdown_cq, nullptr, grpc_timeout_milliseconds_to_deadline(15000), nullptr); - CHECK(ev.type == GRPC_OP_COMPLETE); + if (ev.type == GRPC_QUEUE_TIMEOUT) { + AsanAssertNoLeaks(); + } + CHECK_EQ(ev.type, GRPC_OP_COMPLETE); CHECK_EQ(ev.tag, nullptr); grpc_completion_queue_destroy(shutdown_cq); grpc_server_destroy(server_);