pull/36509/head
Craig Tiller 10 months ago
parent ee3d8cb453
commit fdb5c3fca8
  1. 40
      src/core/lib/surface/server.cc
  2. 5
      src/core/lib/surface/server.h

@ -947,6 +947,9 @@ grpc_error_handle Server::SetupTransport(
return absl_status_to_grpc_error(destination.status());
}
t->SetCallDestination(std::move(*destination));
MutexLock lock(&mu_global_);
connections_.emplace(std::move(t));
++connections_open_;
} else {
GPR_ASSERT(transport->filter_stack_transport() != nullptr);
absl::StatusOr<OrphanablePtr<Channel>> channel = LegacyChannel::Create(
@ -975,7 +978,7 @@ grpc_error_handle Server::SetupTransport(
bool Server::HasOpenConnections() {
MutexLock lock(&mu_global_);
return !channels_.empty();
return !channels_.empty() || !connections_.empty();
}
void Server::SetRegisteredMethodAllocator(
@ -1055,16 +1058,18 @@ void Server::MaybeFinishShutdown() {
MutexLock lock(&mu_call_);
KillPendingWorkLocked(GRPC_ERROR_CREATE("Server Shutdown"));
}
if (!channels_.empty() || listeners_destroyed_ < listeners_.size()) {
if (!channels_.empty() || !connections_.empty() ||
listeners_destroyed_ < listeners_.size()) {
if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
last_shutdown_message_time_),
gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME);
gpr_log(GPR_DEBUG,
"Waiting for %" PRIuPTR " channels and %" PRIuPTR "/%" PRIuPTR
"Waiting for %" PRIuPTR " channels %" PRIuPTR
" connections and %" PRIuPTR "/%" PRIuPTR
" listeners to be destroyed before shutting down server",
channels_.size(), listeners_.size() - listeners_destroyed_,
listeners_.size());
channels_.size(), connections_.size(),
listeners_.size() - listeners_destroyed_, listeners_.size());
}
return;
}
@ -1127,6 +1132,7 @@ void DonePublishedShutdown(void* /*done_arg*/, grpc_cq_completion* storage) {
// -- Once there are no more calls in progress, the channel is closed.
void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) {
ChannelBroadcaster broadcaster;
absl::flat_hash_set<OrphanablePtr<ServerTransport>> removing_connections;
{
// Wait for startup to be finished. Locks mu_global.
MutexLock lock(&mu_global_);
@ -1146,6 +1152,7 @@ void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) {
}
last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME);
broadcaster.FillChannelsLocked(GetChannelsLocked());
removing_connections.swap(connections_);
// Collect all unregistered then registered calls.
{
MutexLock lock(&mu_call_);
@ -1312,6 +1319,29 @@ class Server::ChannelData::ConnectivityWatcher
const RefCountedPtr<Channel> channel_;
};
//
// Server::TransportConnectivityWatcher
//
class Server::TransportConnectivityWatcher
: public AsyncConnectivityStateWatcherInterface {
public:
explicit TransportConnectivityWatcher(ServerTransport* transport)
: transport_(transport) {}
private:
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status& /*status*/) override {
// Don't do anything until we are being shut down.
if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
// Shut down channel.
MutexLock lock(&chand_->server_->mu_global_);
chand_->Destroy();
}
ServerTransport* transport_;
};
//
// Server::ChannelData
//

@ -379,6 +379,8 @@ class Server : public ServerInterface,
using is_transparent = void;
};
class TransportConnectivityWatcher;
RegisteredMethod* GetRegisteredMethod(const absl::string_view& host,
const absl::string_view& path);
void SetRegisteredMethodOnMetadata(ClientMetadata& metadata);
@ -504,6 +506,9 @@ class Server : public ServerInterface,
absl::BitGen bitgen_ ABSL_GUARDED_BY(mu_call_);
std::list<ChannelData*> channels_;
absl::flat_hash_set<OrphanablePtr<ServerTransport>> connections_
ABSL_GUARDED_BY(mu_global_);
size_t connections_open_ ABSL_GUARDED_BY(mu_global_) = 0;
std::list<Listener> listeners_;
size_t listeners_destroyed_ = 0;

Loading…
Cancel
Save