xDS: Add graceful shutdown for old connections on listener resource update (#28154)

* xDS: Add graceful shutdown for old connections on listener resource update

* Add TODOs for review

* Reviewer comments

* Reviewer comments

* Fix merge

* Fix comment

* s/GRPC_ARG_DRAIN_GRACE_TIME_MS/GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS
Authored by Yash Tibrewal 3 years ago, committed by GitHub
parent 260145c517
commit 3689072979
5 changed files:
  1. include/grpc/impl/codegen/grpc_types.h (6 changes)
  2. include/grpcpp/xds_server_builder.h (28 changes)
  3. src/core/ext/transport/chttp2/server/chttp2_server.cc (77 changes)
  4. src/core/ext/xds/xds_server_config_fetcher.cc (9 changes)
  5. test/cpp/end2end/xds/xds_end2end_test.cc (57 changes)

@@ -445,6 +445,12 @@ typedef struct {
gRPC authorization check. */
#define GRPC_ARG_AUTHORIZATION_POLICY_PROVIDER \
"grpc.authorization_policy_provider"
/** EXPERIMENTAL. Updates to a server's configuration from a config fetcher (for
* example, listener updates from xDS) cause all older connections to be
* gracefully shut down (i.e., "drained") with a grace period configured by this
* channel arg. Int valued, milliseconds. Defaults to 10 minutes.*/
#define GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS \
"grpc.experimental.server_config_change_drain_grace_time_ms"
/** \} */
/** Result of a grpc call. If the caller satisfies the prerequisites of a
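The new arg is a plain integer channel arg, so it can also be set at the C-core layer. A minimal sketch, assuming a 30-second grace period; the helper name is hypothetical, while grpc_channel_arg_integer_create() and grpc_channel_args are existing C-core APIs:

#include <grpc/grpc.h>

// Hypothetical helper: builds channel args carrying a 30-second drain grace
// period. `storage` must outlive the returned grpc_channel_args.
grpc_channel_args BuildDrainGraceArgs(grpc_arg* storage) {
  *storage = grpc_channel_arg_integer_create(
      const_cast<char*>(GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS),
      30 * 1000 /* milliseconds */);
  grpc_channel_args args;
  args.num_args = 1;
  args.args = storage;
  return args;
}

In the C++ path, the same arg is populated by XdsServerBuilder::BuildChannelArgs(), shown in the next file.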

@@ -46,6 +46,24 @@ class XdsServerServingStatusNotifierInterface {
class XdsServerBuilder : public ::grpc::ServerBuilder {
public:
// NOTE: class experimental_type is not part of the public API of this class
// TODO(yashykt): Integrate into public API when this is no longer
// experimental.
class experimental_type : public ::grpc::ServerBuilder::experimental_type {
public:
explicit experimental_type(XdsServerBuilder* builder)
: ServerBuilder::experimental_type(builder), builder_(builder) {}
// EXPERIMENTAL: Sets the drain grace period in ms for older connections
// when an update to a Listener is received.
void set_drain_grace_time(int drain_grace_time_ms) {
builder_->drain_grace_time_ms_ = drain_grace_time_ms;
}
private:
XdsServerBuilder* builder_;
};
// It is the responsibility of the application to make sure that \a notifier
// outlasts the life of the server. Notifications will start being made
// asynchronously once `BuildAndStart()` has been called. Note that it is
@@ -54,10 +72,19 @@ class XdsServerBuilder : public ::grpc::ServerBuilder {
notifier_ = notifier;
}
/// NOTE: The function experimental() is not stable public API. It is a view
/// to the experimental components of this class. It may be changed or removed
/// at any time.
experimental_type experimental() { return experimental_type(this); }
private:
// Called at the beginning of BuildAndStart().
ChannelArguments BuildChannelArgs() override {
ChannelArguments args = ServerBuilder::BuildChannelArgs();
if (drain_grace_time_ms_ >= 0) {
args.SetInt(GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS,
drain_grace_time_ms_);
}
grpc_channel_args c_channel_args = args.c_channel_args();
grpc_server_config_fetcher* fetcher = grpc_server_config_fetcher_xds_create(
{OnServingStatusUpdate, notifier_}, &c_channel_args);
@@ -76,6 +103,7 @@ class XdsServerBuilder : public ::grpc::ServerBuilder {
}
XdsServerServingStatusNotifierInterface* notifier_ = nullptr;
int drain_grace_time_ms_ = -1;
};
namespace experimental {
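For C++ servers, the new experimental setter on XdsServerBuilder is the intended entry point. A usage sketch, assuming xDS server credentials with an insecure fallback and a placeholder address; only set_drain_grace_time() comes from this commit:

#include <memory>

#include <grpcpp/grpcpp.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/xds_server_builder.h>

std::unique_ptr<grpc::Server> StartXdsEnabledServer(grpc::Service* service) {
  grpc::XdsServerBuilder builder;
  // Drain older connections 30 seconds after a listener update (the default
  // is 10 minutes when this setter is never called).
  builder.experimental().set_drain_grace_time(30 * 1000);
  builder.RegisterService(service);
  builder.AddListeningPort(
      "0.0.0.0:50051",
      grpc::XdsServerCredentials(grpc::InsecureServerCredentials()));
  return builder.BuildAndStart();
}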

@@ -160,6 +160,7 @@ class Chttp2ServerListener : public Server::ListenerInterface {
private:
static void OnClose(void* arg, grpc_error_handle error);
static void OnDrainGraceTimeExpiry(void* arg, grpc_error_handle error);
RefCountedPtr<Chttp2ServerListener> listener_;
Mutex mu_ ABSL_ACQUIRED_AFTER(&listener_->mu_);
@@ -170,6 +171,9 @@ class Chttp2ServerListener : public Server::ListenerInterface {
// created.
grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
grpc_closure on_close_;
grpc_timer drain_grace_timer_;
grpc_closure on_drain_grace_time_expiry_;
bool drain_grace_timer_expiry_callback_pending_ = false;
bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
};
@@ -221,10 +225,9 @@ class Chttp2ServerListener : public Server::ListenerInterface {
Chttp2ServerArgsModifier const args_modifier_;
ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
grpc_channel_args* args_;
Mutex connection_manager_mu_;
RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
connection_manager_ ABSL_GUARDED_BY(connection_manager_mu_);
Mutex mu_;
RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
connection_manager_ ABSL_GUARDED_BY(mu_);
// Signals whether grpc_tcp_server_start() has been called.
bool started_ ABSL_GUARDED_BY(mu_) = false;
// Signals whether grpc_tcp_server_start() has completed.
@@ -250,13 +253,31 @@ void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager(
connection_manager) {
RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
connection_manager_to_destroy;
class GracefulShutdownExistingConnections {
public:
~GracefulShutdownExistingConnections() {
// Send GOAWAYs on the transports so that they get disconnected when
// existing RPCs finish, and so that no new RPC is started on them.
for (auto& connection : connections_) {
connection.first->SendGoAway();
}
}
void set_connections(
std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>>
connections) {
GPR_ASSERT(connections_.empty());
connections_ = std::move(connections);
}
private:
std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_;
} connections_to_shutdown;
{
MutexLock lock(&listener_->connection_manager_mu_);
MutexLock lock(&listener_->mu_);
connection_manager_to_destroy = listener_->connection_manager_;
listener_->connection_manager_ = std::move(connection_manager);
}
{
MutexLock lock(&listener_->mu_);
connections_to_shutdown.set_connections(std::move(listener_->connections_));
if (listener_->shutdown_) {
return;
}
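Note that connections_to_shutdown is declared before the MutexLock scope, so its destructor, and therefore the GOAWAY sends, runs only after the listener's mutex is released. A standalone sketch of that scope-exit pattern with hypothetical types (std::mutex stands in for the core Mutex/MutexLock used in the diff):

#include <map>
#include <memory>
#include <mutex>

struct Connection {
  // Placeholder for ActiveConnection::SendGoAway(): ask the peer to finish
  // in-flight RPCs and start no new ones.
  void SendGoAway() {}
};

class DrainOnScopeExit {
 public:
  ~DrainOnScopeExit() {
    for (auto& kv : connections_) kv.second->SendGoAway();
  }
  void Take(std::map<Connection*, std::unique_ptr<Connection>> connections) {
    connections_ = std::move(connections);
  }

 private:
  std::map<Connection*, std::unique_ptr<Connection>> connections_;
};

void OnConfigUpdate(std::mutex& mu,
                    std::map<Connection*, std::unique_ptr<Connection>>& live) {
  DrainOnScopeExit to_drain;
  {
    std::lock_guard<std::mutex> lock(mu);
    to_drain.Take(std::move(live));  // detach connections under the lock
  }
  // Lock released; GOAWAYs go out when to_drain is destroyed here.
}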
@@ -532,6 +553,17 @@ void Chttp2ServerListener::ActiveConnection::SendGoAway() {
op->goaway_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Server is stopping to serve requests.");
grpc_transport_perform_op(&transport->base, op);
Ref().release(); // Ref held by OnDrainGraceTimeExpiry
GRPC_CLOSURE_INIT(&on_drain_grace_time_expiry_, OnDrainGraceTimeExpiry,
this, nullptr);
grpc_timer_init(&drain_grace_timer_,
ExecCtx::Get()->Now() +
grpc_channel_args_find_integer(
listener_->args_,
GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS,
{10 * 60 * GPR_MS_PER_SEC, 0, INT_MAX}),
&on_drain_grace_time_expiry_);
drain_grace_timer_expiry_callback_pending_ = true;
}
}
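The timer deadline above is derived from the new channel arg through the internal grpc_channel_args_find_integer() helper, which supplies a default and clamps the value. A small sketch isolating that lookup into a hypothetical helper; the internal header path matches the source tree at this revision:

#include <limits.h>

#include <grpc/grpc.h>
#include <grpc/support/time.h>

#include "src/core/lib/channel/channel_args.h"

// Hypothetical helper: reads the drain grace period, defaulting to 10 minutes
// and clamping to [0, INT_MAX] milliseconds.
int GetDrainGraceTimeMs(const grpc_channel_args* args) {
  return grpc_channel_args_find_integer(
      args, GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS,
      {10 * 60 * GPR_MS_PER_SEC, 0, INT_MAX});
}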
@@ -566,6 +598,29 @@ void Chttp2ServerListener::ActiveConnection::OnClose(
self->listener_->connections_.erase(it);
}
}
// Cancel the drain_grace_timer_ if needed.
if (self->drain_grace_timer_expiry_callback_pending_) {
grpc_timer_cancel(&self->drain_grace_timer_);
}
}
self->Unref();
}
void Chttp2ServerListener::ActiveConnection::OnDrainGraceTimeExpiry(
void* arg, grpc_error_handle error) {
ActiveConnection* self = static_cast<ActiveConnection*>(arg);
// If the drain_grace_timer_ was not cancelled, disconnect the transport
// immediately.
if (error == GRPC_ERROR_NONE) {
grpc_chttp2_transport* transport = nullptr;
{
MutexLock lock(&self->mu_);
transport = self->transport_;
}
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Drain grace time expired. Closing connection immediately.");
grpc_transport_perform_op(&transport->base, op);
}
self->Unref();
}
@@ -700,7 +755,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
connection_manager;
{
MutexLock lock(&self->connection_manager_mu_);
MutexLock lock(&self->mu_);
connection_manager = self->connection_manager_;
}
auto endpoint_cleanup = [&](grpc_error_handle error) {
@@ -751,8 +806,10 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
RefCountedPtr<Chttp2ServerListener> listener_ref;
{
MutexLock lock(&self->mu_);
// Shutdown the connection if listener's stopped serving.
if (!self->shutdown_ && self->is_serving_) {
// Shutdown the connection if listener's stopped serving or if the
// connection manager has changed.
if (!self->shutdown_ && self->is_serving_ &&
connection_manager == self->connection_manager_) {
// This ref needs to be taken in the critical region after having made
// sure that the listener has not been Orphaned, so as to avoid
// heap-use-after-free issues where `Ref()` is invoked when the ref of

@@ -68,8 +68,8 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
private:
class ListenerWatcher;
RefCountedPtr<XdsClient> xds_client_;
grpc_server_xds_status_notifier serving_status_notifier_;
const RefCountedPtr<XdsClient> xds_client_;
const grpc_server_xds_status_notifier serving_status_notifier_;
Mutex mu_;
std::map<grpc_server_config_fetcher::WatcherInterface*, ListenerWatcher*>
listener_watchers_ ABSL_GUARDED_BY(mu_);
@@ -1235,7 +1235,10 @@ grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(
args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
GRPC_API_TRACE("grpc_server_config_fetcher_xds_create()", 0, ());
GRPC_API_TRACE(
"grpc_server_config_fetcher_xds_create(notifier={on_serving_status_"
"update=%p, user_data=%p}, args=%p)",
3, (notifier.on_serving_status_update, notifier.user_data, args));
grpc_error_handle error = GRPC_ERROR_NONE;
grpc_core::RefCountedPtr<grpc_core::XdsClient> xds_client =
grpc_core::XdsClient::GetOrCreate(args, &error);

@@ -1617,6 +1617,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
absl::make_unique<XdsChannelArgsServerBuilderOption>(test_obj_));
}
builder.set_status_notifier(&notifier_);
builder.experimental().set_drain_grace_time(
test_obj_->xds_drain_grace_time_ms_);
builder.AddListeningPort(server_address.str(), Credentials());
RegisterAllServices(&builder);
server_ = builder.BuildAndStart();
@@ -1929,6 +1931,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
RouteConfiguration default_server_route_config_;
Cluster default_cluster_;
bool use_xds_enabled_server_;
int xds_drain_grace_time_ms_ = 10 * 60 * 1000; // 10 mins
bool bootstrap_contents_from_env_var_;
};
@@ -9277,6 +9280,60 @@ TEST_P(XdsEnabledServerStatusNotificationTest, ExistingRpcsOnResourceDeletion) {
}
}
TEST_P(XdsEnabledServerStatusNotificationTest,
ExistingRpcsFailOnResourceUpdateAfterDrainGraceTimeExpires) {
constexpr int kDrainGraceTimeMs = 100;
xds_drain_grace_time_ms_ = kDrainGraceTimeMs;
FakeCertificateProvider::CertDataMap fake1_cert_map = {
{"", {root_cert_, identity_pair_}}};
g_fake1_cert_data_map = &fake1_cert_map;
// Send a valid LDS update to get the server to start listening
SetValidLdsUpdate();
backends_[0]->Start();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat(ipv6_only_ ? "[::1]:" : "127.0.0.1:", backends_[0]->port()),
grpc::StatusCode::OK);
constexpr int kNumChannels = 10;
struct StreamingRpc {
std::shared_ptr<Channel> channel;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub;
ClientContext context;
std::unique_ptr<ClientReaderWriter<EchoRequest, EchoResponse>> stream;
} streaming_rpcs[kNumChannels];
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
for (int i = 0; i < kNumChannels; i++) {
streaming_rpcs[i].channel = CreateInsecureChannel();
streaming_rpcs[i].stub =
grpc::testing::EchoTestService::NewStub(streaming_rpcs[i].channel);
streaming_rpcs[i].context.set_wait_for_ready(true);
streaming_rpcs[i].stream =
streaming_rpcs[i].stub->BidiStream(&streaming_rpcs[i].context);
EXPECT_TRUE(streaming_rpcs[i].stream->Write(request));
streaming_rpcs[i].stream->Read(&response);
EXPECT_EQ(request.message(), response.message());
}
grpc_millis update_time = NowFromCycleCounter();
// Update the resource.
SetLdsUpdate("", "", "fake_plugin1", "", false);
// Wait for the updated resource to take effect.
SendRpc([this]() { return CreateTlsChannel(); },
server_authenticated_identity_, {});
// After the drain grace time expires, the existing RPCs should all fail.
for (int i = 0; i < kNumChannels; i++) {
// Wait for the drain grace time to expire
EXPECT_FALSE(streaming_rpcs[i].stream->Read(&response));
// Make sure that the drain grace interval is honored.
EXPECT_GE(NowFromCycleCounter() - update_time, kDrainGraceTimeMs);
auto status = streaming_rpcs[i].stream->Finish();
EXPECT_EQ(status.error_code(), grpc::StatusCode::UNAVAILABLE)
<< status.error_code() << ", " << status.error_message() << ", "
<< status.error_details() << ", "
<< streaming_rpcs[i].context.debug_error_string();
}
}
using XdsServerFilterChainMatchTest = XdsServerSecurityTest;
TEST_P(XdsServerFilterChainMatchTest,
