From 81e90432e196ada987ad2c938207dc5f34453e0f Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Mon, 8 Mar 2021 21:47:13 -0800 Subject: [PATCH] xDS status notifier (#25321) * Serving status notification for xds enabled servers --- include/grpc/grpc.h | 12 +- include/grpcpp/xds_server_builder.h | 36 +- .../chttp2/client/chttp2_connector.cc | 2 +- .../client/insecure/channel_create_posix.cc | 2 +- .../transport/chttp2/server/chttp2_server.cc | 532 ++++++++++++------ .../server/insecure/server_chttp2_posix.cc | 2 +- .../chttp2/transport/chttp2_transport.cc | 8 +- .../chttp2/transport/chttp2_transport.h | 2 +- .../ext/transport/chttp2/transport/internal.h | 1 + src/core/ext/xds/xds_server_config_fetcher.cc | 78 ++- src/core/lib/channel/handshaker.cc | 39 -- src/core/lib/channel/handshaker.h | 17 - src/core/lib/surface/server.h | 9 +- .../grpcio/grpc/_cython/_cygrpc/grpc.pxi | 9 +- .../grpc/_cython/_cygrpc/server.pyx.pxi | 6 +- src/ruby/ext/grpc/rb_grpc_imports.generated.h | 2 +- test/core/bad_client/bad_client.cc | 2 +- test/core/bad_connection/close_fd_test.cc | 4 +- .../end2end/fixtures/h2_sockpair+trace.cc | 4 +- test/core/end2end/fixtures/h2_sockpair.cc | 4 +- .../end2end/fixtures/h2_sockpair_1byte.cc | 4 +- test/core/end2end/fuzzers/client_fuzzer.cc | 2 +- test/core/end2end/fuzzers/server_fuzzer.cc | 2 +- test/cpp/end2end/xds_end2end_test.cc | 194 +++++++ .../microbenchmarks/bm_chttp2_transport.cc | 2 +- test/cpp/microbenchmarks/fullstack_fixtures.h | 6 +- test/cpp/performance/writes_per_rpc_test.cc | 4 +- 27 files changed, 717 insertions(+), 268 deletions(-) diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 09bb1061922..88f78b8639a 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -411,10 +411,20 @@ GRPCAPI void grpc_server_register_completion_queue(grpc_server* server, grpc_completion_queue* cq, void* reserved); +// There might be more methods added later, so users should take care to memset +// this to 0 before using it. +typedef struct { + void (*on_serving_status_change)(void* user_data, const char* uri, + grpc_status_code code, + const char* error_message); + void* user_data; +} grpc_server_xds_status_notifier; + typedef struct grpc_server_config_fetcher grpc_server_config_fetcher; /** EXPERIMENTAL. Creates an xDS config fetcher. */ -GRPCAPI grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(); +GRPCAPI grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create( + grpc_server_xds_status_notifier notifier); /** EXPERIMENTAL. Destroys a config fetcher. */ GRPCAPI void grpc_server_config_fetcher_destroy( diff --git a/include/grpcpp/xds_server_builder.h b/include/grpcpp/xds_server_builder.h index 1ed9e83ac3f..96f1d5af70f 100644 --- a/include/grpcpp/xds_server_builder.h +++ b/include/grpcpp/xds_server_builder.h @@ -26,15 +26,47 @@ namespace grpc { namespace experimental { +class XdsServerServingStatusNotifierInterface { + public: + virtual ~XdsServerServingStatusNotifierInterface() = default; + + // \a uri contains the listening target associated with the notification. Note + // that a single target provided to XdsServerBuilder can get resolved to + // multiple listening addresses. Status::OK signifies that the server is + // serving, while a non-OK status signifies that the server is not serving. + virtual void OnServingStatusChange(std::string uri, grpc::Status status) = 0; +}; + class XdsServerBuilder : public ::grpc::ServerBuilder { public: + // It is the responsibility of the application to make sure that \a notifier + // outlasts the life of the server. Notifications will start being made + // asynchronously once `BuildAndStart()` has been called. Note that it is + // possible for notifications to be made before `BuildAndStart()` returns. + void set_status_notifier(XdsServerServingStatusNotifierInterface* notifier) { + notifier_ = notifier; + } + std::unique_ptr BuildAndStart() override { - grpc_server_config_fetcher* fetcher = - grpc_server_config_fetcher_xds_create(); + grpc_server_config_fetcher* fetcher = grpc_server_config_fetcher_xds_create( + {OnServingStatusChange, notifier_}); if (fetcher == nullptr) return nullptr; set_fetcher(fetcher); return ServerBuilder::BuildAndStart(); } + + private: + static void OnServingStatusChange(void* user_data, const char* uri, + grpc_status_code code, + const char* error_message) { + if (user_data == nullptr) return; + XdsServerServingStatusNotifierInterface* notifier = + static_cast(user_data); + notifier->OnServingStatusChange( + uri, grpc::Status(static_cast(code), error_message)); + } + + XdsServerServingStatusNotifierInterface* notifier_ = nullptr; }; } // namespace experimental diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index 6ab3531121a..345663a04cd 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -178,7 +178,7 @@ void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error* error) { self->Ref().release(); // Ref held by OnTimeout() grpc_chttp2_transport_start_reading(self->result_->transport, args->read_buffer, - &self->on_receive_settings_); + &self->on_receive_settings_, nullptr); GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self, grpc_schedule_on_exec_ctx); grpc_timer_init(&self->timer_, self->args_.deadline, &self->on_timeout_); diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc index 27e64fb8f3e..1360047cd56 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc @@ -62,7 +62,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd( transport, nullptr, &error); grpc_channel_args_destroy(final_args); if (channel != nullptr) { - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); grpc_core::ExecCtx::Get()->Flush(); } else { intptr_t integer; diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index e930407f3f1..b206eef59fb 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -90,77 +90,88 @@ class Chttp2ServerListener : public Server::ListenerInterface { class ConfigFetcherWatcher : public grpc_server_config_fetcher::WatcherInterface { public: - explicit ConfigFetcherWatcher(Chttp2ServerListener* listener) - : listener_(listener) {} - - void UpdateConfig(grpc_channel_args* args) override { - { - MutexLock lock(&listener_->mu_); - grpc_channel_args_destroy(listener_->args_); - grpc_error* error = GRPC_ERROR_NONE; - args = listener_->args_modifier_(args, &error); - if (error != GRPC_ERROR_NONE) { - // TODO(yashykt): Set state to close down connections immediately - // after accepting. - GPR_ASSERT(0); - } - listener_->args_ = args; - if (!listener_->shutdown_) return; // Already started listening. - } - int port_temp; - grpc_error* error = grpc_tcp_server_add_port( - listener_->tcp_server_, &listener_->resolved_address_, &port_temp); - if (error != GRPC_ERROR_NONE) { - GRPC_ERROR_UNREF(error); - gpr_log(GPR_ERROR, "Error adding port to server: %s", - grpc_error_string(error)); - // TODO(yashykt): We wouldn't need to assert here if we bound to the - // port earlier during AddPort. - GPR_ASSERT(0); - } - listener_->StartListening(); - } + explicit ConfigFetcherWatcher(RefCountedPtr listener) + : listener_(std::move(listener)) {} + + void UpdateConfig(grpc_channel_args* args) override; + + void StopServing() override; private: - Chttp2ServerListener* listener_; + RefCountedPtr listener_; }; - class ConnectionState : public RefCounted { + class ActiveConnection : public InternallyRefCounted { public: - ConnectionState(Chttp2ServerListener* listener, - grpc_pollset* accepting_pollset, - grpc_tcp_server_acceptor* acceptor, - RefCountedPtr handshake_mgr, - grpc_channel_args* args, grpc_endpoint* endpoint); + class HandshakingState : public InternallyRefCounted { + public: + HandshakingState(RefCountedPtr connection_ref, + grpc_pollset* accepting_pollset, + grpc_tcp_server_acceptor* acceptor, + grpc_channel_args* args); + + ~HandshakingState() override; - ~ConnectionState() override; + void Orphan() override; + + void Start(grpc_endpoint* endpoint, grpc_channel_args* args); + + // Needed to be able to grab an external ref in ActiveConnection::Start() + using InternallyRefCounted::Ref; + + private: + static void OnTimeout(void* arg, grpc_error* error); + static void OnReceiveSettings(void* arg, grpc_error* /* error */); + static void OnHandshakeDone(void* arg, grpc_error* error); + RefCountedPtr const connection_; + grpc_pollset* const accepting_pollset_; + grpc_tcp_server_acceptor* const acceptor_; + RefCountedPtr handshake_mgr_ + ABSL_GUARDED_BY(&connection_->mu_); + // State for enforcing handshake timeout on receiving HTTP/2 settings. + grpc_millis const deadline_; + grpc_timer timer_ ABSL_GUARDED_BY(&connection_->mu_); + grpc_closure on_timeout_ ABSL_GUARDED_BY(&connection_->mu_); + grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_); + grpc_pollset_set* const interested_parties_; + }; + + ActiveConnection(RefCountedPtr listener, + grpc_pollset* accepting_pollset, + grpc_tcp_server_acceptor* acceptor, + grpc_channel_args* args); + ~ActiveConnection() override; + + void Orphan() override; + + void Start(grpc_endpoint* endpoint, grpc_channel_args* args); + + // Needed to be able to grab an external ref in + // Chttp2ServerListener::OnAccept() + using InternallyRefCounted::Ref; private: - static void OnTimeout(void* arg, grpc_error* error); - static void OnReceiveSettings(void* arg, grpc_error* error); - static void OnHandshakeDone(void* arg, grpc_error* error); - - Chttp2ServerListener* const listener_; - grpc_pollset* const accepting_pollset_; - grpc_tcp_server_acceptor* const acceptor_; - RefCountedPtr handshake_mgr_; - // State for enforcing handshake timeout on receiving HTTP/2 settings. - grpc_chttp2_transport* transport_ = nullptr; - grpc_millis deadline_; - grpc_timer timer_; - grpc_closure on_timeout_; - grpc_closure on_receive_settings_; - grpc_pollset_set* const interested_parties_; + static void OnClose(void* arg, grpc_error* error); + + RefCountedPtr const listener_; + Mutex mu_ ACQUIRED_AFTER(&listener_->mu_); + // Set by HandshakingState before the handshaking begins and reset when + // handshaking is done. + OrphanablePtr handshaking_state_ ABSL_GUARDED_BY(&mu_); + // Set by HandshakingState when handshaking is done and a valid transport is + // created. + grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr; + grpc_closure on_close_; + bool shutdown_ ABSL_GUARDED_BY(&mu_) = false; }; + // Should only be called once so as to start the TCP server. void StartListening(); static void OnAccept(void* arg, grpc_endpoint* tcp, grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor); - RefCountedPtr CreateHandshakeManager(); - static void TcpServerShutdownComplete(void* arg, grpc_error* error); static void DestroyListener(Server* /*server*/, void* arg, @@ -169,19 +180,84 @@ class Chttp2ServerListener : public Server::ListenerInterface { Server* const server_; grpc_tcp_server* tcp_server_; grpc_resolved_address resolved_address_; - Chttp2ServerArgsModifier args_modifier_; - Mutex mu_; - grpc_channel_args* args_; // guarded by mu_ + Chttp2ServerArgsModifier const args_modifier_; ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr; - bool shutdown_ = true; - grpc_closure tcp_server_shutdown_complete_; - grpc_closure* on_destroy_done_ = nullptr; - HandshakeManager* pending_handshake_mgrs_ = nullptr; + Mutex channel_args_mu_; + grpc_channel_args* args_ ABSL_GUARDED_BY(channel_args_mu_); + Mutex mu_; + // Signals whether grpc_tcp_server_start() has been called. + bool started_ ABSL_GUARDED_BY(mu_) = false; + // Signals whether grpc_tcp_server_start() has completed. + CondVar started_cv_ ABSL_GUARDED_BY(mu_); + // Signals whether new requests/connections are to be accepted. + bool is_serving_ ABSL_GUARDED_BY(mu_) = false; + // Signals whether the application has triggered shutdown. + bool shutdown_ ABSL_GUARDED_BY(mu_) = false; + std::map> connections_ + ABSL_GUARDED_BY(mu_); + grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_); + grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr; RefCountedPtr channelz_listen_socket_; }; // -// Chttp2ServerListener::ConnectionState +// Chttp2ServerListener::ConfigFetcherWatcher +// + +void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConfig( + grpc_channel_args* args) { + grpc_error* error = GRPC_ERROR_NONE; + args = listener_->args_modifier_(args, &error); + if (error != GRPC_ERROR_NONE) { + // TODO(yashykt): Set state to close down connections immediately + // after accepting. + GPR_ASSERT(0); + } + grpc_channel_args* args_to_destroy = nullptr; + { + MutexLock lock(&listener_->channel_args_mu_); + args_to_destroy = listener_->args_; + listener_->args_ = args; + } + grpc_channel_args_destroy(args_to_destroy); + { + MutexLock lock(&listener_->mu_); + if (listener_->shutdown_) { + return; + } + listener_->is_serving_ = true; + if (listener_->started_) return; + } + int port_temp; + error = grpc_tcp_server_add_port(listener_->tcp_server_, + &listener_->resolved_address_, &port_temp); + if (error != GRPC_ERROR_NONE) { + GRPC_ERROR_UNREF(error); + gpr_log(GPR_ERROR, "Error adding port to server: %s", + grpc_error_string(error)); + // TODO(yashykt): We wouldn't need to assert here if we bound to the + // port earlier during AddPort. + GPR_ASSERT(0); + } + listener_->StartListening(); + { + MutexLock lock(&listener_->mu_); + listener_->started_ = true; + listener_->started_cv_.SignalAll(); + } +} + +void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() { + std::map> connections; + { + MutexLock lock(&listener_->mu_); + listener_->is_serving_ = false; + connections = std::move(listener_->connections_); + } +} + +// +// Chttp2ServerListener::ActiveConnection::HandshakingState // grpc_millis GetConnectionDeadline(const grpc_channel_args* args) { @@ -191,73 +267,91 @@ grpc_millis GetConnectionDeadline(const grpc_channel_args* args) { return ExecCtx::Get()->Now() + timeout_ms; } -Chttp2ServerListener::ConnectionState::ConnectionState( - Chttp2ServerListener* listener, grpc_pollset* accepting_pollset, - grpc_tcp_server_acceptor* acceptor, - RefCountedPtr handshake_mgr, grpc_channel_args* args, - grpc_endpoint* endpoint) - : listener_(listener), +Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState( + RefCountedPtr connection_ref, + grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor, + grpc_channel_args* args) + : connection_(std::move(connection_ref)), accepting_pollset_(accepting_pollset), acceptor_(acceptor), - handshake_mgr_(std::move(handshake_mgr)), + handshake_mgr_(MakeRefCounted()), deadline_(GetConnectionDeadline(args)), interested_parties_(grpc_pollset_set_create()) { grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_); HandshakerRegistry::AddHandshakers(HANDSHAKER_SERVER, args, interested_parties_, handshake_mgr_.get()); - handshake_mgr_->DoHandshake(endpoint, args, deadline_, acceptor_, - OnHandshakeDone, this); } -Chttp2ServerListener::ConnectionState::~ConnectionState() { - if (transport_ != nullptr) { - GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "receive settings timeout"); - } +Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() { grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_); grpc_pollset_set_destroy(interested_parties_); } -void Chttp2ServerListener::ConnectionState::OnTimeout(void* arg, - grpc_error* error) { - ConnectionState* self = static_cast(arg); +void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() { + { + MutexLock lock(&connection_->mu_); + if (handshake_mgr_ != nullptr) { + handshake_mgr_->Shutdown( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Listener stopped serving.")); + } + } + Unref(); +} + +void Chttp2ServerListener::ActiveConnection::HandshakingState::Start( + grpc_endpoint* endpoint, + grpc_channel_args* args) ABSL_NO_THREAD_SAFETY_ANALYSIS { + Ref().release(); // Held by OnHandshakeDone + // Not acquiring a lock for handshake_mgr_ since it is only reset in + // OnHandshakeDone or on destruction. + handshake_mgr_->DoHandshake(endpoint, args, deadline_, acceptor_, + OnHandshakeDone, this); +} + +void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout( + void* arg, grpc_error* error) { + HandshakingState* self = static_cast(arg); // Note that we may be called with GRPC_ERROR_NONE when the timer fires // or with an error indicating that the timer system is being shut down. if (error != GRPC_ERROR_CANCELLED) { grpc_transport_op* op = grpc_make_transport_op(nullptr); op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Did not receive HTTP/2 settings before handshake timeout"); - grpc_transport_perform_op(&self->transport_->base, op); + grpc_chttp2_transport* transport = nullptr; + { + MutexLock lock(&self->connection_->mu_); + transport = self->connection_->transport_; + } + grpc_transport_perform_op(&transport->base, op); } self->Unref(); } -void Chttp2ServerListener::ConnectionState::OnReceiveSettings( - void* arg, grpc_error* error) { - ConnectionState* self = static_cast(arg); - if (error == GRPC_ERROR_NONE) { - grpc_timer_cancel(&self->timer_); - } +void Chttp2ServerListener::ActiveConnection::HandshakingState:: + OnReceiveSettings(void* arg, grpc_error* /* error */) { + HandshakingState* self = static_cast(arg); + grpc_timer_cancel(&self->timer_); self->Unref(); } -void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg, - grpc_error* error) { +void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone( + void* arg, grpc_error* error) { auto* args = static_cast(arg); - ConnectionState* self = static_cast(args->user_data); + HandshakingState* self = static_cast(args->user_data); + OrphanablePtr handshaking_state_ref; + RefCountedPtr handshake_mgr; + bool cleanup_connection = false; + grpc_resource_user* resource_user = + self->connection_->listener_->server_->default_resource_user(); { - MutexLock lock(&self->listener_->mu_); - grpc_resource_user* resource_user = - self->listener_->server_->default_resource_user(); - if (error != GRPC_ERROR_NONE || self->listener_->shutdown_) { + MutexLock connection_lock(&self->connection_->mu_); + if (error != GRPC_ERROR_NONE || self->connection_->shutdown_) { const char* error_str = grpc_error_string(error); gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str); - if (resource_user != nullptr) { - grpc_resource_user_free(resource_user, - GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); - } + cleanup_connection = true; if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) { - // We were shut down after handshaking completed successfully, so - // destroy the endpoint here. + // We were shut down or stopped serving after handshaking completed + // successfully, so destroy the endpoint here. // TODO(ctiller): It is currently necessary to shutdown endpoints // before destroying them, even if we know that there are no // pending read/write callbacks. This should be fixed, at which @@ -275,9 +369,11 @@ void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg, if (args->endpoint != nullptr) { grpc_transport* transport = grpc_create_chttp2_transport( args->args, args->endpoint, false, resource_user); - grpc_error* channel_init_err = self->listener_->server_->SetupTransport( - transport, self->accepting_pollset_, args->args, - grpc_chttp2_transport_get_socket_node(transport), resource_user); + grpc_error* channel_init_err = + self->connection_->listener_->server_->SetupTransport( + transport, self->accepting_pollset_, args->args, + grpc_chttp2_transport_get_socket_node(transport), + resource_user); if (channel_init_err == GRPC_ERROR_NONE) { // Use notify_on_receive_settings callback to enforce the // handshake deadline. @@ -287,18 +383,28 @@ void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg, // static_cast<> to a derived class. // TODO(roth): Change to static_cast<> when we C++-ify the // transport API. - self->transport_ = + self->connection_->transport_ = reinterpret_cast(transport); + GRPC_CHTTP2_REF_TRANSPORT(self->connection_->transport_, + "ActiveConnection"); // Held by connection_ self->Ref().release(); // Held by OnReceiveSettings(). GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self, grpc_schedule_on_exec_ctx); + // If the listener has been configured with a config fetcher, we need + // to watch on the transport being closed so that we can an updated + // list of active connections. + grpc_closure* on_close = nullptr; + if (self->connection_->listener_->config_fetcher_watcher_ != + nullptr) { + // Refs helds by OnClose() + self->connection_->Ref().release(); + on_close = &self->connection_->on_close_; + } grpc_chttp2_transport_start_reading(transport, args->read_buffer, - &self->on_receive_settings_); + &self->on_receive_settings_, + on_close); grpc_channel_args_destroy(args->args); self->Ref().release(); // Held by OnTimeout(). - GRPC_CHTTP2_REF_TRANSPORT( - reinterpret_cast(transport), - "receive settings timeout"); GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self, grpc_schedule_on_exec_ctx); grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_); @@ -310,25 +416,108 @@ void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg, grpc_transport_destroy(transport); grpc_slice_buffer_destroy_internal(args->read_buffer); gpr_free(args->read_buffer); - if (resource_user != nullptr) { - grpc_resource_user_free(resource_user, - GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); - } + cleanup_connection = true; grpc_channel_args_destroy(args->args); } } else { - if (resource_user != nullptr) { - grpc_resource_user_free(resource_user, - GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); - } + cleanup_connection = true; } } - self->handshake_mgr_->RemoveFromPendingMgrList( - &self->listener_->pending_handshake_mgrs_); + // Since the handshake manager is done, the connection no longer needs to + // shutdown the handshake when the listener needs to stop serving. + // Avoid calling the destructor of HandshakeManager and HandshakingState + // from within the critical region. + handshake_mgr = std::move(self->handshake_mgr_); + handshaking_state_ref = std::move(self->connection_->handshaking_state_); } - self->handshake_mgr_.reset(); gpr_free(self->acceptor_); - grpc_tcp_server_unref(self->listener_->tcp_server_); + OrphanablePtr connection; + if (cleanup_connection) { + if (resource_user != nullptr) { + grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); + } + MutexLock listener_lock(&self->connection_->listener_->mu_); + auto it = self->connection_->listener_->connections_.find( + self->connection_.get()); + if (it != self->connection_->listener_->connections_.end()) { + connection = std::move(it->second); + self->connection_->listener_->connections_.erase(it); + } + } + self->Unref(); +} + +// +// Chttp2ServerListener::ActiveConnection +// + +Chttp2ServerListener::ActiveConnection::ActiveConnection( + RefCountedPtr listener, + grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor, + grpc_channel_args* args) + : listener_(std::move(listener)), + handshaking_state_(MakeOrphanable( + Ref(), accepting_pollset, acceptor, args)) { + GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this, + grpc_schedule_on_exec_ctx); +} + +Chttp2ServerListener::ActiveConnection::~ActiveConnection() { + if (transport_ != nullptr) { + GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "ActiveConnection"); + } +} + +void Chttp2ServerListener::ActiveConnection::Orphan() { + OrphanablePtr handshaking_state; + grpc_chttp2_transport* transport = nullptr; + { + MutexLock lock(&mu_); + shutdown_ = true; + // Reset handshaking_state_ since we have been orphaned by the listener + // signaling that the listener has stopped serving. + handshaking_state = std::move(handshaking_state_); + transport = transport_; + } + if (transport != nullptr) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + op->goaway_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Server is stopping to serve requests."); + grpc_transport_perform_op(&transport->base, op); + } + Unref(); +} + +void Chttp2ServerListener::ActiveConnection::Start(grpc_endpoint* endpoint, + grpc_channel_args* args) { + RefCountedPtr handshaking_state_ref; + { + MutexLock lock(&mu_); + if (shutdown_) return; + // Hold a ref to HandshakingState to allow starting the handshake outside + // the critical region. + handshaking_state_ref = handshaking_state_->Ref(); + } + handshaking_state_ref->Start(endpoint, args); +} + +void Chttp2ServerListener::ActiveConnection::OnClose(void* arg, + grpc_error* /* error */) { + ActiveConnection* self = static_cast(arg); + OrphanablePtr connection; + { + MutexLock listener_lock(&self->listener_->mu_); + MutexLock connection_lock(&self->mu_); + // The node was already deleted from the connections_ list if the connection + // is shutdown. + if (!self->shutdown_) { + auto it = self->listener_->connections_.find(self); + if (it != self->listener_->connections_.end()) { + connection = std::move(it->second); + self->listener_->connections_.erase(it); + } + } + } self->Unref(); } @@ -414,6 +603,13 @@ Chttp2ServerListener::Chttp2ServerListener( } Chttp2ServerListener::~Chttp2ServerListener() { + // Flush queued work before destroying handshaker factory, since that + // may do a synchronous unref. + ExecCtx::Get()->Flush(); + if (on_destroy_done_ != nullptr) { + ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, GRPC_ERROR_NONE); + ExecCtx::Get()->Flush(); + } grpc_channel_args_destroy(args_); } @@ -422,24 +618,27 @@ void Chttp2ServerListener::Start( Server* /*server*/, const std::vector* /* pollsets */) { if (server_->config_fetcher() != nullptr) { grpc_channel_args* args = nullptr; - auto watcher = absl::make_unique(this); + auto watcher = absl::make_unique(Ref()); + config_fetcher_watcher_ = watcher.get(); { - MutexLock lock(&mu_); - config_fetcher_watcher_ = watcher.get(); + MutexLock lock(&channel_args_mu_); args = grpc_channel_args_copy(args_); } server_->config_fetcher()->StartWatch( grpc_sockaddr_to_string(&resolved_address_, false), args, std::move(watcher)); } else { + { + MutexLock lock(&mu_); + started_ = true; + is_serving_ = true; + } StartListening(); } } void Chttp2ServerListener::StartListening() { grpc_tcp_server_start(tcp_server_, &server_->pollsets(), OnAccept, this); - MutexLock lock(&mu_); - shutdown_ = false; } void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) { @@ -447,68 +646,61 @@ void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) { on_destroy_done_ = on_destroy_done; } -RefCountedPtr Chttp2ServerListener::CreateHandshakeManager() { - MutexLock lock(&mu_); - if (shutdown_) return nullptr; - grpc_resource_user* resource_user = server_->default_resource_user(); - if (resource_user != nullptr && - !grpc_resource_user_safe_alloc(resource_user, - GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) { - gpr_log(GPR_ERROR, - "Memory quota exhausted, rejecting connection, no handshaking."); - return nullptr; - } - auto handshake_mgr = MakeRefCounted(); - handshake_mgr->AddToPendingMgrList(&pending_handshake_mgrs_); - grpc_tcp_server_ref(tcp_server_); // Ref held by ConnectionState. - return handshake_mgr; -} - void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp, grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor) { Chttp2ServerListener* self = static_cast(arg); - RefCountedPtr handshake_mgr = - self->CreateHandshakeManager(); - if (handshake_mgr == nullptr) { - grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE); - grpc_endpoint_destroy(tcp); - gpr_free(acceptor); - return; - } grpc_channel_args* args = nullptr; { - MutexLock lock(&self->mu_); + MutexLock lock(&self->channel_args_mu_); args = grpc_channel_args_copy(self->args_); } - // Deletes itself when done. - new ConnectionState(self, accepting_pollset, acceptor, - std::move(handshake_mgr), args, tcp); + auto connection = MakeOrphanable( + self->Ref(), accepting_pollset, acceptor, args); + // Hold a ref to connection to allow starting handshake outside the + // critical region + RefCountedPtr connection_ref = connection->Ref(); + { + MutexLock lock(&self->mu_); + // Shutdown the the connection if listener's stopped serving. + if (!self->shutdown_ && self->is_serving_) { + grpc_resource_user* resource_user = + self->server_->default_resource_user(); + if (resource_user != nullptr && + !grpc_resource_user_safe_alloc(resource_user, + GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) { + gpr_log( + GPR_ERROR, + "Memory quota exhausted, rejecting connection, no handshaking."); + } else { + self->connections_.emplace(connection.get(), std::move(connection)); + } + } + } + if (connection != nullptr) { + grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE); + grpc_endpoint_destroy(tcp); + gpr_free(acceptor); + } else { + connection_ref->Start(tcp, args); + } grpc_channel_args_destroy(args); } void Chttp2ServerListener::TcpServerShutdownComplete(void* arg, grpc_error* error) { Chttp2ServerListener* self = static_cast(arg); + std::map> connections; /* ensure all threads have unlocked */ - grpc_closure* destroy_done = nullptr; { MutexLock lock(&self->mu_); - destroy_done = self->on_destroy_done_; - GPR_ASSERT(self->shutdown_); - if (self->pending_handshake_mgrs_ != nullptr) { - self->pending_handshake_mgrs_->ShutdownAllPending(GRPC_ERROR_REF(error)); - } + self->is_serving_ = false; + // Orphan the connections so that they can start cleaning up. + connections = std::move(self->connections_); self->channelz_listen_socket_.reset(); } - // Flush queued work before destroying handshaker factory, since that - // may do a synchronous unref. - ExecCtx::Get()->Flush(); - if (destroy_done != nullptr) { - ExecCtx::Run(DEBUG_LOCATION, destroy_done, GRPC_ERROR_REF(error)); - ExecCtx::Get()->Flush(); - } - delete self; + GRPC_ERROR_UNREF(error); + self->Unref(); } /* Server callback: destroy the tcp listener (so we don't generate further @@ -523,6 +715,12 @@ void Chttp2ServerListener::Orphan() { { MutexLock lock(&mu_); shutdown_ = true; + // If the listener is currently set to be serving but has not been started + // yet, it means that `grpc_tcp_server_start` is in progress. Wait for the + // operation to finish to avoid causing races. + while (is_serving_ && !started_) { + started_cv_.Wait(&mu_); + } tcp_server = tcp_server_; } grpc_tcp_server_shutdown_listeners(tcp_server); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc index e27c90795fd..0ee9eeecf82 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc @@ -57,7 +57,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server, for (grpc_pollset* pollset : core_server->pollsets()) { grpc_endpoint_add_to_pollset(server_endpoint, pollset); } - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); } else { gpr_log(GPR_ERROR, "Failed to create channel: %s", grpc_error_string(error)); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 9a47d546953..62dca19b0db 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -581,6 +581,11 @@ static void close_transport_locked(grpc_chttp2_transport* t, GRPC_ERROR_REF(error)); t->notify_on_receive_settings = nullptr; } + if (t->notify_on_close != nullptr) { + grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_close, + GRPC_ERROR_REF(error)); + t->notify_on_close = nullptr; + } GRPC_ERROR_UNREF(error); } @@ -3293,7 +3298,7 @@ grpc_transport* grpc_create_chttp2_transport( void grpc_chttp2_transport_start_reading( grpc_transport* transport, grpc_slice_buffer* read_buffer, - grpc_closure* notify_on_receive_settings) { + grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) { grpc_chttp2_transport* t = reinterpret_cast(transport); GRPC_CHTTP2_REF_TRANSPORT( @@ -3303,6 +3308,7 @@ void grpc_chttp2_transport_start_reading( gpr_free(read_buffer); } t->notify_on_receive_settings = notify_on_receive_settings; + t->notify_on_close = notify_on_close; t->combiner->Run( GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr), GRPC_ERROR_NONE); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h index b04630bbe2b..8d9f309a9c8 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h @@ -47,6 +47,6 @@ grpc_chttp2_transport_get_socket_node(grpc_transport* transport); /// HTTP/2 settings are received from the peer. void grpc_chttp2_transport_start_reading( grpc_transport* transport, grpc_slice_buffer* read_buffer, - grpc_closure* notify_on_receive_settings); + grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H */ diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 408a425135b..30a963df156 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -301,6 +301,7 @@ struct grpc_chttp2_transport { grpc_core::Combiner* combiner; grpc_closure* notify_on_receive_settings = nullptr; + grpc_closure* notify_on_close = nullptr; /** write execution state of the transport */ grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE; diff --git a/src/core/ext/xds/xds_server_config_fetcher.cc b/src/core/ext/xds/xds_server_config_fetcher.cc index b276b5b292c..78df37c2d1e 100644 --- a/src/core/ext/xds/xds_server_config_fetcher.cc +++ b/src/core/ext/xds/xds_server_config_fetcher.cc @@ -34,8 +34,9 @@ namespace { class XdsServerConfigFetcher : public grpc_server_config_fetcher { public: - explicit XdsServerConfigFetcher(RefCountedPtr xds_client) - : xds_client_(std::move(xds_client)) { + explicit XdsServerConfigFetcher(RefCountedPtr xds_client, + grpc_server_xds_status_notifier notifier) + : xds_client_(std::move(xds_client)), serving_status_notifier_(notifier) { GPR_ASSERT(xds_client_ != nullptr); } @@ -44,7 +45,8 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher { watcher) override { grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get(); auto listener_watcher = absl::make_unique( - std::move(watcher), args, xds_client_); + std::move(watcher), args, xds_client_, serving_status_notifier_, + listening_address); auto* listener_watcher_ptr = listener_watcher.get(); // TODO(yashykt): Get the resource name id from bootstrap listening_address = absl::StrCat( @@ -81,10 +83,14 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher { explicit ListenerWatcher( std::unique_ptr server_config_watcher, - grpc_channel_args* args, RefCountedPtr xds_client) + grpc_channel_args* args, RefCountedPtr xds_client, + grpc_server_xds_status_notifier serving_status_notifier, + std::string listening_address) : server_config_watcher_(std::move(server_config_watcher)), args_(args), - xds_client_(std::move(xds_client)) {} + xds_client_(std::move(xds_client)), + serving_status_notifier_(serving_status_notifier), + listening_address_(std::move(listening_address)) {} ~ListenerWatcher() override { grpc_channel_args_destroy(args_); } @@ -107,10 +113,21 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher { return; } // Only send an update, if something changed. - if (updated_once_ && !update_needed) { + if (have_resource_ && !update_needed) { return; } - updated_once_ = true; + if (!have_resource_) { + have_resource_ = true; + if (serving_status_notifier_.on_serving_status_change != nullptr) { + serving_status_notifier_.on_serving_status_change( + serving_status_notifier_.user_data, listening_address_.c_str(), + GRPC_STATUS_OK, ""); + } else { + gpr_log(GPR_INFO, + "xDS Listener resource obtained; will start serving on %s", + listening_address_.c_str()); + } + } grpc_channel_args* updated_args = nullptr; if (xds_certificate_provider_ != nullptr) { grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg(); @@ -122,18 +139,43 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher { } void OnError(grpc_error* error) override { - gpr_log(GPR_ERROR, "ListenerWatcher:%p XdsClient reports error: %s", this, - grpc_error_string(error)); + if (have_resource_) { + gpr_log(GPR_ERROR, + "ListenerWatcher:%p XdsClient reports error: %s for %s; " + "ignoring in favor of existing resource", + this, grpc_error_string(error), listening_address_.c_str()); + } else { + if (serving_status_notifier_.on_serving_status_change != nullptr) { + serving_status_notifier_.on_serving_status_change( + serving_status_notifier_.user_data, listening_address_.c_str(), + GRPC_STATUS_UNAVAILABLE, grpc_error_string(error)); + } else { + gpr_log( + GPR_ERROR, + "ListenerWatcher:%p error obtaining xDS Listener resource: %s; " + "not serving on %s", + this, grpc_error_string(error), listening_address_.c_str()); + } + } GRPC_ERROR_UNREF(error); - // TODO(yashykt): We might want to bubble this error to the application. } void OnResourceDoesNotExist() override { gpr_log(GPR_ERROR, "ListenerWatcher:%p XdsClient reports requested listener does " - "not exist", - this); - // TODO(yashykt): We might want to bubble this error to the application. + "not exist; not serving on %s", + this, listening_address_.c_str()); + if (have_resource_) { + // The server has started listening already, so we need to gracefully + // stop serving. + server_config_watcher_->StopServing(); + have_resource_ = false; + } + if (serving_status_notifier_.on_serving_status_change != nullptr) { + serving_status_notifier_.on_serving_status_change( + serving_status_notifier_.user_data, listening_address_.c_str(), + GRPC_STATUS_NOT_FOUND, "Requested listener does not exist"); + } } private: @@ -236,10 +278,12 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher { server_config_watcher_; grpc_channel_args* args_; RefCountedPtr xds_client_; + grpc_server_xds_status_notifier serving_status_notifier_; + std::string listening_address_; RefCountedPtr root_certificate_provider_; RefCountedPtr identity_certificate_provider_; RefCountedPtr xds_certificate_provider_; - bool updated_once_ = false; + bool have_resource_ = false; }; struct WatcherState { @@ -248,6 +292,7 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher { }; RefCountedPtr xds_client_; + grpc_server_xds_status_notifier serving_status_notifier_; Mutex mu_; std::map watchers_; @@ -256,7 +301,8 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher { } // namespace } // namespace grpc_core -grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() { +grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create( + grpc_server_xds_status_notifier notifier) { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; GRPC_API_TRACE("grpc_server_config_fetcher_xds_create()", 0, ()); @@ -269,5 +315,5 @@ grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() { GRPC_ERROR_UNREF(error); return nullptr; } - return new grpc_core::XdsServerConfigFetcher(std::move(xds_client)); + return new grpc_core::XdsServerConfigFetcher(std::move(xds_client), notifier); } diff --git a/src/core/lib/channel/handshaker.cc b/src/core/lib/channel/handshaker.cc index 000a7d2e26b..e3ba3fcc55a 100644 --- a/src/core/lib/channel/handshaker.cc +++ b/src/core/lib/channel/handshaker.cc @@ -55,44 +55,6 @@ std::string HandshakerArgsString(HandshakerArgs* args) { HandshakeManager::HandshakeManager() {} -/// Add \a mgr to the server side list of all pending handshake managers, the -/// list starts with \a *head. -// Not thread-safe. Caller needs to synchronize. -void HandshakeManager::AddToPendingMgrList(HandshakeManager** head) { - GPR_ASSERT(prev_ == nullptr); - GPR_ASSERT(next_ == nullptr); - next_ = *head; - if (*head) { - (*head)->prev_ = this; - } - *head = this; -} - -/// Remove \a mgr from the server side list of all pending handshake managers. -// Not thread-safe. Caller needs to synchronize. -void HandshakeManager::RemoveFromPendingMgrList(HandshakeManager** head) { - if (next_ != nullptr) { - next_->prev_ = prev_; - } - if (prev_ != nullptr) { - prev_->next_ = next_; - } else { - GPR_ASSERT(*head == this); - *head = next_; - } -} - -/// Shutdown all pending handshake managers starting at head on the server -/// side. Not thread-safe. Caller needs to synchronize. -void HandshakeManager::ShutdownAllPending(grpc_error* why) { - auto* head = this; - while (head != nullptr) { - head->Shutdown(GRPC_ERROR_REF(why)); - head = head->next_; - } - GRPC_ERROR_UNREF(why); -} - void HandshakeManager::Add(RefCountedPtr handshaker) { if (GRPC_TRACE_FLAG_ENABLED(grpc_handshaker_trace)) { gpr_log( @@ -213,7 +175,6 @@ void HandshakeManager::DoHandshake(grpc_endpoint* endpoint, { MutexLock lock(&mu_); GPR_ASSERT(index_ == 0); - GPR_ASSERT(!is_shutdown_); // Construct handshaker args. These will be passed through all // handshakers and eventually be freed by the on_handshake_done callback. args_.endpoint = endpoint; diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index 3dc6da85966..942b5fed4dd 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -94,19 +94,6 @@ class HandshakeManager : public RefCounted { HandshakeManager(); ~HandshakeManager() override; - /// Add \a mgr to the server side list of all pending handshake managers, the - /// list starts with \a *head. - // Not thread-safe. Caller needs to synchronize. - void AddToPendingMgrList(HandshakeManager** head); - - /// Remove \a mgr from the server side list of all pending handshake managers. - // Not thread-safe. Caller needs to synchronize. - void RemoveFromPendingMgrList(HandshakeManager** head); - - /// Shutdown all pending handshake managers starting at head on the server - /// side. Not thread-safe. Caller needs to synchronize. - void ShutdownAllPending(grpc_error* why); - /// Adds a handshaker to the handshake manager. /// Takes ownership of \a handshaker. void Add(RefCountedPtr handshaker); @@ -161,10 +148,6 @@ class HandshakeManager : public RefCounted { grpc_closure on_handshake_done_; // Handshaker args. HandshakerArgs args_; - // Links to the previous and next managers in a list of all pending handshakes - // Used at server side only. - HandshakeManager* prev_ = nullptr; - HandshakeManager* next_ = nullptr; }; } // namespace grpc_core diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index fee963ba0ee..bc2f4a6c627 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -71,7 +71,7 @@ class Server : public InternallyRefCounted { /// Interface for listeners. /// Implementations must override the Orphan() method, which should stop /// listening and initiate destruction of the listener. - class ListenerInterface : public Orphanable { + class ListenerInterface : public InternallyRefCounted { public: ~ListenerInterface() override = default; @@ -459,8 +459,13 @@ struct grpc_server_config_fetcher { class WatcherInterface { public: virtual ~WatcherInterface() = default; - // Ownership of \a args is transferred. + // UpdateConfig() is invoked the config fetcher when a new config is + // available. Implementations should update the configuration and start + // serving if not already serving. Ownership of \a args is transferred. virtual void UpdateConfig(grpc_channel_args* args) = 0; + // Implementations should stop serving when this is called. Serving should + // only resume when UpdateConfig() is invoked. + virtual void StopServing() = 0; }; virtual ~grpc_server_config_fetcher() = default; diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index b7d5e2e4c87..4d0c608a6a5 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -404,7 +404,14 @@ cdef extern from "grpc/grpc.h": void grpc_server_set_config_fetcher( grpc_server* server, grpc_server_config_fetcher* config_fetcher) nogil - grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() nogil + ctypedef struct grpc_server_xds_status_notifier: + void (*on_serving_status_change)(void* user_data, const char* uri, + grpc_status_code code, + const char* error_message) + void* user_data; + + grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create( + grpc_server_xds_status_notifier notifier) nogil int grpc_server_add_insecure_http2_port( diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index ffd06ca4edc..ed1dda292d6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -25,8 +25,12 @@ cdef class Server: self.c_server = NULL cdef _ChannelArgs channel_args = _ChannelArgs(arguments) self.c_server = grpc_server_create(channel_args.c_args(), NULL) + cdef grpc_server_xds_status_notifier notifier + notifier.on_serving_status_change = NULL + notifier.user_data = NULL if xds: - grpc_server_set_config_fetcher(self.c_server, grpc_server_config_fetcher_xds_create()) + grpc_server_set_config_fetcher(self.c_server, + grpc_server_config_fetcher_xds_create(notifier)) self.references.append(arguments) def request_call( diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index c526b3a7c03..36af484e6a8 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -215,7 +215,7 @@ extern grpc_server_create_type grpc_server_create_import; typedef void(*grpc_server_register_completion_queue_type)(grpc_server* server, grpc_completion_queue* cq, void* reserved); extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import; #define grpc_server_register_completion_queue grpc_server_register_completion_queue_import -typedef grpc_server_config_fetcher*(*grpc_server_config_fetcher_xds_create_type)(); +typedef grpc_server_config_fetcher*(*grpc_server_config_fetcher_xds_create_type)(grpc_server_xds_status_notifier notifier); extern grpc_server_config_fetcher_xds_create_type grpc_server_config_fetcher_xds_create_import; #define grpc_server_config_fetcher_xds_create grpc_server_config_fetcher_xds_create_import typedef void(*grpc_server_config_fetcher_destroy_type)(grpc_server_config_fetcher* config_fetcher); diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc index 000215edf2d..ee446f9c2c4 100644 --- a/test/core/bad_client/bad_client.cc +++ b/test/core/bad_client/bad_client.cc @@ -212,7 +212,7 @@ void grpc_run_bad_client_test( grpc_server_start(a.server); transport = grpc_create_chttp2_transport(nullptr, sfd.server, false); server_setup_transport(&a, transport); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); /* Bind fds to pollsets */ grpc_endpoint_add_to_pollset(sfd.client, grpc_cq_pollset(client_cq)); diff --git a/test/core/bad_connection/close_fd_test.cc b/test/core/bad_connection/close_fd_test.cc index 1c89ee72c11..2be36fcc61d 100644 --- a/test/core/bad_connection/close_fd_test.cc +++ b/test/core/bad_connection/close_fd_test.cc @@ -99,7 +99,7 @@ static void init_client() { transport = grpc_create_chttp2_transport(nullptr, g_ctx.ep->client, true); client_setup_transport(transport); GPR_ASSERT(g_ctx.client); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); } static void init_server() { @@ -111,7 +111,7 @@ static void init_server() { grpc_server_start(g_ctx.server); transport = grpc_create_chttp2_transport(nullptr, g_ctx.ep->server, false); server_setup_transport(transport); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); } static void test_init() { diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.cc b/test/core/end2end/fixtures/h2_sockpair+trace.cc index 97fe368348d..3a2f18d5202 100644 --- a/test/core/end2end/fixtures/h2_sockpair+trace.cc +++ b/test/core/end2end/fixtures/h2_sockpair+trace.cc @@ -55,7 +55,7 @@ static void server_setup_transport(void* ts, grpc_transport* transport) { grpc_error* error = f->server->core_server->SetupTransport( transport, nullptr, f->server->core_server->channel_args(), nullptr); if (error == GRPC_ERROR_NONE) { - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); } else { GRPC_ERROR_UNREF(error); grpc_transport_destroy(transport); @@ -80,7 +80,7 @@ static void client_setup_transport(void* ts, grpc_transport* transport) { transport, nullptr, &error); grpc_channel_args_destroy(args); if (cs->f->client != nullptr) { - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); } else { intptr_t integer; grpc_status_code status = GRPC_STATUS_INTERNAL; diff --git a/test/core/end2end/fixtures/h2_sockpair.cc b/test/core/end2end/fixtures/h2_sockpair.cc index d92305a2ff6..b3c40d8ddcb 100644 --- a/test/core/end2end/fixtures/h2_sockpair.cc +++ b/test/core/end2end/fixtures/h2_sockpair.cc @@ -49,7 +49,7 @@ static void server_setup_transport(void* ts, grpc_transport* transport) { grpc_error* error = f->server->core_server->SetupTransport( transport, nullptr, f->server->core_server->channel_args(), nullptr); if (error == GRPC_ERROR_NONE) { - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); } else { GRPC_ERROR_UNREF(error); grpc_transport_destroy(transport); @@ -75,7 +75,7 @@ static void client_setup_transport(void* ts, grpc_transport* transport) { transport, nullptr, &error); grpc_channel_args_destroy(args); if (cs->f->client != nullptr) { - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); } else { intptr_t integer; grpc_status_code status = GRPC_STATUS_INTERNAL; diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.cc b/test/core/end2end/fixtures/h2_sockpair_1byte.cc index 6b161dbcc4c..f1f4feb2cd4 100644 --- a/test/core/end2end/fixtures/h2_sockpair_1byte.cc +++ b/test/core/end2end/fixtures/h2_sockpair_1byte.cc @@ -49,7 +49,7 @@ static void server_setup_transport(void* ts, grpc_transport* transport) { grpc_error* error = f->server->core_server->SetupTransport( transport, nullptr, f->server->core_server->channel_args(), nullptr); if (error == GRPC_ERROR_NONE) { - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); } else { GRPC_ERROR_UNREF(error); grpc_transport_destroy(transport); @@ -75,7 +75,7 @@ static void client_setup_transport(void* ts, grpc_transport* transport) { transport, nullptr, &error); grpc_channel_args_destroy(args); if (cs->f->client != nullptr) { - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); } else { intptr_t integer; grpc_status_code status = GRPC_STATUS_INTERNAL; diff --git a/test/core/end2end/fuzzers/client_fuzzer.cc b/test/core/end2end/fuzzers/client_fuzzer.cc index 3d55bffd814..31ec46666f6 100644 --- a/test/core/end2end/fuzzers/client_fuzzer.cc +++ b/test/core/end2end/fuzzers/client_fuzzer.cc @@ -54,7 +54,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); grpc_transport* transport = grpc_create_chttp2_transport(nullptr, mock_endpoint, true); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); grpc_arg authority_arg = grpc_channel_arg_string_create( const_cast(GRPC_ARG_DEFAULT_AUTHORITY), diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc index 1f1b40ed7d0..8651b2ed3db 100644 --- a/test/core/end2end/fuzzers/server_fuzzer.cc +++ b/test/core/end2end/fuzzers/server_fuzzer.cc @@ -58,7 +58,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_transport* transport = grpc_create_chttp2_transport(nullptr, mock_endpoint, false); server->core_server->SetupTransport(transport, nullptr, nullptr, nullptr); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); grpc_call* call1 = nullptr; grpc_call_details call_details1; diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 276cadc55c2..7f18ad30b61 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -2100,6 +2100,31 @@ class XdsEnd2endTest : public ::testing::TestWithParam { } protected: + class XdsServingStatusNotifier + : public grpc::experimental::XdsServerServingStatusNotifierInterface { + public: + void OnServingStatusChange(std::string uri, grpc::Status status) override { + grpc_core::MutexLock lock(&mu_); + status_map[uri] = status; + cond_.Signal(); + } + + void WaitOnServingStatusChange(std::string uri, + grpc::StatusCode expected_status) { + grpc_core::MutexLock lock(&mu_); + std::map::iterator it; + while ((it = status_map.find(uri)) == status_map.end() || + it->second.error_code() != expected_status) { + cond_.Wait(&mu_); + } + } + + private: + grpc_core::Mutex mu_; + grpc_core::CondVar cond_; + std::map status_map; + }; + class ServerThread { public: explicit ServerThread(bool use_xds_enabled_server = false) @@ -2131,6 +2156,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { server_address << "localhost:" << port_; if (use_xds_enabled_server_) { experimental::XdsServerBuilder builder; + builder.set_status_notifier(¬ifier_); builder.AddListeningPort(server_address.str(), Credentials()); RegisterAllServices(&builder); server_ = builder.BuildAndStart(); @@ -2162,6 +2188,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam { bool use_xds_enabled_server() const { return use_xds_enabled_server_; } + XdsServingStatusNotifier* notifier() { return ¬ifier_; } + private: virtual void RegisterAllServices(ServerBuilder* builder) = 0; virtual void StartAllServices() = 0; @@ -2171,6 +2199,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { const int port_; std::unique_ptr server_; + XdsServingStatusNotifier notifier_; std::unique_ptr thread_; bool running_ = false; const bool use_xds_enabled_server_; @@ -7387,6 +7416,164 @@ TEST_P(XdsServerSecurityTest, TestFallbackToTls) { server_authenticated_identity_, {}); } +class XdsEnabledServerStatusNotificationTest : public XdsServerSecurityTest { + protected: + void SetValidLdsUpdate() { SetLdsUpdate("", "", "", "", false); } + + void SetInvalidLdsUpdate() { + // Set LDS update without root provider instance. + Listener listener; + listener.set_name(absl::StrCat( + "grpc/server?xds.resource.listening_address=", + ipv6_only_ ? "[::1]:" : "127.0.0.1:", backends_[0]->port())); + auto* socket_address = listener.mutable_address()->mutable_socket_address(); + socket_address->set_address(ipv6_only_ ? "::1" : "127.0.0.1"); + socket_address->set_port_value(backends_[0]->port()); + auto* filter_chain = listener.add_filter_chains(); + auto* transport_socket = filter_chain->mutable_transport_socket(); + transport_socket->set_name("envoy.transport_sockets.tls"); + DownstreamTlsContext downstream_tls_context; + transport_socket->mutable_typed_config()->PackFrom(downstream_tls_context); + balancers_[0]->ads_service()->SetLdsResource(listener); + } + + void UnsetLdsUpdate() { + balancers_[0]->ads_service()->UnsetResource( + kLdsTypeUrl, absl::StrCat("grpc/server?xds.resource.listening_address=", + ipv6_only_ ? "[::1]:" : "127.0.0.1:", + backends_[0]->port())); + } +}; + +TEST_P(XdsEnabledServerStatusNotificationTest, ServingStatus) { + SetValidLdsUpdate(); + backends_[0]->notifier()->WaitOnServingStatusChange( + absl::StrCat("127.0.0.1:", backends_[0]->port()), grpc::StatusCode::OK); + SendRpc([this]() { return CreateInsecureChannel(); }, {}, {}); +} + +TEST_P(XdsEnabledServerStatusNotificationTest, NotServingStatus) { + SetInvalidLdsUpdate(); + backends_[0]->notifier()->WaitOnServingStatusChange( + absl::StrCat("127.0.0.1:", backends_[0]->port()), + grpc::StatusCode::UNAVAILABLE); + SendRpc([this]() { return CreateInsecureChannel(); }, {}, {}, + true /* test_expects_failure */); +} + +TEST_P(XdsEnabledServerStatusNotificationTest, ErrorUpdateWhenAlreadyServing) { + SetValidLdsUpdate(); + backends_[0]->notifier()->WaitOnServingStatusChange( + absl::StrCat("127.0.0.1:", backends_[0]->port()), grpc::StatusCode::OK); + SendRpc([this]() { return CreateInsecureChannel(); }, {}, {}); + // Invalid update does not lead to a change in the serving status. + SetInvalidLdsUpdate(); + constexpr int kRetryCount = 100; + auto response_state = balancers_[0]->ads_service()->lds_response_state(); + for (int i = 0; i < kRetryCount && + response_state.state != AdsServiceImpl::ResponseState::NACKED; + i++) { + SendRpc([this]() { return CreateInsecureChannel(); }, {}, {}); + response_state = balancers_[0]->ads_service()->lds_response_state(); + } + EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); + backends_[0]->notifier()->WaitOnServingStatusChange( + absl::StrCat("127.0.0.1:", backends_[0]->port()), grpc::StatusCode::OK); + SendRpc([this]() { return CreateInsecureChannel(); }, {}, {}); +} + +TEST_P(XdsEnabledServerStatusNotificationTest, + NotServingStatusToServingStatusTransition) { + SetInvalidLdsUpdate(); + backends_[0]->notifier()->WaitOnServingStatusChange( + absl::StrCat("127.0.0.1:", backends_[0]->port()), + grpc::StatusCode::UNAVAILABLE); + SendRpc([this]() { return CreateInsecureChannel(); }, {}, {}, + true /* test_expects_failure */); + // Send a valid LDS update to change to serving status + SetValidLdsUpdate(); + backends_[0]->notifier()->WaitOnServingStatusChange( + absl::StrCat("127.0.0.1:", backends_[0]->port()), grpc::StatusCode::OK); + SendRpc([this]() { return CreateInsecureChannel(); }, {}, {}); +} + +// This test verifies that the resource getting deleted when already serving +// results in future connections being dropped. +TEST_P(XdsEnabledServerStatusNotificationTest, + ServingStatusToNonServingStatusTransition) { + SetValidLdsUpdate(); + backends_[0]->notifier()->WaitOnServingStatusChange( + absl::StrCat("127.0.0.1:", backends_[0]->port()), grpc::StatusCode::OK); + SendRpc([this]() { return CreateInsecureChannel(); }, {}, {}); + // Deleting the resource should result in a non-serving status. + UnsetLdsUpdate(); + backends_[0]->notifier()->WaitOnServingStatusChange( + absl::StrCat("127.0.0.1:", backends_[0]->port()), + grpc::StatusCode::NOT_FOUND); + SendRpc([this]() { return CreateInsecureChannel(); }, {}, {}, + true /* test_expects_failure */); +} + +TEST_P(XdsEnabledServerStatusNotificationTest, RepeatedServingStatusChanges) { + for (int i = 0; i < 5; i++) { + // Send a valid LDS update to get the server to start listening + SetValidLdsUpdate(); + backends_[0]->notifier()->WaitOnServingStatusChange( + absl::StrCat("127.0.0.1:", backends_[0]->port()), grpc::StatusCode::OK); + SendRpc([this]() { return CreateInsecureChannel(); }, {}, {}); + // Deleting the resource will make the server start rejecting connections + UnsetLdsUpdate(); + backends_[0]->notifier()->WaitOnServingStatusChange( + absl::StrCat("127.0.0.1:", backends_[0]->port()), + grpc::StatusCode::NOT_FOUND); + SendRpc([this]() { return CreateInsecureChannel(); }, {}, {}, + true /* test_expects_failure */); + } +} + +TEST_P(XdsEnabledServerStatusNotificationTest, ExistingRpcsOnResourceDeletion) { + // Send a valid LDS update to get the server to start listening + SetValidLdsUpdate(); + backends_[0]->notifier()->WaitOnServingStatusChange( + absl::StrCat("127.0.0.1:", backends_[0]->port()), grpc::StatusCode::OK); + constexpr int kNumChannels = 10; + struct StreamingRpc { + std::shared_ptr channel; + std::unique_ptr stub; + ClientContext context; + std::unique_ptr> writer; + } streaming_rpcs[kNumChannels]; + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + for (int i = 0; i < kNumChannels; i++) { + streaming_rpcs[i].channel = CreateInsecureChannel(); + streaming_rpcs[i].stub = + grpc::testing::EchoTestService::NewStub(streaming_rpcs[i].channel); + streaming_rpcs[i].context.set_wait_for_ready(true); + streaming_rpcs[i].writer = streaming_rpcs[i].stub->RequestStream( + &streaming_rpcs[i].context, &response); + EXPECT_TRUE(streaming_rpcs[i].writer->Write(request)); + } + // Deleting the resource will make the server start rejecting connections + UnsetLdsUpdate(); + backends_[0]->notifier()->WaitOnServingStatusChange( + absl::StrCat("127.0.0.1:", backends_[0]->port()), + grpc::StatusCode::NOT_FOUND); + SendRpc([this]() { return CreateInsecureChannel(); }, {}, {}, + true /* test_expects_failure */); + for (int i = 0; i < kNumChannels; i++) { + EXPECT_TRUE(streaming_rpcs[i].writer->Write(request)); + EXPECT_TRUE(streaming_rpcs[i].writer->WritesDone()); + EXPECT_TRUE(streaming_rpcs[i].writer->Finish().ok()); + // New RPCs on the existing channels should fail. + ClientContext new_context; + new_context.set_deadline(grpc_timeout_milliseconds_to_deadline(1000)); + EXPECT_FALSE( + streaming_rpcs[i].stub->Echo(&new_context, request, &response).ok()); + } +} + using EdsTest = BasicTest; // Tests that EDS client should send a NACK if the EDS update contains @@ -8736,6 +8923,13 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, XdsServerSecurityTest, .set_use_xds_credentials()), &TestTypeName); +// We are only testing the server here. +INSTANTIATE_TEST_SUITE_P(XdsTest, XdsEnabledServerStatusNotificationTest, + ::testing::Values(TestType() + .set_use_fake_resolver() + .set_use_xds_credentials()), + &TestTypeName); + // EDS could be tested with or without XdsResolver, but the tests would // be the same either way, so we test it only with XdsResolver. INSTANTIATE_TEST_SUITE_P( diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 45144017d5b..2e16542928d 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -139,7 +139,7 @@ class Fixture { grpc_channel_args c_args = args.c_channel_args(); ep_ = new PhonyEndpoint; t_ = grpc_create_chttp2_transport(&c_args, ep_, client); - grpc_chttp2_transport_start_reading(t_, nullptr, nullptr); + grpc_chttp2_transport_start_reading(t_, nullptr, nullptr, nullptr); FlushExecCtx(); } diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h index b5e8028173d..4526dee952a 100644 --- a/test/cpp/microbenchmarks/fullstack_fixtures.h +++ b/test/cpp/microbenchmarks/fullstack_fixtures.h @@ -187,7 +187,8 @@ class EndpointPairFixture : public BaseFixture { server_->c_server()->core_server->SetupTransport( server_transport_, nullptr, server_args, nullptr); - grpc_chttp2_transport_start_reading(server_transport_, nullptr, nullptr); + grpc_chttp2_transport_start_reading(server_transport_, nullptr, nullptr, + nullptr); } /* create channel */ @@ -202,7 +203,8 @@ class EndpointPairFixture : public BaseFixture { GPR_ASSERT(client_transport_); grpc_channel* channel = grpc_channel_create( "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport_); - grpc_chttp2_transport_start_reading(client_transport_, nullptr, nullptr); + grpc_chttp2_transport_start_reading(client_transport_, nullptr, nullptr, + nullptr); channel_ = ::grpc::CreateChannelInternal( "", channel, diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc index d3273ee7949..045e29776e7 100644 --- a/test/cpp/performance/writes_per_rpc_test.cc +++ b/test/cpp/performance/writes_per_rpc_test.cc @@ -82,7 +82,7 @@ class EndpointPairFixture { server_->c_server()->core_server->SetupTransport(transport, nullptr, server_args, nullptr); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); } /* create channel */ @@ -97,7 +97,7 @@ class EndpointPairFixture { GPR_ASSERT(transport); grpc_channel* channel = grpc_channel_create( "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); channel_ = ::grpc::CreateChannelInternal( "", channel,