Merge branch 'transport-refs-3' into transport-refs-4

pull/36598/head
Craig Tiller 11 months ago
commit 9ae9771eec
  1. 7
      src/core/lib/transport/transport.h
  2. 12
      src/core/server/server.cc
  3. 6
      test/core/transport/chaotic_good/chaotic_good_server_test.cc

@ -559,6 +559,13 @@ class Transport : public InternallyRefCounted<Transport> {
PerformOp(op);
}
void DisconnectWithError(grpc_error_handle error) {
CHECK(!error.ok()) << error;
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error = error;
PerformOp(op);
}
// implementation of grpc_transport_get_endpoint
virtual grpc_endpoint* GetEndpoint() = 0;
};

@ -755,6 +755,8 @@ class Server::TransportConnectivityWatcher
// Shut down channel.
MutexLock lock(&server_->mu_global_);
server_->connections_.erase(transport_.get());
--server_->connections_open_;
server_->MaybeFinishShutdown();
}
RefCountedPtr<ServerTransport> transport_;
@ -972,9 +974,13 @@ grpc_error_handle Server::SetupTransport(
}
// TODO(ctiller): add channelz node
t->SetCallDestination(std::move(*destination));
MutexLock lock(&mu_global_);
if (ShutdownCalled()) {
t->DisconnectWithError(GRPC_ERROR_CREATE("Server shutdown"));
}
t->StartConnectivityWatch(MakeOrphanable<TransportConnectivityWatcher>(
t->RefAsSubclass<ServerTransport>(), Ref()));
MutexLock lock(&mu_global_);
gpr_log(GPR_INFO, "Adding connection");
connections_.emplace(std::move(t));
++connections_open_;
} else {
@ -1090,7 +1096,7 @@ void Server::MaybeFinishShutdown() {
MutexLock lock(&mu_call_);
KillPendingWorkLocked(GRPC_ERROR_CREATE("Server Shutdown"));
}
if (!channels_.empty() || !connections_.empty() ||
if (!channels_.empty() || connections_open_ > 0 ||
listeners_destroyed_ < listeners_.size()) {
if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
last_shutdown_message_time_),
@ -1100,7 +1106,7 @@ void Server::MaybeFinishShutdown() {
"Waiting for %" PRIuPTR " channels %" PRIuPTR
" connections and %" PRIuPTR "/%" PRIuPTR
" listeners to be destroyed before shutting down server",
channels_.size(), connections_.size(),
channels_.size(), connections_open_,
listeners_.size() - listeners_destroyed_, listeners_.size());
}
return;

@ -41,6 +41,7 @@
#include "test/core/event_engine/event_engine_test_utils.h"
#include "test/core/test_util/port.h"
#include "test/core/test_util/test_config.h"
#include "test/core/test_util/build.h"
namespace grpc_core {
namespace chaotic_good {
@ -67,7 +68,10 @@ class ChaoticGoodServerTest : public ::testing::Test {
auto ev = grpc_completion_queue_pluck(
shutdown_cq, nullptr, grpc_timeout_milliseconds_to_deadline(15000),
nullptr);
CHECK(ev.type == GRPC_OP_COMPLETE);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
AsanAssertNoLeaks();
}
CHECK_EQ(ev.type, GRPC_OP_COMPLETE);
CHECK_EQ(ev.tag, nullptr);
grpc_completion_queue_destroy(shutdown_cq);
grpc_server_destroy(server_);

Loading…
Cancel
Save