[Chttp2Server] Fix race between connection manager updates and handshake (#37772)

We were failing to shutdown the connection in the case where we want to send goaways on the connection because we've gotten an update from the `ConnectionManager`, but the handshake hasn't completed.

Verified with 1000 runs of xds_end2end_test with the filter `*XdsRbacTestWithActionPermutations*`
Link to sample test failure - https://btx.cloud.google.com/invocations/d0dbd55e-25f8-44c9-8d90-0eb1f7717d95/targets/%2F%2Ftest%2Fcpp%2Fend2end%2Fxds:xds_end2end_test@poller%3Depoll1;config=1a7dc092b28796b045d00aec96c95b85c1d4dc656912e0021a1fc84b3ecb2ac9/tests

Closes #37772

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37772 from yashykt:Chttp2ServerConnectionShutdownRace 84a3206eb1
PiperOrigin-RevId: 676874525
pull/36792/head
Yash Tibrewal 2 months ago committed by Copybara-Service
parent c1512de73d
commit c6c461bc0e
  1. 58
      src/core/ext/transport/chttp2/server/chttp2_server.cc

@ -183,24 +183,27 @@ class Chttp2ServerListener : public Server::ListenerInterface {
void Start(OrphanablePtr<grpc_endpoint> endpoint,
const ChannelArgs& args);
void ShutdownLocked(absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ActiveConnection::mu_);
// Needed to be able to grab an external ref in
// ActiveConnection::Start()
using InternallyRefCounted<HandshakingState>::Ref;
private:
void OnTimeout() ABSL_LOCKS_EXCLUDED(&connection_->mu_);
void OnTimeout() ABSL_LOCKS_EXCLUDED(&ActiveConnection::mu_);
static void OnReceiveSettings(void* arg, grpc_error_handle /* error */);
void OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result);
RefCountedPtr<ActiveConnection> const connection_;
grpc_pollset* const accepting_pollset_;
AcceptorPtr acceptor_;
RefCountedPtr<HandshakeManager> handshake_mgr_
ABSL_GUARDED_BY(&connection_->mu_);
ABSL_GUARDED_BY(&ActiveConnection::mu_);
// State for enforcing handshake timeout on receiving HTTP/2 settings.
Timestamp const deadline_;
absl::optional<EventEngine::TaskHandle> timer_handle_
ABSL_GUARDED_BY(&connection_->mu_);
grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_);
ABSL_GUARDED_BY(&ActiveConnection::mu_);
grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&ActiveConnection::mu_);
grpc_pollset_set* const interested_parties_;
};
@ -400,9 +403,7 @@ Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
{
MutexLock lock(&connection_->mu_);
if (handshake_mgr_ != nullptr) {
handshake_mgr_->Shutdown(GRPC_ERROR_CREATE("Listener stopped serving."));
}
ShutdownLocked(absl::UnavailableError("Listener stopped serving."));
}
Unref();
}
@ -422,6 +423,13 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::Start(
});
}
void Chttp2ServerListener::ActiveConnection::HandshakingState::ShutdownLocked(
absl::Status status) {
if (handshake_mgr_ != nullptr) {
handshake_mgr_->Shutdown(std::move(status));
}
}
void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout() {
grpc_chttp2_transport* transport = nullptr;
{
@ -584,20 +592,28 @@ void Chttp2ServerListener::ActiveConnection::SendGoAway() {
grpc_chttp2_transport* transport = nullptr;
{
MutexLock lock(&mu_);
if (transport_ != nullptr && !shutdown_) {
transport = transport_.get();
drain_grace_timer_handle_ = event_engine_->RunAfter(
std::max(Duration::Zero(),
listener_->args_
.GetDurationFromIntMillis(
GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS)
.value_or(Duration::Minutes(10))),
[self = Ref(DEBUG_LOCATION, "drain_grace_timer")]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->OnDrainGraceTimeExpiry();
self.reset(DEBUG_LOCATION, "drain_grace_timer");
});
if (!shutdown_) {
// Send a GOAWAY if the transport exists
if (transport_ != nullptr) {
transport = transport_.get();
drain_grace_timer_handle_ = event_engine_->RunAfter(
std::max(Duration::Zero(),
listener_->args_
.GetDurationFromIntMillis(
GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS)
.value_or(Duration::Minutes(10))),
[self = Ref(DEBUG_LOCATION, "drain_grace_timer")]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->OnDrainGraceTimeExpiry();
self.reset(DEBUG_LOCATION, "drain_grace_timer");
});
}
// Shutdown the handshaker if it's still in progress.
if (handshaking_state_ != nullptr) {
handshaking_state_->ShutdownLocked(
absl::UnavailableError("Connection going away"));
}
shutdown_ = true;
}
}

Loading…
Cancel
Save