xDS status notifier (#25321)

* Serving status notification for xDS-enabled servers
Yash Tibrewal 4 years ago committed by GitHub
parent 1437dc9816
commit 81e90432e1
27 changed files (changed-line counts in parentheses):

  1. include/grpc/grpc.h (12)
  2. include/grpcpp/xds_server_builder.h (36)
  3. src/core/ext/transport/chttp2/client/chttp2_connector.cc (2)
  4. src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc (2)
  5. src/core/ext/transport/chttp2/server/chttp2_server.cc (506)
  6. src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc (2)
  7. src/core/ext/transport/chttp2/transport/chttp2_transport.cc (8)
  8. src/core/ext/transport/chttp2/transport/chttp2_transport.h (2)
  9. src/core/ext/transport/chttp2/transport/internal.h (1)
  10. src/core/ext/xds/xds_server_config_fetcher.cc (78)
  11. src/core/lib/channel/handshaker.cc (39)
  12. src/core/lib/channel/handshaker.h (17)
  13. src/core/lib/surface/server.h (9)
  14. src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi (9)
  15. src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi (6)
  16. src/ruby/ext/grpc/rb_grpc_imports.generated.h (2)
  17. test/core/bad_client/bad_client.cc (2)
  18. test/core/bad_connection/close_fd_test.cc (4)
  19. test/core/end2end/fixtures/h2_sockpair+trace.cc (4)
  20. test/core/end2end/fixtures/h2_sockpair.cc (4)
  21. test/core/end2end/fixtures/h2_sockpair_1byte.cc (4)
  22. test/core/end2end/fuzzers/client_fuzzer.cc (2)
  23. test/core/end2end/fuzzers/server_fuzzer.cc (2)
  24. test/cpp/end2end/xds_end2end_test.cc (194)
  25. test/cpp/microbenchmarks/bm_chttp2_transport.cc (2)
  26. test/cpp/microbenchmarks/fullstack_fixtures.h (6)
  27. test/cpp/performance/writes_per_rpc_test.cc (4)

@@ -411,10 +411,20 @@ GRPCAPI void grpc_server_register_completion_queue(grpc_server* server,
grpc_completion_queue* cq,
void* reserved);
// There might be more fields added to this struct later, so users should
// take care to memset it to 0 before using it.
typedef struct {
void (*on_serving_status_change)(void* user_data, const char* uri,
grpc_status_code code,
const char* error_message);
void* user_data;
} grpc_server_xds_status_notifier;
typedef struct grpc_server_config_fetcher grpc_server_config_fetcher;
/** EXPERIMENTAL. Creates an xDS config fetcher. */
GRPCAPI grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create();
GRPCAPI grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(
grpc_server_xds_status_notifier notifier);
/** EXPERIMENTAL. Destroys a config fetcher. */
GRPCAPI void grpc_server_config_fetcher_destroy(
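
For reference, an application might wire the new C-core API up roughly as follows. This is a minimal sketch: the callback and helper names (OnXdsServingStatusChange, ConfigureXdsServer) are illustrative, not part of the API, and the memset follows the guidance in the comment above.

#include <string.h>

#include <grpc/grpc.h>
#include <grpc/support/log.h>

// Illustrative callback; the name is not part of the API.
static void OnXdsServingStatusChange(void* /*user_data*/, const char* uri,
                                     grpc_status_code code,
                                     const char* error_message) {
  gpr_log(GPR_INFO, "serving status of %s changed: code=%d (%s)", uri, code,
          error_message);
}

// Hypothetical helper: registers the notifier and installs the fetcher.
void ConfigureXdsServer(grpc_server* server) {
  grpc_server_xds_status_notifier notifier;
  memset(&notifier, 0, sizeof(notifier));  // zero any fields added later
  notifier.on_serving_status_change = OnXdsServingStatusChange;
  notifier.user_data = nullptr;
  grpc_server_config_fetcher* fetcher =
      grpc_server_config_fetcher_xds_create(notifier);
  if (fetcher != nullptr) grpc_server_set_config_fetcher(server, fetcher);
}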

@@ -26,15 +26,47 @@
namespace grpc {
namespace experimental {
class XdsServerServingStatusNotifierInterface {
public:
virtual ~XdsServerServingStatusNotifierInterface() = default;
// \a uri contains the listening target associated with the notification. Note
// that a single target provided to XdsServerBuilder can get resolved to
// multiple listening addresses. Status::OK signifies that the server is
// serving, while a non-OK status signifies that the server is not serving.
virtual void OnServingStatusChange(std::string uri, grpc::Status status) = 0;
};
class XdsServerBuilder : public ::grpc::ServerBuilder {
public:
// It is the responsibility of the application to make sure that \a notifier
// outlasts the life of the server. Notifications will start being made
// asynchronously once `BuildAndStart()` has been called. Note that it is
// possible for notifications to be made before `BuildAndStart()` returns.
void set_status_notifier(XdsServerServingStatusNotifierInterface* notifier) {
notifier_ = notifier;
}
std::unique_ptr<Server> BuildAndStart() override {
grpc_server_config_fetcher* fetcher =
grpc_server_config_fetcher_xds_create();
grpc_server_config_fetcher* fetcher = grpc_server_config_fetcher_xds_create(
{OnServingStatusChange, notifier_});
if (fetcher == nullptr) return nullptr;
set_fetcher(fetcher);
return ServerBuilder::BuildAndStart();
}
private:
static void OnServingStatusChange(void* user_data, const char* uri,
grpc_status_code code,
const char* error_message) {
if (user_data == nullptr) return;
XdsServerServingStatusNotifierInterface* notifier =
static_cast<XdsServerServingStatusNotifierInterface*>(user_data);
notifier->OnServingStatusChange(
uri, grpc::Status(static_cast<StatusCode>(code), error_message));
}
XdsServerServingStatusNotifierInterface* notifier_ = nullptr;
};
} // namespace experimental
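
A minimal usage sketch of the new C++ surface. The notifier class, port, and credentials choice are illustrative (a real xDS-enabled server would typically use xDS credentials rather than insecure ones), and the service registration is elided.

#include <iostream>
#include <memory>

#include <grpcpp/grpcpp.h>
#include <grpcpp/xds_server_builder.h>

// Illustrative notifier implementation.
class LoggingStatusNotifier
    : public grpc::experimental::XdsServerServingStatusNotifierInterface {
 public:
  void OnServingStatusChange(std::string uri, grpc::Status status) override {
    std::cout << uri << ": "
              << (status.ok() ? "serving" : status.error_message())
              << std::endl;
  }
};

std::unique_ptr<grpc::Server> StartXdsEnabledServer() {
  static LoggingStatusNotifier notifier;  // must outlive the server
  grpc::experimental::XdsServerBuilder builder;
  builder.set_status_notifier(&notifier);
  builder.AddListeningPort("0.0.0.0:50051",
                           grpc::InsecureServerCredentials());
  // builder.RegisterService(&my_service);  // hypothetical service
  return builder.BuildAndStart();
}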

@@ -178,7 +178,7 @@ void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error* error) {
self->Ref().release(); // Ref held by OnTimeout()
grpc_chttp2_transport_start_reading(self->result_->transport,
args->read_buffer,
&self->on_receive_settings_);
&self->on_receive_settings_, nullptr);
GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&self->timer_, self->args_.deadline, &self->on_timeout_);

@@ -62,7 +62,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd(
transport, nullptr, &error);
grpc_channel_args_destroy(final_args);
if (channel != nullptr) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_core::ExecCtx::Get()->Flush();
} else {
intptr_t integer;

@@ -90,77 +90,88 @@ class Chttp2ServerListener : public Server::ListenerInterface {
class ConfigFetcherWatcher
: public grpc_server_config_fetcher::WatcherInterface {
public:
explicit ConfigFetcherWatcher(Chttp2ServerListener* listener)
: listener_(listener) {}
explicit ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)
: listener_(std::move(listener)) {}
void UpdateConfig(grpc_channel_args* args) override {
{
MutexLock lock(&listener_->mu_);
grpc_channel_args_destroy(listener_->args_);
grpc_error* error = GRPC_ERROR_NONE;
args = listener_->args_modifier_(args, &error);
if (error != GRPC_ERROR_NONE) {
// TODO(yashykt): Set state to close down connections immediately
// after accepting.
GPR_ASSERT(0);
}
listener_->args_ = args;
if (!listener_->shutdown_) return; // Already started listening.
}
int port_temp;
grpc_error* error = grpc_tcp_server_add_port(
listener_->tcp_server_, &listener_->resolved_address_, &port_temp);
if (error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(error);
gpr_log(GPR_ERROR, "Error adding port to server: %s",
grpc_error_string(error));
// TODO(yashykt): We wouldn't need to assert here if we bound to the
// port earlier during AddPort.
GPR_ASSERT(0);
}
listener_->StartListening();
}
void UpdateConfig(grpc_channel_args* args) override;
void StopServing() override;
private:
Chttp2ServerListener* listener_;
RefCountedPtr<Chttp2ServerListener> listener_;
};
class ConnectionState : public RefCounted<ConnectionState> {
class ActiveConnection : public InternallyRefCounted<ActiveConnection> {
public:
class HandshakingState : public InternallyRefCounted<HandshakingState> {
public:
ConnectionState(Chttp2ServerListener* listener,
HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor,
RefCountedPtr<HandshakeManager> handshake_mgr,
grpc_channel_args* args, grpc_endpoint* endpoint);
grpc_channel_args* args);
~ConnectionState() override;
~HandshakingState() override;
void Orphan() override;
void Start(grpc_endpoint* endpoint, grpc_channel_args* args);
// Needed to be able to grab an external ref in ActiveConnection::Start()
using InternallyRefCounted<HandshakingState>::Ref;
private:
static void OnTimeout(void* arg, grpc_error* error);
static void OnReceiveSettings(void* arg, grpc_error* error);
static void OnReceiveSettings(void* arg, grpc_error* /* error */);
static void OnHandshakeDone(void* arg, grpc_error* error);
Chttp2ServerListener* const listener_;
RefCountedPtr<ActiveConnection> const connection_;
grpc_pollset* const accepting_pollset_;
grpc_tcp_server_acceptor* const acceptor_;
RefCountedPtr<HandshakeManager> handshake_mgr_;
RefCountedPtr<HandshakeManager> handshake_mgr_
ABSL_GUARDED_BY(&connection_->mu_);
// State for enforcing handshake timeout on receiving HTTP/2 settings.
grpc_chttp2_transport* transport_ = nullptr;
grpc_millis deadline_;
grpc_timer timer_;
grpc_closure on_timeout_;
grpc_closure on_receive_settings_;
grpc_millis const deadline_;
grpc_timer timer_ ABSL_GUARDED_BY(&connection_->mu_);
grpc_closure on_timeout_ ABSL_GUARDED_BY(&connection_->mu_);
grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_);
grpc_pollset_set* const interested_parties_;
};
ActiveConnection(RefCountedPtr<Chttp2ServerListener> listener,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor,
grpc_channel_args* args);
~ActiveConnection() override;
void Orphan() override;
void Start(grpc_endpoint* endpoint, grpc_channel_args* args);
// Needed to be able to grab an external ref in
// Chttp2ServerListener::OnAccept()
using InternallyRefCounted<ActiveConnection>::Ref;
private:
static void OnClose(void* arg, grpc_error* error);
RefCountedPtr<Chttp2ServerListener> const listener_;
Mutex mu_ ACQUIRED_AFTER(&listener_->mu_);
// Set by HandshakingState before the handshaking begins and reset when
// handshaking is done.
OrphanablePtr<HandshakingState> handshaking_state_ ABSL_GUARDED_BY(&mu_);
// Set by HandshakingState when handshaking is done and a valid transport is
// created.
grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
grpc_closure on_close_;
bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
};
// Should be called only once, to start the TCP server.
void StartListening();
static void OnAccept(void* arg, grpc_endpoint* tcp,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor);
RefCountedPtr<HandshakeManager> CreateHandshakeManager();
static void TcpServerShutdownComplete(void* arg, grpc_error* error);
static void DestroyListener(Server* /*server*/, void* arg,
@@ -169,19 +180,84 @@ class Chttp2ServerListener : public Server::ListenerInterface {
Server* const server_;
grpc_tcp_server* tcp_server_;
grpc_resolved_address resolved_address_;
Chttp2ServerArgsModifier args_modifier_;
Mutex mu_;
grpc_channel_args* args_; // guarded by mu_
Chttp2ServerArgsModifier const args_modifier_;
ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
bool shutdown_ = true;
grpc_closure tcp_server_shutdown_complete_;
grpc_closure* on_destroy_done_ = nullptr;
HandshakeManager* pending_handshake_mgrs_ = nullptr;
Mutex channel_args_mu_;
grpc_channel_args* args_ ABSL_GUARDED_BY(channel_args_mu_);
Mutex mu_;
// Signals whether grpc_tcp_server_start() has been called.
bool started_ ABSL_GUARDED_BY(mu_) = false;
// Signals whether grpc_tcp_server_start() has completed.
CondVar started_cv_ ABSL_GUARDED_BY(mu_);
// Signals whether new requests/connections are to be accepted.
bool is_serving_ ABSL_GUARDED_BY(mu_) = false;
// Signals whether the application has triggered shutdown.
bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_
ABSL_GUARDED_BY(mu_);
grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_);
grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr;
RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
};
//
// Chttp2ServerListener::ConnectionState
// Chttp2ServerListener::ConfigFetcherWatcher
//
void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConfig(
grpc_channel_args* args) {
grpc_error* error = GRPC_ERROR_NONE;
args = listener_->args_modifier_(args, &error);
if (error != GRPC_ERROR_NONE) {
// TODO(yashykt): Set state to close down connections immediately
// after accepting.
GPR_ASSERT(0);
}
grpc_channel_args* args_to_destroy = nullptr;
{
MutexLock lock(&listener_->channel_args_mu_);
args_to_destroy = listener_->args_;
listener_->args_ = args;
}
grpc_channel_args_destroy(args_to_destroy);
{
MutexLock lock(&listener_->mu_);
if (listener_->shutdown_) {
return;
}
listener_->is_serving_ = true;
if (listener_->started_) return;
}
int port_temp;
error = grpc_tcp_server_add_port(listener_->tcp_server_,
&listener_->resolved_address_, &port_temp);
if (error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(error);
gpr_log(GPR_ERROR, "Error adding port to server: %s",
grpc_error_string(error));
// TODO(yashykt): We wouldn't need to assert here if we bound to the
// port earlier during AddPort.
GPR_ASSERT(0);
}
listener_->StartListening();
{
MutexLock lock(&listener_->mu_);
listener_->started_ = true;
listener_->started_cv_.SignalAll();
}
}
void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() {
std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
{
MutexLock lock(&listener_->mu_);
listener_->is_serving_ = false;
connections = std::move(listener_->connections_);
}
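// `connections` goes out of scope here, destroying the OrphanablePtrs and
// thereby orphaning each ActiveConnection outside the critical region.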
}
//
// Chttp2ServerListener::ActiveConnection::HandshakingState
//
grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
@@ -191,73 +267,91 @@ grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
return ExecCtx::Get()->Now() + timeout_ms;
}
Chttp2ServerListener::ConnectionState::ConnectionState(
Chttp2ServerListener* listener, grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor,
RefCountedPtr<HandshakeManager> handshake_mgr, grpc_channel_args* args,
grpc_endpoint* endpoint)
: listener_(listener),
Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState(
RefCountedPtr<ActiveConnection> connection_ref,
grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
grpc_channel_args* args)
: connection_(std::move(connection_ref)),
accepting_pollset_(accepting_pollset),
acceptor_(acceptor),
handshake_mgr_(std::move(handshake_mgr)),
handshake_mgr_(MakeRefCounted<HandshakeManager>()),
deadline_(GetConnectionDeadline(args)),
interested_parties_(grpc_pollset_set_create()) {
grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
HandshakerRegistry::AddHandshakers(HANDSHAKER_SERVER, args,
interested_parties_, handshake_mgr_.get());
handshake_mgr_->DoHandshake(endpoint, args, deadline_, acceptor_,
OnHandshakeDone, this);
}
Chttp2ServerListener::ConnectionState::~ConnectionState() {
if (transport_ != nullptr) {
GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "receive settings timeout");
}
Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
grpc_pollset_set_destroy(interested_parties_);
}
void Chttp2ServerListener::ConnectionState::OnTimeout(void* arg,
grpc_error* error) {
ConnectionState* self = static_cast<ConnectionState*>(arg);
void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
{
MutexLock lock(&connection_->mu_);
if (handshake_mgr_ != nullptr) {
handshake_mgr_->Shutdown(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Listener stopped serving."));
}
}
Unref();
}
void Chttp2ServerListener::ActiveConnection::HandshakingState::Start(
grpc_endpoint* endpoint,
grpc_channel_args* args) ABSL_NO_THREAD_SAFETY_ANALYSIS {
Ref().release(); // Held by OnHandshakeDone
// Not acquiring a lock for handshake_mgr_ since it is only reset in
// OnHandshakeDone or on destruction.
handshake_mgr_->DoHandshake(endpoint, args, deadline_, acceptor_,
OnHandshakeDone, this);
}
void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout(
void* arg, grpc_error* error) {
HandshakingState* self = static_cast<HandshakingState*>(arg);
// Note that we may be called with GRPC_ERROR_NONE when the timer fires
// or with an error indicating that the timer system is being shut down.
if (error != GRPC_ERROR_CANCELLED) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Did not receive HTTP/2 settings before handshake timeout");
grpc_transport_perform_op(&self->transport_->base, op);
grpc_chttp2_transport* transport = nullptr;
{
MutexLock lock(&self->connection_->mu_);
transport = self->connection_->transport_;
}
grpc_transport_perform_op(&transport->base, op);
}
self->Unref();
}
void Chttp2ServerListener::ConnectionState::OnReceiveSettings(
void* arg, grpc_error* error) {
ConnectionState* self = static_cast<ConnectionState*>(arg);
if (error == GRPC_ERROR_NONE) {
void Chttp2ServerListener::ActiveConnection::HandshakingState::
OnReceiveSettings(void* arg, grpc_error* /* error */) {
HandshakingState* self = static_cast<HandshakingState*>(arg);
grpc_timer_cancel(&self->timer_);
}
self->Unref();
}
void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
grpc_error* error) {
void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
void* arg, grpc_error* error) {
auto* args = static_cast<HandshakerArgs*>(arg);
ConnectionState* self = static_cast<ConnectionState*>(args->user_data);
{
MutexLock lock(&self->listener_->mu_);
HandshakingState* self = static_cast<HandshakingState*>(args->user_data);
OrphanablePtr<HandshakingState> handshaking_state_ref;
RefCountedPtr<HandshakeManager> handshake_mgr;
bool cleanup_connection = false;
grpc_resource_user* resource_user =
self->listener_->server_->default_resource_user();
if (error != GRPC_ERROR_NONE || self->listener_->shutdown_) {
self->connection_->listener_->server_->default_resource_user();
{
MutexLock connection_lock(&self->connection_->mu_);
if (error != GRPC_ERROR_NONE || self->connection_->shutdown_) {
const char* error_str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
if (resource_user != nullptr) {
grpc_resource_user_free(resource_user,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
}
cleanup_connection = true;
if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
// We were shut down or stopped serving after handshaking completed
// successfully, so destroy the endpoint here.
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
@@ -275,9 +369,11 @@ void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
if (args->endpoint != nullptr) {
grpc_transport* transport = grpc_create_chttp2_transport(
args->args, args->endpoint, false, resource_user);
grpc_error* channel_init_err = self->listener_->server_->SetupTransport(
grpc_error* channel_init_err =
self->connection_->listener_->server_->SetupTransport(
transport, self->accepting_pollset_, args->args,
grpc_chttp2_transport_get_socket_node(transport), resource_user);
grpc_chttp2_transport_get_socket_node(transport),
resource_user);
if (channel_init_err == GRPC_ERROR_NONE) {
// Use notify_on_receive_settings callback to enforce the
// handshake deadline.
@@ -287,18 +383,28 @@ void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
// static_cast<> to a derived class.
// TODO(roth): Change to static_cast<> when we C++-ify the
// transport API.
self->transport_ =
self->connection_->transport_ =
reinterpret_cast<grpc_chttp2_transport*>(transport);
GRPC_CHTTP2_REF_TRANSPORT(self->connection_->transport_,
"ActiveConnection"); // Held by connection_
self->Ref().release(); // Held by OnReceiveSettings().
GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings,
self, grpc_schedule_on_exec_ctx);
// If the listener has been configured with a config fetcher, we need
// to watch for the transport being closed so that we can keep an
// updated list of active connections.
grpc_closure* on_close = nullptr;
if (self->connection_->listener_->config_fetcher_watcher_ !=
nullptr) {
// Ref held by OnClose()
self->connection_->Ref().release();
on_close = &self->connection_->on_close_;
}
grpc_chttp2_transport_start_reading(transport, args->read_buffer,
&self->on_receive_settings_);
&self->on_receive_settings_,
on_close);
grpc_channel_args_destroy(args->args);
self->Ref().release(); // Held by OnTimeout().
GRPC_CHTTP2_REF_TRANSPORT(
reinterpret_cast<grpc_chttp2_transport*>(transport),
"receive settings timeout");
GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
@@ -310,25 +416,108 @@ void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
grpc_transport_destroy(transport);
grpc_slice_buffer_destroy_internal(args->read_buffer);
gpr_free(args->read_buffer);
if (resource_user != nullptr) {
grpc_resource_user_free(resource_user,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
}
cleanup_connection = true;
grpc_channel_args_destroy(args->args);
}
} else {
cleanup_connection = true;
}
}
// Since the handshake manager is done, the connection no longer needs to
// shut down the handshake when the listener needs to stop serving.
// Avoid calling the destructor of HandshakeManager and HandshakingState
// from within the critical region.
handshake_mgr = std::move(self->handshake_mgr_);
handshaking_state_ref = std::move(self->connection_->handshaking_state_);
}
gpr_free(self->acceptor_);
OrphanablePtr<ActiveConnection> connection;
if (cleanup_connection) {
if (resource_user != nullptr) {
grpc_resource_user_free(resource_user,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
}
MutexLock listener_lock(&self->connection_->listener_->mu_);
auto it = self->connection_->listener_->connections_.find(
self->connection_.get());
if (it != self->connection_->listener_->connections_.end()) {
connection = std::move(it->second);
self->connection_->listener_->connections_.erase(it);
}
}
self->Unref();
}
//
// Chttp2ServerListener::ActiveConnection
//
Chttp2ServerListener::ActiveConnection::ActiveConnection(
RefCountedPtr<Chttp2ServerListener> listener,
grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
grpc_channel_args* args)
: listener_(std::move(listener)),
handshaking_state_(MakeOrphanable<HandshakingState>(
Ref(), accepting_pollset, acceptor, args)) {
GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this,
grpc_schedule_on_exec_ctx);
}
Chttp2ServerListener::ActiveConnection::~ActiveConnection() {
if (transport_ != nullptr) {
GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "ActiveConnection");
}
}
void Chttp2ServerListener::ActiveConnection::Orphan() {
OrphanablePtr<HandshakingState> handshaking_state;
grpc_chttp2_transport* transport = nullptr;
{
MutexLock lock(&mu_);
shutdown_ = true;
// Reset handshaking_state_ since we have been orphaned by the listener
// signaling that the listener has stopped serving.
handshaking_state = std::move(handshaking_state_);
transport = transport_;
}
if (transport != nullptr) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->goaway_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Server is stopping to serve requests.");
grpc_transport_perform_op(&transport->base, op);
}
Unref();
}
void Chttp2ServerListener::ActiveConnection::Start(grpc_endpoint* endpoint,
grpc_channel_args* args) {
RefCountedPtr<HandshakingState> handshaking_state_ref;
{
MutexLock lock(&mu_);
if (shutdown_) return;
// Hold a ref to HandshakingState to allow starting the handshake outside
// the critical region.
handshaking_state_ref = handshaking_state_->Ref();
}
handshaking_state_ref->Start(endpoint, args);
}
void Chttp2ServerListener::ActiveConnection::OnClose(void* arg,
grpc_error* /* error */) {
ActiveConnection* self = static_cast<ActiveConnection*>(arg);
OrphanablePtr<ActiveConnection> connection;
{
MutexLock listener_lock(&self->listener_->mu_);
MutexLock connection_lock(&self->mu_);
// The node was already removed from the connections_ list if the
// connection was shut down.
if (!self->shutdown_) {
auto it = self->listener_->connections_.find(self);
if (it != self->listener_->connections_.end()) {
connection = std::move(it->second);
self->listener_->connections_.erase(it);
}
}
self->handshake_mgr_->RemoveFromPendingMgrList(
&self->listener_->pending_handshake_mgrs_);
}
self->handshake_mgr_.reset();
gpr_free(self->acceptor_);
grpc_tcp_server_unref(self->listener_->tcp_server_);
self->Unref();
}
@@ -414,6 +603,13 @@ Chttp2ServerListener::Chttp2ServerListener(
}
Chttp2ServerListener::~Chttp2ServerListener() {
// Flush queued work before destroying handshaker factory, since that
// may do a synchronous unref.
ExecCtx::Get()->Flush();
if (on_destroy_done_ != nullptr) {
ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, GRPC_ERROR_NONE);
ExecCtx::Get()->Flush();
}
grpc_channel_args_destroy(args_);
}
@@ -422,24 +618,27 @@ void Chttp2ServerListener::Start(
Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
if (server_->config_fetcher() != nullptr) {
grpc_channel_args* args = nullptr;
auto watcher = absl::make_unique<ConfigFetcherWatcher>(this);
{
MutexLock lock(&mu_);
auto watcher = absl::make_unique<ConfigFetcherWatcher>(Ref());
config_fetcher_watcher_ = watcher.get();
{
MutexLock lock(&channel_args_mu_);
args = grpc_channel_args_copy(args_);
}
server_->config_fetcher()->StartWatch(
grpc_sockaddr_to_string(&resolved_address_, false), args,
std::move(watcher));
} else {
{
MutexLock lock(&mu_);
started_ = true;
is_serving_ = true;
}
StartListening();
}
}
void Chttp2ServerListener::StartListening() {
grpc_tcp_server_start(tcp_server_, &server_->pollsets(), OnAccept, this);
MutexLock lock(&mu_);
shutdown_ = false;
}
void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
@@ -447,68 +646,61 @@ void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
on_destroy_done_ = on_destroy_done;
}
RefCountedPtr<HandshakeManager> Chttp2ServerListener::CreateHandshakeManager() {
MutexLock lock(&mu_);
if (shutdown_) return nullptr;
grpc_resource_user* resource_user = server_->default_resource_user();
void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor) {
Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
grpc_channel_args* args = nullptr;
{
MutexLock lock(&self->channel_args_mu_);
args = grpc_channel_args_copy(self->args_);
}
auto connection = MakeOrphanable<ActiveConnection>(
self->Ref(), accepting_pollset, acceptor, args);
// Hold a ref to connection to allow starting handshake outside the
// critical region
RefCountedPtr<ActiveConnection> connection_ref = connection->Ref();
{
MutexLock lock(&self->mu_);
// Shut down the connection if the listener has stopped serving.
if (!self->shutdown_ && self->is_serving_) {
grpc_resource_user* resource_user =
self->server_->default_resource_user();
if (resource_user != nullptr &&
!grpc_resource_user_safe_alloc(resource_user,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
gpr_log(GPR_ERROR,
gpr_log(
GPR_ERROR,
"Memory quota exhausted, rejecting connection, no handshaking.");
return nullptr;
} else {
self->connections_.emplace(connection.get(), std::move(connection));
}
auto handshake_mgr = MakeRefCounted<HandshakeManager>();
handshake_mgr->AddToPendingMgrList(&pending_handshake_mgrs_);
grpc_tcp_server_ref(tcp_server_); // Ref held by ConnectionState.
return handshake_mgr;
}
void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor) {
Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
RefCountedPtr<HandshakeManager> handshake_mgr =
self->CreateHandshakeManager();
if (handshake_mgr == nullptr) {
}
}
if (connection != nullptr) {
grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE);
grpc_endpoint_destroy(tcp);
gpr_free(acceptor);
return;
}
grpc_channel_args* args = nullptr;
{
MutexLock lock(&self->mu_);
args = grpc_channel_args_copy(self->args_);
} else {
connection_ref->Start(tcp, args);
}
// Deletes itself when done.
new ConnectionState(self, accepting_pollset, acceptor,
std::move(handshake_mgr), args, tcp);
grpc_channel_args_destroy(args);
}
void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
grpc_error* error) {
Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
/* ensure all threads have unlocked */
grpc_closure* destroy_done = nullptr;
{
MutexLock lock(&self->mu_);
destroy_done = self->on_destroy_done_;
GPR_ASSERT(self->shutdown_);
if (self->pending_handshake_mgrs_ != nullptr) {
self->pending_handshake_mgrs_->ShutdownAllPending(GRPC_ERROR_REF(error));
}
self->is_serving_ = false;
// Orphan the connections so that they can start cleaning up.
connections = std::move(self->connections_);
self->channelz_listen_socket_.reset();
}
// Flush queued work before destroying handshaker factory, since that
// may do a synchronous unref.
ExecCtx::Get()->Flush();
if (destroy_done != nullptr) {
ExecCtx::Run(DEBUG_LOCATION, destroy_done, GRPC_ERROR_REF(error));
ExecCtx::Get()->Flush();
}
delete self;
GRPC_ERROR_UNREF(error);
self->Unref();
}
/* Server callback: destroy the tcp listener (so we don't generate further
@@ -523,6 +715,12 @@ void Chttp2ServerListener::Orphan() {
{
MutexLock lock(&mu_);
shutdown_ = true;
// If the listener is currently set to be serving but has not been started
// yet, it means that `grpc_tcp_server_start` is in progress. Wait for the
// operation to finish to avoid causing races.
while (is_serving_ && !started_) {
started_cv_.Wait(&mu_);
}
tcp_server = tcp_server_;
}
grpc_tcp_server_shutdown_listeners(tcp_server);

@@ -57,7 +57,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
for (grpc_pollset* pollset : core_server->pollsets()) {
grpc_endpoint_add_to_pollset(server_endpoint, pollset);
}
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
} else {
gpr_log(GPR_ERROR, "Failed to create channel: %s",
grpc_error_string(error));

@@ -581,6 +581,11 @@ static void close_transport_locked(grpc_chttp2_transport* t,
GRPC_ERROR_REF(error));
t->notify_on_receive_settings = nullptr;
}
if (t->notify_on_close != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_close,
GRPC_ERROR_REF(error));
t->notify_on_close = nullptr;
}
GRPC_ERROR_UNREF(error);
}
@@ -3293,7 +3298,7 @@ grpc_transport* grpc_create_chttp2_transport(
void grpc_chttp2_transport_start_reading(
grpc_transport* transport, grpc_slice_buffer* read_buffer,
grpc_closure* notify_on_receive_settings) {
grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) {
grpc_chttp2_transport* t =
reinterpret_cast<grpc_chttp2_transport*>(transport);
GRPC_CHTTP2_REF_TRANSPORT(
@@ -3303,6 +3308,7 @@ void grpc_chttp2_transport_start_reading(
gpr_free(read_buffer);
}
t->notify_on_receive_settings = notify_on_receive_settings;
t->notify_on_close = notify_on_close;
t->combiner->Run(
GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
GRPC_ERROR_NONE);

@@ -47,6 +47,6 @@ grpc_chttp2_transport_get_socket_node(grpc_transport* transport);
/// HTTP/2 settings are received from the peer.
void grpc_chttp2_transport_start_reading(
grpc_transport* transport, grpc_slice_buffer* read_buffer,
grpc_closure* notify_on_receive_settings);
grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H */
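
For core-internal callers, the new parameter is wired up the same way as the settings closure. A rough sketch with illustrative names (OnTransportClosed, StartReading); the closure storage must outlive the transport's read path:

// Illustrative core-internal sketch; not the actual listener code.
static void OnTransportClosed(void* arg, grpc_error* /*error*/) {
  // `arg` (e.g. a connection object) can be dropped from an
  // active-connection list here.
}

void StartReading(grpc_transport* transport, grpc_closure* on_close_storage,
                  void* connection) {
  GRPC_CLOSURE_INIT(on_close_storage, OnTransportClosed, connection,
                    grpc_schedule_on_exec_ctx);
  grpc_chttp2_transport_start_reading(transport, /*read_buffer=*/nullptr,
                                      /*notify_on_receive_settings=*/nullptr,
                                      on_close_storage);
}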

@@ -301,6 +301,7 @@ struct grpc_chttp2_transport {
grpc_core::Combiner* combiner;
grpc_closure* notify_on_receive_settings = nullptr;
grpc_closure* notify_on_close = nullptr;
/** write execution state of the transport */
grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;

@@ -34,8 +34,9 @@ namespace {
class XdsServerConfigFetcher : public grpc_server_config_fetcher {
public:
explicit XdsServerConfigFetcher(RefCountedPtr<XdsClient> xds_client)
: xds_client_(std::move(xds_client)) {
explicit XdsServerConfigFetcher(RefCountedPtr<XdsClient> xds_client,
grpc_server_xds_status_notifier notifier)
: xds_client_(std::move(xds_client)), serving_status_notifier_(notifier) {
GPR_ASSERT(xds_client_ != nullptr);
}
@@ -44,7 +45,8 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
watcher) override {
grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get();
auto listener_watcher = absl::make_unique<ListenerWatcher>(
std::move(watcher), args, xds_client_);
std::move(watcher), args, xds_client_, serving_status_notifier_,
listening_address);
auto* listener_watcher_ptr = listener_watcher.get();
// TODO(yashykt): Get the resource name id from bootstrap
listening_address = absl::StrCat(
@@ -81,10 +83,14 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
explicit ListenerWatcher(
std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
server_config_watcher,
grpc_channel_args* args, RefCountedPtr<XdsClient> xds_client)
grpc_channel_args* args, RefCountedPtr<XdsClient> xds_client,
grpc_server_xds_status_notifier serving_status_notifier,
std::string listening_address)
: server_config_watcher_(std::move(server_config_watcher)),
args_(args),
xds_client_(std::move(xds_client)) {}
xds_client_(std::move(xds_client)),
serving_status_notifier_(serving_status_notifier),
listening_address_(std::move(listening_address)) {}
~ListenerWatcher() override { grpc_channel_args_destroy(args_); }
@@ -107,10 +113,21 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
return;
}
// Only send an update if something changed.
if (updated_once_ && !update_needed) {
if (have_resource_ && !update_needed) {
return;
}
updated_once_ = true;
if (!have_resource_) {
have_resource_ = true;
if (serving_status_notifier_.on_serving_status_change != nullptr) {
serving_status_notifier_.on_serving_status_change(
serving_status_notifier_.user_data, listening_address_.c_str(),
GRPC_STATUS_OK, "");
} else {
gpr_log(GPR_INFO,
"xDS Listener resource obtained; will start serving on %s",
listening_address_.c_str());
}
}
grpc_channel_args* updated_args = nullptr;
if (xds_certificate_provider_ != nullptr) {
grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg();
@@ -122,18 +139,43 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
}
void OnError(grpc_error* error) override {
gpr_log(GPR_ERROR, "ListenerWatcher:%p XdsClient reports error: %s", this,
grpc_error_string(error));
if (have_resource_) {
gpr_log(GPR_ERROR,
"ListenerWatcher:%p XdsClient reports error: %s for %s; "
"ignoring in favor of existing resource",
this, grpc_error_string(error), listening_address_.c_str());
} else {
if (serving_status_notifier_.on_serving_status_change != nullptr) {
serving_status_notifier_.on_serving_status_change(
serving_status_notifier_.user_data, listening_address_.c_str(),
GRPC_STATUS_UNAVAILABLE, grpc_error_string(error));
} else {
gpr_log(
GPR_ERROR,
"ListenerWatcher:%p error obtaining xDS Listener resource: %s; "
"not serving on %s",
this, grpc_error_string(error), listening_address_.c_str());
}
}
GRPC_ERROR_UNREF(error);
// TODO(yashykt): We might want to bubble this error to the application.
}
void OnResourceDoesNotExist() override {
gpr_log(GPR_ERROR,
"ListenerWatcher:%p XdsClient reports requested listener does "
"not exist",
this);
// TODO(yashykt): We might want to bubble this error to the application.
"not exist; not serving on %s",
this, listening_address_.c_str());
if (have_resource_) {
// The server has started listening already, so we need to gracefully
// stop serving.
server_config_watcher_->StopServing();
have_resource_ = false;
}
if (serving_status_notifier_.on_serving_status_change != nullptr) {
serving_status_notifier_.on_serving_status_change(
serving_status_notifier_.user_data, listening_address_.c_str(),
GRPC_STATUS_NOT_FOUND, "Requested listener does not exist");
}
}
private:
@@ -236,10 +278,12 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
server_config_watcher_;
grpc_channel_args* args_;
RefCountedPtr<XdsClient> xds_client_;
grpc_server_xds_status_notifier serving_status_notifier_;
std::string listening_address_;
RefCountedPtr<grpc_tls_certificate_provider> root_certificate_provider_;
RefCountedPtr<grpc_tls_certificate_provider> identity_certificate_provider_;
RefCountedPtr<XdsCertificateProvider> xds_certificate_provider_;
bool updated_once_ = false;
bool have_resource_ = false;
};
struct WatcherState {
@@ -248,6 +292,7 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
};
RefCountedPtr<XdsClient> xds_client_;
grpc_server_xds_status_notifier serving_status_notifier_;
Mutex mu_;
std::map<grpc_server_config_fetcher::WatcherInterface*, WatcherState>
watchers_;
@@ -256,7 +301,8 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
} // namespace
} // namespace grpc_core
grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() {
grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(
grpc_server_xds_status_notifier notifier) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_config_fetcher_xds_create()", 0, ());
@@ -269,5 +315,5 @@ grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() {
GRPC_ERROR_UNREF(error);
return nullptr;
}
return new grpc_core::XdsServerConfigFetcher(std::move(xds_client));
return new grpc_core::XdsServerConfigFetcher(std::move(xds_client), notifier);
}
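
Taken together, the watcher above reports GRPC_STATUS_OK once a Listener resource is obtained, GRPC_STATUS_UNAVAILABLE if an error arrives before any resource was obtained, and GRPC_STATUS_NOT_FOUND when an in-use resource is deleted. A sketch of a callback that distinguishes these cases (the function name is illustrative):

// Illustrative callback distinguishing the codes reported above.
static void OnServingStatusChange(void* /*user_data*/, const char* uri,
                                  grpc_status_code code,
                                  const char* error_message) {
  switch (code) {
    case GRPC_STATUS_OK:  // Listener resource obtained; now serving.
      gpr_log(GPR_INFO, "%s: serving", uri);
      break;
    case GRPC_STATUS_UNAVAILABLE:  // error before any resource was obtained
    case GRPC_STATUS_NOT_FOUND:    // in-use Listener resource was deleted
    default:
      gpr_log(GPR_ERROR, "%s: not serving (%s)", uri, error_message);
      break;
  }
}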

@@ -55,44 +55,6 @@ std::string HandshakerArgsString(HandshakerArgs* args) {
HandshakeManager::HandshakeManager() {}
/// Add \a mgr to the server side list of all pending handshake managers, the
/// list starts with \a *head.
// Not thread-safe. Caller needs to synchronize.
void HandshakeManager::AddToPendingMgrList(HandshakeManager** head) {
GPR_ASSERT(prev_ == nullptr);
GPR_ASSERT(next_ == nullptr);
next_ = *head;
if (*head) {
(*head)->prev_ = this;
}
*head = this;
}
/// Remove \a mgr from the server side list of all pending handshake managers.
// Not thread-safe. Caller needs to synchronize.
void HandshakeManager::RemoveFromPendingMgrList(HandshakeManager** head) {
if (next_ != nullptr) {
next_->prev_ = prev_;
}
if (prev_ != nullptr) {
prev_->next_ = next_;
} else {
GPR_ASSERT(*head == this);
*head = next_;
}
}
/// Shutdown all pending handshake managers starting at head on the server
/// side. Not thread-safe. Caller needs to synchronize.
void HandshakeManager::ShutdownAllPending(grpc_error* why) {
auto* head = this;
while (head != nullptr) {
head->Shutdown(GRPC_ERROR_REF(why));
head = head->next_;
}
GRPC_ERROR_UNREF(why);
}
void HandshakeManager::Add(RefCountedPtr<Handshaker> handshaker) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_handshaker_trace)) {
gpr_log(
@@ -213,7 +175,6 @@ void HandshakeManager::DoHandshake(grpc_endpoint* endpoint,
{
MutexLock lock(&mu_);
GPR_ASSERT(index_ == 0);
GPR_ASSERT(!is_shutdown_);
// Construct handshaker args. These will be passed through all
// handshakers and eventually be freed by the on_handshake_done callback.
args_.endpoint = endpoint;

@@ -94,19 +94,6 @@ class HandshakeManager : public RefCounted<HandshakeManager> {
HandshakeManager();
~HandshakeManager() override;
/// Add \a mgr to the server side list of all pending handshake managers, the
/// list starts with \a *head.
// Not thread-safe. Caller needs to synchronize.
void AddToPendingMgrList(HandshakeManager** head);
/// Remove \a mgr from the server side list of all pending handshake managers.
// Not thread-safe. Caller needs to synchronize.
void RemoveFromPendingMgrList(HandshakeManager** head);
/// Shutdown all pending handshake managers starting at head on the server
/// side. Not thread-safe. Caller needs to synchronize.
void ShutdownAllPending(grpc_error* why);
/// Adds a handshaker to the handshake manager.
/// Takes ownership of \a handshaker.
void Add(RefCountedPtr<Handshaker> handshaker);
@@ -161,10 +148,6 @@ class HandshakeManager : public RefCounted<HandshakeManager> {
grpc_closure on_handshake_done_;
// Handshaker args.
HandshakerArgs args_;
// Links to the previous and next managers in a list of all pending handshakes
// Used at server side only.
HandshakeManager* prev_ = nullptr;
HandshakeManager* next_ = nullptr;
};
} // namespace grpc_core

@@ -71,7 +71,7 @@ class Server : public InternallyRefCounted<Server> {
/// Interface for listeners.
/// Implementations must override the Orphan() method, which should stop
/// listening and initiate destruction of the listener.
class ListenerInterface : public Orphanable {
class ListenerInterface : public InternallyRefCounted<ListenerInterface> {
public:
~ListenerInterface() override = default;
@@ -459,8 +459,13 @@ struct grpc_server_config_fetcher {
class WatcherInterface {
public:
virtual ~WatcherInterface() = default;
// Ownership of \a args is transferred.
// UpdateConfig() is invoked by the config fetcher when a new config is
// available. Implementations should update the configuration and start
// serving if not already serving. Ownership of \a args is transferred.
virtual void UpdateConfig(grpc_channel_args* args) = 0;
// Implementations should stop serving when this is called. Serving should
// only resume when UpdateConfig() is invoked.
virtual void StopServing() = 0;
};
virtual ~grpc_server_config_fetcher() = default;

@@ -404,7 +404,14 @@ cdef extern from "grpc/grpc.h":
void grpc_server_set_config_fetcher(
grpc_server* server, grpc_server_config_fetcher* config_fetcher) nogil
grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() nogil
ctypedef struct grpc_server_xds_status_notifier:
void (*on_serving_status_change)(void* user_data, const char* uri,
grpc_status_code code,
const char* error_message)
void* user_data
grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(
grpc_server_xds_status_notifier notifier) nogil
int grpc_server_add_insecure_http2_port(

@@ -25,8 +25,12 @@ cdef class Server:
self.c_server = NULL
cdef _ChannelArgs channel_args = _ChannelArgs(arguments)
self.c_server = grpc_server_create(channel_args.c_args(), NULL)
cdef grpc_server_xds_status_notifier notifier
notifier.on_serving_status_change = NULL
notifier.user_data = NULL
if xds:
grpc_server_set_config_fetcher(self.c_server, grpc_server_config_fetcher_xds_create())
grpc_server_set_config_fetcher(self.c_server,
grpc_server_config_fetcher_xds_create(notifier))
self.references.append(arguments)
def request_call(

@@ -215,7 +215,7 @@ extern grpc_server_create_type grpc_server_create_import;
typedef void(*grpc_server_register_completion_queue_type)(grpc_server* server, grpc_completion_queue* cq, void* reserved);
extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
#define grpc_server_register_completion_queue grpc_server_register_completion_queue_import
typedef grpc_server_config_fetcher*(*grpc_server_config_fetcher_xds_create_type)();
typedef grpc_server_config_fetcher*(*grpc_server_config_fetcher_xds_create_type)(grpc_server_xds_status_notifier notifier);
extern grpc_server_config_fetcher_xds_create_type grpc_server_config_fetcher_xds_create_import;
#define grpc_server_config_fetcher_xds_create grpc_server_config_fetcher_xds_create_import
typedef void(*grpc_server_config_fetcher_destroy_type)(grpc_server_config_fetcher* config_fetcher);

@@ -212,7 +212,7 @@ void grpc_run_bad_client_test(
grpc_server_start(a.server);
transport = grpc_create_chttp2_transport(nullptr, sfd.server, false);
server_setup_transport(&a, transport);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
/* Bind fds to pollsets */
grpc_endpoint_add_to_pollset(sfd.client, grpc_cq_pollset(client_cq));

@@ -99,7 +99,7 @@ static void init_client() {
transport = grpc_create_chttp2_transport(nullptr, g_ctx.ep->client, true);
client_setup_transport(transport);
GPR_ASSERT(g_ctx.client);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
}
static void init_server() {
@@ -111,7 +111,7 @@ static void init_server() {
grpc_server_start(g_ctx.server);
transport = grpc_create_chttp2_transport(nullptr, g_ctx.ep->server, false);
server_setup_transport(transport);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
}
static void test_init() {

@@ -55,7 +55,7 @@ static void server_setup_transport(void* ts, grpc_transport* transport) {
grpc_error* error = f->server->core_server->SetupTransport(
transport, nullptr, f->server->core_server->channel_args(), nullptr);
if (error == GRPC_ERROR_NONE) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
} else {
GRPC_ERROR_UNREF(error);
grpc_transport_destroy(transport);
@@ -80,7 +80,7 @@ static void client_setup_transport(void* ts, grpc_transport* transport) {
transport, nullptr, &error);
grpc_channel_args_destroy(args);
if (cs->f->client != nullptr) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
} else {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;

@@ -49,7 +49,7 @@ static void server_setup_transport(void* ts, grpc_transport* transport) {
grpc_error* error = f->server->core_server->SetupTransport(
transport, nullptr, f->server->core_server->channel_args(), nullptr);
if (error == GRPC_ERROR_NONE) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
} else {
GRPC_ERROR_UNREF(error);
grpc_transport_destroy(transport);
@@ -75,7 +75,7 @@ static void client_setup_transport(void* ts, grpc_transport* transport) {
transport, nullptr, &error);
grpc_channel_args_destroy(args);
if (cs->f->client != nullptr) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
} else {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;

@@ -49,7 +49,7 @@ static void server_setup_transport(void* ts, grpc_transport* transport) {
grpc_error* error = f->server->core_server->SetupTransport(
transport, nullptr, f->server->core_server->channel_args(), nullptr);
if (error == GRPC_ERROR_NONE) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
} else {
GRPC_ERROR_UNREF(error);
grpc_transport_destroy(transport);
@@ -75,7 +75,7 @@ static void client_setup_transport(void* ts, grpc_transport* transport) {
transport, nullptr, &error);
grpc_channel_args_destroy(args);
if (cs->f->client != nullptr) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
} else {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;

@@ -54,7 +54,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
grpc_transport* transport =
grpc_create_chttp2_transport(nullptr, mock_endpoint, true);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_arg authority_arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),

@@ -58,7 +58,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_transport* transport =
grpc_create_chttp2_transport(nullptr, mock_endpoint, false);
server->core_server->SetupTransport(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_call* call1 = nullptr;
grpc_call_details call_details1;

@@ -2100,6 +2100,31 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
}
protected:
class XdsServingStatusNotifier
: public grpc::experimental::XdsServerServingStatusNotifierInterface {
public:
void OnServingStatusChange(std::string uri, grpc::Status status) override {
grpc_core::MutexLock lock(&mu_);
status_map[uri] = status;
cond_.Signal();
}
void WaitOnServingStatusChange(std::string uri,
grpc::StatusCode expected_status) {
grpc_core::MutexLock lock(&mu_);
std::map<std::string, grpc::Status>::iterator it;
while ((it = status_map.find(uri)) == status_map.end() ||
it->second.error_code() != expected_status) {
cond_.Wait(&mu_);
}
}
private:
grpc_core::Mutex mu_;
grpc_core::CondVar cond_;
std::map<std::string, grpc::Status> status_map;
};
class ServerThread {
public:
explicit ServerThread(bool use_xds_enabled_server = false)
@@ -2131,6 +2156,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
server_address << "localhost:" << port_;
if (use_xds_enabled_server_) {
experimental::XdsServerBuilder builder;
builder.set_status_notifier(&notifier_);
builder.AddListeningPort(server_address.str(), Credentials());
RegisterAllServices(&builder);
server_ = builder.BuildAndStart();
@@ -2162,6 +2188,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
bool use_xds_enabled_server() const { return use_xds_enabled_server_; }
XdsServingStatusNotifier* notifier() { return &notifier_; }
private:
virtual void RegisterAllServices(ServerBuilder* builder) = 0;
virtual void StartAllServices() = 0;
@@ -2171,6 +2199,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
const int port_;
std::unique_ptr<Server> server_;
XdsServingStatusNotifier notifier_;
std::unique_ptr<std::thread> thread_;
bool running_ = false;
const bool use_xds_enabled_server_;
@@ -7387,6 +7416,164 @@ TEST_P(XdsServerSecurityTest, TestFallbackToTls) {
server_authenticated_identity_, {});
}
class XdsEnabledServerStatusNotificationTest : public XdsServerSecurityTest {
protected:
void SetValidLdsUpdate() { SetLdsUpdate("", "", "", "", false); }
void SetInvalidLdsUpdate() {
// Set LDS update without root provider instance.
Listener listener;
listener.set_name(absl::StrCat(
"grpc/server?xds.resource.listening_address=",
ipv6_only_ ? "[::1]:" : "127.0.0.1:", backends_[0]->port()));
auto* socket_address = listener.mutable_address()->mutable_socket_address();
socket_address->set_address(ipv6_only_ ? "::1" : "127.0.0.1");
socket_address->set_port_value(backends_[0]->port());
auto* filter_chain = listener.add_filter_chains();
auto* transport_socket = filter_chain->mutable_transport_socket();
transport_socket->set_name("envoy.transport_sockets.tls");
DownstreamTlsContext downstream_tls_context;
transport_socket->mutable_typed_config()->PackFrom(downstream_tls_context);
balancers_[0]->ads_service()->SetLdsResource(listener);
}
void UnsetLdsUpdate() {
balancers_[0]->ads_service()->UnsetResource(
kLdsTypeUrl, absl::StrCat("grpc/server?xds.resource.listening_address=",
ipv6_only_ ? "[::1]:" : "127.0.0.1:",
backends_[0]->port()));
}
};
TEST_P(XdsEnabledServerStatusNotificationTest, ServingStatus) {
SetValidLdsUpdate();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat("127.0.0.1:", backends_[0]->port()), grpc::StatusCode::OK);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {});
}
TEST_P(XdsEnabledServerStatusNotificationTest, NotServingStatus) {
SetInvalidLdsUpdate();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat("127.0.0.1:", backends_[0]->port()),
grpc::StatusCode::UNAVAILABLE);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {},
true /* test_expects_failure */);
}
TEST_P(XdsEnabledServerStatusNotificationTest, ErrorUpdateWhenAlreadyServing) {
SetValidLdsUpdate();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat("127.0.0.1:", backends_[0]->port()), grpc::StatusCode::OK);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {});
// Invalid update does not lead to a change in the serving status.
SetInvalidLdsUpdate();
constexpr int kRetryCount = 100;
auto response_state = balancers_[0]->ads_service()->lds_response_state();
for (int i = 0; i < kRetryCount &&
response_state.state != AdsServiceImpl::ResponseState::NACKED;
i++) {
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {});
response_state = balancers_[0]->ads_service()->lds_response_state();
}
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat("127.0.0.1:", backends_[0]->port()), grpc::StatusCode::OK);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {});
}
TEST_P(XdsEnabledServerStatusNotificationTest,
NotServingStatusToServingStatusTransition) {
SetInvalidLdsUpdate();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat("127.0.0.1:", backends_[0]->port()),
grpc::StatusCode::UNAVAILABLE);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {},
true /* test_expects_failure */);
// Send a valid LDS update to change to serving status
SetValidLdsUpdate();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat("127.0.0.1:", backends_[0]->port()), grpc::StatusCode::OK);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {});
}
// This test verifies that deleting the resource while the server is already
// serving results in future connections being dropped.
TEST_P(XdsEnabledServerStatusNotificationTest,
ServingStatusToNonServingStatusTransition) {
SetValidLdsUpdate();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat("127.0.0.1:", backends_[0]->port()), grpc::StatusCode::OK);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {});
// Deleting the resource should result in a non-serving status.
UnsetLdsUpdate();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat("127.0.0.1:", backends_[0]->port()),
grpc::StatusCode::NOT_FOUND);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {},
true /* test_expects_failure */);
}
TEST_P(XdsEnabledServerStatusNotificationTest, RepeatedServingStatusChanges) {
for (int i = 0; i < 5; i++) {
// Send a valid LDS update to get the server to start listening
SetValidLdsUpdate();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat("127.0.0.1:", backends_[0]->port()), grpc::StatusCode::OK);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {});
// Deleting the resource will make the server start rejecting connections
UnsetLdsUpdate();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat("127.0.0.1:", backends_[0]->port()),
grpc::StatusCode::NOT_FOUND);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {},
true /* test_expects_failure */);
}
}
TEST_P(XdsEnabledServerStatusNotificationTest, ExistingRpcsOnResourceDeletion) {
// Send a valid LDS update to get the server to start listening
SetValidLdsUpdate();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat("127.0.0.1:", backends_[0]->port()), grpc::StatusCode::OK);
constexpr int kNumChannels = 10;
struct StreamingRpc {
std::shared_ptr<Channel> channel;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub;
ClientContext context;
std::unique_ptr<ClientWriter<EchoRequest>> writer;
} streaming_rpcs[kNumChannels];
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
for (int i = 0; i < kNumChannels; i++) {
streaming_rpcs[i].channel = CreateInsecureChannel();
streaming_rpcs[i].stub =
grpc::testing::EchoTestService::NewStub(streaming_rpcs[i].channel);
streaming_rpcs[i].context.set_wait_for_ready(true);
streaming_rpcs[i].writer = streaming_rpcs[i].stub->RequestStream(
&streaming_rpcs[i].context, &response);
EXPECT_TRUE(streaming_rpcs[i].writer->Write(request));
}
// Deleting the resource will make the server start rejecting connections
UnsetLdsUpdate();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat("127.0.0.1:", backends_[0]->port()),
grpc::StatusCode::NOT_FOUND);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {},
true /* test_expects_failure */);
for (int i = 0; i < kNumChannels; i++) {
EXPECT_TRUE(streaming_rpcs[i].writer->Write(request));
EXPECT_TRUE(streaming_rpcs[i].writer->WritesDone());
EXPECT_TRUE(streaming_rpcs[i].writer->Finish().ok());
// New RPCs on the existing channels should fail.
ClientContext new_context;
new_context.set_deadline(grpc_timeout_milliseconds_to_deadline(1000));
EXPECT_FALSE(
streaming_rpcs[i].stub->Echo(&new_context, request, &response).ok());
}
}
using EdsTest = BasicTest;
// Tests that EDS client should send a NACK if the EDS update contains
@@ -8736,6 +8923,13 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, XdsServerSecurityTest,
.set_use_xds_credentials()),
&TestTypeName);
// We are only testing the server here.
INSTANTIATE_TEST_SUITE_P(XdsTest, XdsEnabledServerStatusNotificationTest,
::testing::Values(TestType()
.set_use_fake_resolver()
.set_use_xds_credentials()),
&TestTypeName);
// EDS could be tested with or without XdsResolver, but the tests would
// be the same either way, so we test it only with XdsResolver.
INSTANTIATE_TEST_SUITE_P(

@@ -139,7 +139,7 @@ class Fixture {
grpc_channel_args c_args = args.c_channel_args();
ep_ = new PhonyEndpoint;
t_ = grpc_create_chttp2_transport(&c_args, ep_, client);
grpc_chttp2_transport_start_reading(t_, nullptr, nullptr);
grpc_chttp2_transport_start_reading(t_, nullptr, nullptr, nullptr);
FlushExecCtx();
}

@@ -187,7 +187,8 @@ class EndpointPairFixture : public BaseFixture {
server_->c_server()->core_server->SetupTransport(
server_transport_, nullptr, server_args, nullptr);
grpc_chttp2_transport_start_reading(server_transport_, nullptr, nullptr);
grpc_chttp2_transport_start_reading(server_transport_, nullptr, nullptr,
nullptr);
}
/* create channel */
@@ -202,7 +203,8 @@ class EndpointPairFixture : public BaseFixture {
GPR_ASSERT(client_transport_);
grpc_channel* channel = grpc_channel_create(
"target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport_);
grpc_chttp2_transport_start_reading(client_transport_, nullptr, nullptr);
grpc_chttp2_transport_start_reading(client_transport_, nullptr, nullptr,
nullptr);
channel_ = ::grpc::CreateChannelInternal(
"", channel,

@@ -82,7 +82,7 @@ class EndpointPairFixture {
server_->c_server()->core_server->SetupTransport(transport, nullptr,
server_args, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
}
/* create channel */
@@ -97,7 +97,7 @@ class EndpointPairFixture {
GPR_ASSERT(transport);
grpc_channel* channel = grpc_channel_create(
"target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
channel_ = ::grpc::CreateChannelInternal(
"", channel,
