Merge pull request #23581 from markdroth/server_c++

Convert grpc_server to idiomatic C++.
pull/23728/head
Mark D. Roth 4 years ago committed by GitHub
commit 7f7a6916f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  2. 3
      src/core/ext/transport/chttp2/server/chttp2_server.h
  3. 4
      src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc
  4. 12
      src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
  5. 5
      src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.cc
  6. 6
      src/core/ext/transport/inproc/inproc_transport.cc
  7. 2
      src/core/lib/channel/channelz.cc
  8. 2
      src/core/lib/channel/channelz.h
  9. 24
      src/core/lib/surface/call.cc
  10. 3
      src/core/lib/surface/call.h
  11. 2
      src/core/lib/surface/init.cc
  12. 2308
      src/core/lib/surface/server.cc
  13. 450
      src/core/lib/surface/server.h
  14. 14
      src/cpp/server/server_cc.cc
  15. 6
      test/core/bad_client/bad_client.cc
  16. 2
      test/core/bad_client/tests/bad_streaming_id.cc
  17. 2
      test/core/bad_client/tests/badreq.cc
  18. 2
      test/core/bad_client/tests/connection_prefix.cc
  19. 2
      test/core/bad_client/tests/headers.cc
  20. 2
      test/core/bad_client/tests/initial_settings_frame.cc
  21. 2
      test/core/bad_client/tests/out_of_bounds.cc
  22. 2
      test/core/bad_client/tests/server_registered_method.cc
  23. 2
      test/core/bad_client/tests/simple_request.cc
  24. 2
      test/core/bad_client/tests/unknown_frame.cc
  25. 2
      test/core/bad_client/tests/window_overflow.cc
  26. 5
      test/core/bad_connection/close_fd_test.cc
  27. 2
      test/core/channel/channelz_test.cc
  28. 4
      test/core/end2end/fixtures/h2_sockpair+trace.cc
  29. 4
      test/core/end2end/fixtures/h2_sockpair.cc
  30. 4
      test/core/end2end/fixtures/h2_sockpair_1byte.cc
  31. 2
      test/core/end2end/fuzzers/server_fuzzer.cc
  32. 4
      test/core/end2end/tests/channelz.cc
  33. 8
      test/cpp/microbenchmarks/fullstack_fixtures.h
  34. 8
      test/cpp/performance/writes_per_rpc_test.cc

@ -53,19 +53,19 @@
namespace grpc_core {
namespace {
class Chttp2ServerListener : public ServerListenerInterface {
class Chttp2ServerListener : public Server::ListenerInterface {
public:
static grpc_error* Create(grpc_server* server, const char* addr,
static grpc_error* Create(Server* server, const char* addr,
grpc_channel_args* args, int* port_num);
static grpc_error* CreateWithAcceptor(grpc_server* server, const char* name,
static grpc_error* CreateWithAcceptor(Server* server, const char* name,
grpc_channel_args* args);
// Do not instantiate directly. Use one of the factory methods above.
Chttp2ServerListener(grpc_server* server, grpc_channel_args* args);
Chttp2ServerListener(Server* server, grpc_channel_args* args);
~Chttp2ServerListener();
void Start(grpc_server* server,
void Start(Server* server,
const std::vector<grpc_pollset*>* pollsets) override;
channelz::ListenSocketNode* channelz_listen_socket_node() const override {
@ -113,10 +113,10 @@ class Chttp2ServerListener : public ServerListenerInterface {
static void TcpServerShutdownComplete(void* arg, grpc_error* error);
static void DestroyListener(grpc_server* /*server*/, void* arg,
static void DestroyListener(Server* /*server*/, void* arg,
grpc_closure* destroy_done);
grpc_server* const server_;
Server* const server_;
grpc_channel_args* const args_;
grpc_tcp_server* tcp_server_;
Mutex mu_;
@ -194,12 +194,10 @@ void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
{
MutexLock lock(&self->listener_->mu_);
grpc_resource_user* resource_user =
grpc_server_get_default_resource_user(self->listener_->server_);
self->listener_->server_->default_resource_user();
if (error != GRPC_ERROR_NONE || self->listener_->shutdown_) {
const char* error_str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
grpc_resource_user* resource_user =
grpc_server_get_default_resource_user(self->listener_->server_);
if (resource_user != nullptr) {
grpc_resource_user_free(resource_user,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
@ -224,10 +222,9 @@ void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
if (args->endpoint != nullptr) {
grpc_transport* transport = grpc_create_chttp2_transport(
args->args, args->endpoint, false, resource_user);
grpc_server_setup_transport(
self->listener_->server_, transport, self->accepting_pollset_,
args->args, grpc_chttp2_transport_get_socket_node(transport),
resource_user);
self->listener_->server_->SetupTransport(
transport, self->accepting_pollset_, args->args,
grpc_chttp2_transport_get_socket_node(transport), resource_user);
// Use notify_on_receive_settings callback to enforce the
// handshake deadline.
// Note: The reinterpret_cast<>s here are safe, because
@ -270,7 +267,7 @@ void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
// Chttp2ServerListener
//
grpc_error* Chttp2ServerListener::Create(grpc_server* server, const char* addr,
grpc_error* Chttp2ServerListener::Create(Server* server, const char* addr,
grpc_channel_args* args,
int* port_num) {
std::vector<grpc_error*> error_list;
@ -327,8 +324,7 @@ grpc_error* Chttp2ServerListener::Create(grpc_server* server, const char* addr,
addr, absl::StrFormat("chttp2 listener %s", addr));
}
/* Register with the server only upon success */
grpc_server_add_listener(server,
OrphanablePtr<ServerListenerInterface>(listener));
server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
return GRPC_ERROR_NONE;
}();
if (resolved != nullptr) {
@ -352,7 +348,7 @@ grpc_error* Chttp2ServerListener::Create(grpc_server* server, const char* addr,
return error;
}
grpc_error* Chttp2ServerListener::CreateWithAcceptor(grpc_server* server,
grpc_error* Chttp2ServerListener::CreateWithAcceptor(Server* server,
const char* name,
grpc_channel_args* args) {
Chttp2ServerListener* listener = new Chttp2ServerListener(server, args);
@ -366,12 +362,11 @@ grpc_error* Chttp2ServerListener::CreateWithAcceptor(grpc_server* server,
TcpServerFdHandler** arg_val =
grpc_channel_args_find_pointer<TcpServerFdHandler*>(args, name);
*arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
grpc_server_add_listener(server,
OrphanablePtr<ServerListenerInterface>(listener));
server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
return GRPC_ERROR_NONE;
}
Chttp2ServerListener::Chttp2ServerListener(grpc_server* server,
Chttp2ServerListener::Chttp2ServerListener(Server* server,
grpc_channel_args* args)
: server_(server), args_(args) {
GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
@ -383,7 +378,7 @@ Chttp2ServerListener::~Chttp2ServerListener() {
}
/* Server callback: start listening on our ports */
void Chttp2ServerListener::Start(grpc_server* /*server*/,
void Chttp2ServerListener::Start(Server* /*server*/,
const std::vector<grpc_pollset*>* pollsets) {
{
MutexLock lock(&mu_);
@ -400,8 +395,7 @@ void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
RefCountedPtr<HandshakeManager> Chttp2ServerListener::CreateHandshakeManager() {
MutexLock lock(&mu_);
if (shutdown_) return nullptr;
grpc_resource_user* resource_user =
grpc_server_get_default_resource_user(server_);
grpc_resource_user* resource_user = server_->default_resource_user();
if (resource_user != nullptr &&
!grpc_resource_user_safe_alloc(resource_user,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
@ -475,7 +469,7 @@ void Chttp2ServerListener::Orphan() {
// Chttp2ServerAddPort()
//
grpc_error* Chttp2ServerAddPort(grpc_server* server, const char* addr,
grpc_error* Chttp2ServerAddPort(Server* server, const char* addr,
grpc_channel_args* args, int* port_num) {
if (strncmp(addr, "external:", 9) == 0) {
return grpc_core::Chttp2ServerListener::CreateWithAcceptor(server, addr,

@ -24,12 +24,13 @@
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/surface/server.h"
namespace grpc_core {
/// Adds a port to \a server. Sets \a port_num to the port number.
/// Takes ownership of \a args.
grpc_error* Chttp2ServerAddPort(grpc_server* server, const char* addr,
grpc_error* Chttp2ServerAddPort(Server* server, const char* addr,
grpc_channel_args* args, int* port_num);
} // namespace grpc_core

@ -33,8 +33,8 @@ int grpc_server_add_insecure_http2_port(grpc_server* server, const char* addr) {
GRPC_API_TRACE("grpc_server_add_insecure_http2_port(server=%p, addr=%s)", 2,
(server, addr));
grpc_error* err = grpc_core::Chttp2ServerAddPort(
server, addr,
grpc_channel_args_copy(grpc_server_get_channel_args(server)), &port_num);
server->core_server.get(), addr,
grpc_channel_args_copy(server->core_server->channel_args()), &port_num);
if (err != GRPC_ERROR_NONE) {
const char* msg = grpc_error_string(err);
gpr_log(GPR_ERROR, "%s", msg);

@ -41,21 +41,21 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
GPR_ASSERT(reserved == nullptr);
grpc_core::ExecCtx exec_ctx;
grpc_core::Server* core_server = server->core_server.get();
const grpc_channel_args* server_args = core_server->channel_args();
std::string name = absl::StrCat("fd:", fd);
grpc_endpoint* server_endpoint =
grpc_tcp_create(grpc_fd_create(fd, name.c_str(), true),
grpc_server_get_channel_args(server), name.c_str());
grpc_endpoint* server_endpoint = grpc_tcp_create(
grpc_fd_create(fd, name.c_str(), true), server_args, name.c_str());
const grpc_channel_args* server_args = grpc_server_get_channel_args(server);
grpc_transport* transport = grpc_create_chttp2_transport(
server_args, server_endpoint, false /* is_client */);
for (grpc_pollset* pollset : grpc_server_get_pollsets(server)) {
for (grpc_pollset* pollset : core_server->pollsets()) {
grpc_endpoint_add_to_pollset(server_endpoint, pollset);
}
grpc_server_setup_transport(server, transport, nullptr, server_args, nullptr);
core_server->SetupTransport(transport, nullptr, server_args, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}

@ -68,10 +68,11 @@ int grpc_server_add_secure_http2_port(grpc_server* server, const char* addr,
args_to_add[0] = grpc_server_credentials_to_arg(creds);
args_to_add[1] = grpc_security_connector_to_arg(sc.get());
args =
grpc_channel_args_copy_and_add(grpc_server_get_channel_args(server),
grpc_channel_args_copy_and_add(server->core_server->channel_args(),
args_to_add, GPR_ARRAY_SIZE(args_to_add));
// Add server port.
err = grpc_core::Chttp2ServerAddPort(server, addr, args, &port_num);
err = grpc_core::Chttp2ServerAddPort(server->core_server.get(), addr, args,
&port_num);
done:
sc.reset(DEBUG_LOCATION, "server");

@ -1276,7 +1276,7 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server,
const char* args_to_remove[] = {GRPC_ARG_MAX_CONNECTION_IDLE_MS,
GRPC_ARG_MAX_CONNECTION_AGE_MS};
const grpc_channel_args* server_args = grpc_channel_args_copy_and_remove(
grpc_server_get_channel_args(server), args_to_remove,
server->core_server->channel_args(), args_to_remove,
GPR_ARRAY_SIZE(args_to_remove));
// Add a default authority channel argument for the client
@ -1293,8 +1293,8 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server,
client_args);
// TODO(ncteisen): design and support channelz GetSocket for inproc.
grpc_server_setup_transport(server, server_transport, nullptr, server_args,
nullptr);
server->core_server->SetupTransport(server_transport, nullptr, server_args,
nullptr);
grpc_channel* channel = grpc_channel_create(
"inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport);

@ -283,7 +283,7 @@ void ChannelNode::RemoveChildSubchannel(intptr_t child_uuid) {
// ServerNode
//
ServerNode::ServerNode(grpc_server* /*server*/, size_t channel_tracer_max_nodes)
ServerNode::ServerNode(size_t channel_tracer_max_nodes)
: BaseNode(EntityType::kServer, ""), trace_(channel_tracer_max_nodes) {}
ServerNode::~ServerNode() {}

@ -238,7 +238,7 @@ class ChannelNode : public BaseNode {
// Handles channelz bookkeeping for servers
class ServerNode : public BaseNode {
public:
ServerNode(grpc_server* server, size_t channel_tracer_max_nodes);
explicit ServerNode(size_t channel_tracer_max_nodes);
~ServerNode() override;

@ -245,7 +245,7 @@ struct grpc_call {
struct {
int* cancelled;
// backpointer to owning server if this is a server side call.
grpc_server* server;
grpc_core::Server* core_server;
} server;
} final_op;
gpr_atm status_error = 0;
@ -374,7 +374,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
} else {
GRPC_STATS_INC_SERVER_CALLS_CREATED();
call->final_op.server.cancelled = nullptr;
call->final_op.server.server = args->server;
call->final_op.server.core_server = args->server;
GPR_ASSERT(args->add_initial_metadata_count == 0);
call->send_extra_metadata_count = 0;
}
@ -476,11 +476,11 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
if (channelz_channel != nullptr) {
channelz_channel->RecordCallStarted();
}
} else {
grpc_core::channelz::ServerNode* channelz_server =
grpc_server_get_channelz_node(call->final_op.server.server);
if (channelz_server != nullptr) {
channelz_server->RecordCallStarted();
} else if (call->final_op.server.core_server != nullptr) {
grpc_core::channelz::ServerNode* channelz_node =
call->final_op.server.core_server->channelz_node();
if (channelz_node != nullptr) {
channelz_node->RecordCallStarted();
}
}
@ -759,15 +759,15 @@ static void set_final_status(grpc_call* call, grpc_error* error) {
} else {
*call->final_op.server.cancelled =
error != GRPC_ERROR_NONE || !call->sent_server_trailing_metadata;
grpc_core::channelz::ServerNode* channelz_server =
grpc_server_get_channelz_node(call->final_op.server.server);
if (channelz_server != nullptr) {
grpc_core::channelz::ServerNode* channelz_node =
call->final_op.server.core_server->channelz_node();
if (channelz_node != nullptr) {
if (*call->final_op.server.cancelled ||
reinterpret_cast<grpc_error*>(
gpr_atm_acq_load(&call->status_error)) != GRPC_ERROR_NONE) {
channelz_server->RecordCallFailed();
channelz_node->RecordCallFailed();
} else {
channelz_server->RecordCallSucceeded();
channelz_node->RecordCallSucceeded();
}
}
GRPC_ERROR_UNREF(error);

@ -25,6 +25,7 @@
#include "src/core/lib/channel/context.h"
#include "src/core/lib/gprpp/arena.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
#include <grpc/grpc.h>
#include <grpc/impl/codegen/compression_types.h>
@ -34,7 +35,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call* call, int success,
typedef struct grpc_call_create_args {
grpc_channel* channel;
grpc_server* server;
grpc_core::Server* server;
grpc_call* parent;
uint32_t propagation_mask;

@ -102,7 +102,7 @@ static void register_builtin_channel_init() {
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
append_filter, (void*)&grpc_lame_filter);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter,
(void*)&grpc_server_top_filter);
(void*)&grpc_core::Server::kServerTopFilter);
}
typedef struct grpc_plugin {

File diff suppressed because it is too large Load Diff

@ -1,121 +1,397 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_CORE_LIB_SURFACE_SERVER_H
#define GRPC_CORE_LIB_SURFACE_SERVER_H
#include <grpc/support/port_platform.h>
#include <list>
#include <vector>
#include "absl/types/optional.h"
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/transport/transport.h"
extern const grpc_channel_filter grpc_server_top_filter;
/** Lightweight tracing of server channel state */
extern grpc_core::TraceFlag grpc_server_channel_trace;
namespace grpc_core {
/// Interface for listeners.
/// Implementations must override the Orphan() method, which should stop
/// listening and initiate destruction of the listener.
class ServerListenerInterface : public Orphanable {
extern TraceFlag grpc_server_channel_trace;
class Server : public InternallyRefCounted<Server> {
public:
virtual ~ServerListenerInterface() = default;
// Filter vtable.
static const grpc_channel_filter kServerTopFilter;
/// Starts listening. This listener may refer to the pollset object beyond
/// this call, so it is a pointer rather than a reference.
virtual void Start(grpc_server* server,
const std::vector<grpc_pollset*>* pollsets) = 0;
// Opaque type used for registered methods.
struct RegisteredMethod;
/// Returns the channelz node for the listen socket, or null if not
/// supported.
virtual channelz::ListenSocketNode* channelz_listen_socket_node() const = 0;
// An object to represent the most relevant characteristics of a
// newly-allocated call object when using an AllocatingRequestMatcherBatch.
struct BatchCallAllocation {
grpc_experimental_completion_queue_functor* tag;
grpc_call** call;
grpc_metadata_array* initial_metadata;
grpc_call_details* details;
};
/// Sets a closure to be invoked by the listener when its destruction
/// is complete.
virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0;
};
// An object to represent the most relevant characteristics of a
// newly-allocated call object when using an
// AllocatingRequestMatcherRegistered.
struct RegisteredCallAllocation {
grpc_experimental_completion_queue_functor* tag;
grpc_call** call;
grpc_metadata_array* initial_metadata;
gpr_timespec* deadline;
grpc_byte_buffer** optional_payload;
};
} // namespace grpc_core
/// Interface for listeners.
/// Implementations must override the Orphan() method, which should stop
/// listening and initiate destruction of the listener.
class ListenerInterface : public Orphanable {
public:
virtual ~ListenerInterface() = default;
/* Add a listener to the server: when the server starts, it will call Start(),
and when it shuts down, it will orphan the listener. */
void grpc_server_add_listener(
grpc_server* server,
grpc_core::OrphanablePtr<grpc_core::ServerListenerInterface> listener);
/// Starts listening. This listener may refer to the pollset object beyond
/// this call, so it is a pointer rather than a reference.
virtual void Start(Server* server,
const std::vector<grpc_pollset*>* pollsets) = 0;
/* Setup a transport - creates a channel stack, binds the transport to the
server */
void grpc_server_setup_transport(
grpc_server* server, grpc_transport* transport,
grpc_pollset* accepting_pollset, const grpc_channel_args* args,
const grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>&
socket_node,
grpc_resource_user* resource_user = nullptr);
/// Returns the channelz node for the listen socket, or null if not
/// supported.
virtual channelz::ListenSocketNode* channelz_listen_socket_node() const = 0;
grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
grpc_server* server);
/// Sets a closure to be invoked by the listener when its destruction
/// is complete.
virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0;
};
const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server);
explicit Server(const grpc_channel_args* args);
~Server();
grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server);
void Orphan() override;
bool grpc_server_has_open_connections(grpc_server* server);
const grpc_channel_args* channel_args() const { return channel_args_; }
grpc_resource_user* default_resource_user() const {
return default_resource_user_;
}
channelz::ServerNode* channelz_node() const { return channelz_node_.get(); }
// Do not call this before grpc_server_start. Returns the pollsets. The vector
// itself is immutable, but the pollsets inside are mutable. The result is valid
// for the lifetime of the server.
const std::vector<grpc_pollset*>& grpc_server_get_pollsets(grpc_server* server);
// Do not call this before Start(). Returns the pollsets. The
// vector itself is immutable, but the pollsets inside are mutable. The
// result is valid for the lifetime of the server.
const std::vector<grpc_pollset*>& pollsets() const { return pollsets_; }
namespace grpc_core {
bool HasOpenConnections();
// An object to represent the most relevant characteristics of a newly-allocated
// call object when using an AllocatingRequestMatcherBatch
struct ServerBatchCallAllocation {
grpc_experimental_completion_queue_functor* tag;
grpc_call** call;
grpc_metadata_array* initial_metadata;
grpc_call_details* details;
};
// Adds a listener to the server. When the server starts, it will call
// the listener's Start() method, and when it shuts down, it will orphan
// the listener.
void AddListener(OrphanablePtr<ListenerInterface> listener);
// An object to represent the most relevant characteristics of a newly-allocated
// call object when using an AllocatingRequestMatcherRegistered
struct ServerRegisteredCallAllocation {
grpc_experimental_completion_queue_functor* tag;
grpc_call** call;
grpc_metadata_array* initial_metadata;
gpr_timespec* deadline;
grpc_byte_buffer** optional_payload;
};
// Starts listening for connections.
void Start();
// Sets up a transport. Creates a channel stack and binds the transport to
// the server. Called from the listener when a new connection is accepted.
void SetupTransport(grpc_transport* transport,
grpc_pollset* accepting_pollset,
const grpc_channel_args* args,
const RefCountedPtr<channelz::SocketNode>& socket_node,
grpc_resource_user* resource_user = nullptr);
void RegisterCompletionQueue(grpc_completion_queue* cq);
// Functions to specify that a specific registered method or the unregistered
// collection should use a specific allocator for request matching.
void SetRegisteredMethodAllocator(
grpc_completion_queue* cq, void* method_tag,
std::function<RegisteredCallAllocation()> allocator);
void SetBatchMethodAllocator(grpc_completion_queue* cq,
std::function<BatchCallAllocation()> allocator);
RegisteredMethod* RegisterMethod(
const char* method, const char* host,
grpc_server_register_method_payload_handling payload_handling,
uint32_t flags);
grpc_call_error RequestCall(grpc_call** call, grpc_call_details* details,
grpc_metadata_array* request_metadata,
grpc_completion_queue* cq_bound_to_call,
grpc_completion_queue* cq_for_notification,
void* tag);
grpc_call_error RequestRegisteredCall(
RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline,
grpc_metadata_array* request_metadata,
grpc_byte_buffer** optional_payload,
grpc_completion_queue* cq_bound_to_call,
grpc_completion_queue* cq_for_notification, void* tag_new);
void ShutdownAndNotify(grpc_completion_queue* cq, void* tag);
void CancelAllCalls();
private:
struct RequestedCall;
struct ChannelRegisteredMethod {
RegisteredMethod* server_registered_method = nullptr;
uint32_t flags;
bool has_host;
ExternallyManagedSlice method;
ExternallyManagedSlice host;
};
class RequestMatcherInterface;
class RealRequestMatcher;
class AllocatingRequestMatcherBase;
class AllocatingRequestMatcherBatch;
class AllocatingRequestMatcherRegistered;
class ChannelData {
public:
ChannelData() = default;
~ChannelData();
void InitTransport(RefCountedPtr<Server> server, grpc_channel* channel,
size_t cq_idx, grpc_transport* transport,
intptr_t channelz_socket_uuid);
RefCountedPtr<Server> server() const { return server_; }
grpc_channel* channel() const { return channel_; }
size_t cq_idx() const { return cq_idx_; }
ChannelRegisteredMethod* GetRegisteredMethod(const grpc_slice& host,
const grpc_slice& path,
bool is_idempotent);
// Filter vtable functions.
static grpc_error* InitChannelElement(grpc_channel_element* elem,
grpc_channel_element_args* args);
static void DestroyChannelElement(grpc_channel_element* elem);
private:
class ConnectivityWatcher;
static void AcceptStream(void* arg, grpc_transport* /*transport*/,
const void* transport_server_data);
void Destroy();
static void FinishDestroy(void* arg, grpc_error* error);
RefCountedPtr<Server> server_;
grpc_channel* channel_;
// The index into Server::cqs_ of the CQ used as a starting point for
// where to publish new incoming calls.
size_t cq_idx_;
absl::optional<std::list<ChannelData*>::iterator> list_position_;
// A hash-table of the methods and hosts of the registered methods.
// TODO(vjpai): Convert this to an STL map type as opposed to a direct
// bucket implementation. (Consider performance impact, hash function to
// use, etc.)
std::unique_ptr<std::vector<ChannelRegisteredMethod>> registered_methods_;
uint32_t registered_method_max_probes_;
grpc_closure finish_destroy_channel_closure_;
intptr_t channelz_socket_uuid_;
};
class CallData {
public:
enum class CallState {
NOT_STARTED, // Waiting for metadata.
PENDING, // Initial metadata read, not flow controlled in yet.
ACTIVATED, // Flow controlled in, on completion queue.
ZOMBIED, // Cancelled before being queued.
};
CallData(grpc_call_element* elem, const grpc_call_element_args& args,
RefCountedPtr<Server> server);
~CallData();
// Starts the recv_initial_metadata batch on the call.
// Invoked from ChannelData::AcceptStream().
void Start(grpc_call_element* elem);
void SetState(CallState state);
// Attempts to move from PENDING to ACTIVATED state. Returns true
// on success.
bool MaybeActivate();
// Publishes an incoming call to the application after it has been
// matched.
void Publish(size_t cq_idx, RequestedCall* rc);
void KillZombie();
// Functions to specify that a specific registered method or the unregistered
// collection should use a specific allocator for request matching.
void SetServerRegisteredMethodAllocator(
grpc_server* server, grpc_completion_queue* cq, void* method_tag,
std::function<ServerRegisteredCallAllocation()> allocator);
void SetServerBatchMethodAllocator(
grpc_server* server, grpc_completion_queue* cq,
std::function<ServerBatchCallAllocation()> allocator);
void FailCallCreation();
// Filter vtable functions.
static grpc_error* InitCallElement(grpc_call_element* elem,
const grpc_call_element_args* args);
static void DestroyCallElement(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/);
static void StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
private:
// Helper functions for handling calls at the top of the call stack.
static void RecvInitialMetadataBatchComplete(void* arg, grpc_error* error);
void StartNewRpc(grpc_call_element* elem);
static void PublishNewRpc(void* arg, grpc_error* error);
// Functions used inside the call stack.
void StartTransportStreamOpBatchImpl(grpc_call_element* elem,
grpc_transport_stream_op_batch* batch);
static void RecvInitialMetadataReady(void* arg, grpc_error* error);
static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
RefCountedPtr<Server> server_;
grpc_call* call_;
Atomic<CallState> state_{CallState::NOT_STARTED};
absl::optional<grpc_slice> path_;
absl::optional<grpc_slice> host_;
grpc_millis deadline_ = GRPC_MILLIS_INF_FUTURE;
grpc_completion_queue* cq_new_ = nullptr;
RequestMatcherInterface* matcher_ = nullptr;
grpc_byte_buffer* payload_ = nullptr;
grpc_closure kill_zombie_closure_;
grpc_metadata_array initial_metadata_ =
grpc_metadata_array(); // Zero-initialize the C struct.
grpc_closure recv_initial_metadata_batch_complete_;
grpc_metadata_batch* recv_initial_metadata_ = nullptr;
uint32_t recv_initial_metadata_flags_ = 0;
grpc_closure recv_initial_metadata_ready_;
grpc_closure* original_recv_initial_metadata_ready_;
grpc_error* recv_initial_metadata_error_ = GRPC_ERROR_NONE;
bool seen_recv_trailing_metadata_ready_ = false;
grpc_closure recv_trailing_metadata_ready_;
grpc_closure* original_recv_trailing_metadata_ready_;
grpc_error* recv_trailing_metadata_error_ = GRPC_ERROR_NONE;
grpc_closure publish_;
CallCombiner* call_combiner_;
};
struct Listener {
explicit Listener(OrphanablePtr<ListenerInterface> l)
: listener(std::move(l)) {}
OrphanablePtr<ListenerInterface> listener;
grpc_closure destroy_done;
};
struct ShutdownTag {
ShutdownTag(void* tag_arg, grpc_completion_queue* cq_arg)
: tag(tag_arg), cq(cq_arg) {}
void* const tag;
grpc_completion_queue* const cq;
grpc_cq_completion completion;
};
static void ListenerDestroyDone(void* arg, grpc_error* error);
static void DoneShutdownEvent(void* server,
grpc_cq_completion* /*completion*/) {
static_cast<Server*>(server)->Unref();
}
static void DoneRequestEvent(void* req, grpc_cq_completion* completion);
void FailCall(size_t cq_idx, RequestedCall* rc, grpc_error* error);
grpc_call_error QueueRequestedCall(size_t cq_idx, RequestedCall* rc);
void MaybeFinishShutdown();
void KillPendingWorkLocked(grpc_error* error);
static grpc_call_error ValidateServerRequest(
grpc_completion_queue* cq_for_notification, void* tag,
grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
grpc_call_error ValidateServerRequestAndCq(
size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
std::vector<grpc_channel*> GetChannelsLocked() const;
grpc_channel_args* const channel_args_;
grpc_resource_user* default_resource_user_ = nullptr;
RefCountedPtr<channelz::ServerNode> channelz_node_;
std::vector<grpc_completion_queue*> cqs_;
std::vector<grpc_pollset*> pollsets_;
bool started_ = false;
// The two following mutexes control access to server-state.
// mu_global_ controls access to non-call-related state (e.g., channel state).
// mu_call_ controls access to call-related state (e.g., the call lists).
//
// If they are ever required to be nested, you must lock mu_global_
// before mu_call_. This is currently used in shutdown processing
// (ShutdownAndNotify() and MaybeFinishShutdown()).
Mutex mu_global_; // mutex for server and channel state
Mutex mu_call_; // mutex for call-specific state
// startup synchronization: flag is protected by mu_global_, signals whether
// we are doing the listener start routine or not.
bool starting_ = false;
CondVar starting_cv_;
std::vector<std::unique_ptr<RegisteredMethod>> registered_methods_;
// Request matcher for unregistered methods.
std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher_;
std::atomic_bool shutdown_flag_{false};
bool shutdown_published_ = false;
std::vector<ShutdownTag> shutdown_tags_;
std::list<ChannelData*> channels_;
std::list<Listener> listeners_;
size_t listeners_destroyed_ = 0;
// The last time we printed a shutdown progress message.
gpr_timespec last_shutdown_message_time_;
};
} // namespace grpc_core
struct grpc_server {
grpc_core::OrphanablePtr<grpc_core::Server> core_server;
};
#endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */

@ -547,7 +547,7 @@ class Server::CallbackRequest final
// is nullptr since these services don't have pre-defined methods.
CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method,
grpc::CompletionQueue* cq,
grpc_core::ServerRegisteredCallAllocation* data)
grpc_core::Server::RegisteredCallAllocation* data)
: server_(server),
method_(method),
has_request_payload_(method->method_type() ==
@ -568,7 +568,7 @@ class Server::CallbackRequest final
// For generic services, method is nullptr since these services don't have
// pre-defined methods.
CallbackRequest(Server* server, grpc::CompletionQueue* cq,
grpc_core::ServerBatchCallAllocation* data)
grpc_core::Server::BatchCallAllocation* data)
: server_(server),
method_(nullptr),
has_request_payload_(false),
@ -1063,9 +1063,9 @@ bool Server::RegisterService(const std::string* host, grpc::Service* service) {
has_callback_methods_ = true;
grpc::internal::RpcServiceMethod* method_value = method.get();
grpc::CompletionQueue* cq = CallbackCQ();
grpc_core::SetServerRegisteredMethodAllocator(
server_, cq->cq(), method_registration_tag, [this, cq, method_value] {
grpc_core::ServerRegisteredCallAllocation result;
server_->core_server->SetRegisteredMethodAllocator(
cq->cq(), method_registration_tag, [this, cq, method_value] {
grpc_core::Server::RegisteredCallAllocation result;
new CallbackRequest<grpc::CallbackServerContext>(this, method_value,
cq, &result);
return result;
@ -1104,8 +1104,8 @@ void Server::RegisterCallbackGenericService(
generic_handler_.reset(service->Handler());
grpc::CompletionQueue* cq = CallbackCQ();
grpc_core::SetServerBatchMethodAllocator(server_, cq->cq(), [this, cq] {
grpc_core::ServerBatchCallAllocation result;
server_->core_server->SetBatchMethodAllocator(cq->cq(), [this, cq] {
grpc_core::Server::BatchCallAllocation result;
new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result);
return result;
});

@ -65,8 +65,8 @@ static void set_done_write(void* arg, grpc_error* /*error*/) {
static void server_setup_transport(void* ts, grpc_transport* transport) {
thd_args* a = static_cast<thd_args*>(ts);
grpc_core::ExecCtx exec_ctx;
grpc_server_setup_transport(a->server, transport, nullptr,
grpc_server_get_channel_args(a->server), nullptr);
a->server->core_server->SetupTransport(
transport, nullptr, a->server->core_server->channel_args(), nullptr);
}
/* Sets the read_done event */
@ -219,7 +219,7 @@ void grpc_run_bad_client_test(
grpc_endpoint_add_to_pollset(sfd.server, grpc_cq_pollset(a.cq));
/* Check a ground truth */
GPR_ASSERT(grpc_server_has_open_connections(a.server));
GPR_ASSERT(a.server->core_server->HasOpenConnections());
gpr_event_init(&a.done_thd);
a.validator = server_validator;

@ -76,7 +76,7 @@ namespace {
void verifier(grpc_server* server, grpc_completion_queue* cq,
void* /*registered_method*/) {
while (grpc_server_has_open_connections(server)) {
while (server->core_server->HasOpenConnections()) {
GPR_ASSERT(grpc_completion_queue_next(
cq, grpc_timeout_milliseconds_to_deadline(20), nullptr)
.type == GRPC_QUEUE_TIMEOUT);

@ -31,7 +31,7 @@
static void verifier(grpc_server* server, grpc_completion_queue* cq,
void* /*registered_method*/) {
while (grpc_server_has_open_connections(server)) {
while (server->core_server->HasOpenConnections()) {
GPR_ASSERT(grpc_completion_queue_next(
cq, grpc_timeout_milliseconds_to_deadline(20), nullptr)
.type == GRPC_QUEUE_TIMEOUT);

@ -21,7 +21,7 @@
static void verifier(grpc_server* server, grpc_completion_queue* cq,
void* /*registered_method*/) {
while (grpc_server_has_open_connections(server)) {
while (server->core_server->HasOpenConnections()) {
GPR_ASSERT(grpc_completion_queue_next(
cq, grpc_timeout_milliseconds_to_deadline(20), nullptr)
.type == GRPC_QUEUE_TIMEOUT);

@ -25,7 +25,7 @@
static void verifier(grpc_server* server, grpc_completion_queue* cq,
void* /*registered_method*/) {
while (grpc_server_has_open_connections(server)) {
while (server->core_server->HasOpenConnections()) {
GPR_ASSERT(grpc_completion_queue_next(
cq, grpc_timeout_milliseconds_to_deadline(20), nullptr)
.type == GRPC_QUEUE_TIMEOUT);

@ -24,7 +24,7 @@
static void verifier(grpc_server* server, grpc_completion_queue* cq,
void* /*registered_method*/) {
while (grpc_server_has_open_connections(server)) {
while (server->core_server->HasOpenConnections()) {
GPR_ASSERT(grpc_completion_queue_next(
cq, grpc_timeout_milliseconds_to_deadline(20), nullptr)
.type == GRPC_QUEUE_TIMEOUT);

@ -31,7 +31,7 @@ namespace {
void verifier(grpc_server* server, grpc_completion_queue* cq,
void* /*registered_method*/) {
while (grpc_server_has_open_connections(server)) {
while (server->core_server->HasOpenConnections()) {
GPR_ASSERT(grpc_completion_queue_next(
cq, grpc_timeout_milliseconds_to_deadline(20), nullptr)
.type == GRPC_QUEUE_TIMEOUT);

@ -68,7 +68,7 @@ static void verifier_succeeds(grpc_server* server, grpc_completion_queue* cq,
static void verifier_fails(grpc_server* server, grpc_completion_queue* cq,
void* /*registered_method*/) {
while (grpc_server_has_open_connections(server)) {
while (server->core_server->HasOpenConnections()) {
GPR_ASSERT(grpc_completion_queue_next(
cq, grpc_timeout_milliseconds_to_deadline(20), nullptr)
.type == GRPC_QUEUE_TIMEOUT);

@ -115,7 +115,7 @@ static void verifier(grpc_server* server, grpc_completion_queue* cq,
static void failure_verifier(grpc_server* server, grpc_completion_queue* cq,
void* /*registered_method*/) {
while (grpc_server_has_open_connections(server)) {
while (server->core_server->HasOpenConnections()) {
GPR_ASSERT(grpc_completion_queue_next(
cq, grpc_timeout_milliseconds_to_deadline(20), nullptr)
.type == GRPC_QUEUE_TIMEOUT);

@ -26,7 +26,7 @@
static void verifier(grpc_server* server, grpc_completion_queue* cq,
void* /*registered_method*/) {
while (grpc_server_has_open_connections(server)) {
while (server->core_server->HasOpenConnections()) {
GPR_ASSERT(grpc_completion_queue_next(
cq, grpc_timeout_milliseconds_to_deadline(20), nullptr)
.type == GRPC_QUEUE_TIMEOUT);

@ -44,7 +44,7 @@
static void verifier(grpc_server* server, grpc_completion_queue* cq,
void* /*registered_method*/) {
while (grpc_server_has_open_connections(server)) {
while (server->core_server->HasOpenConnections()) {
GPR_ASSERT(grpc_completion_queue_next(
cq, grpc_timeout_milliseconds_to_deadline(20), nullptr)
.type == GRPC_QUEUE_TIMEOUT);

@ -73,9 +73,8 @@ static test_ctx g_ctx;
static void server_setup_transport(grpc_transport* transport) {
grpc_core::ExecCtx exec_ctx;
grpc_endpoint_add_to_pollset(g_ctx.ep->server, grpc_cq_pollset(g_ctx.cq));
grpc_server_setup_transport(g_ctx.server, transport, nullptr,
grpc_server_get_channel_args(g_ctx.server),
nullptr);
g_ctx.server->core_server->SetupTransport(
transport, nullptr, g_ctx.server->core_server->channel_args(), nullptr);
}
static void client_setup_transport(grpc_transport* transport) {

@ -519,7 +519,7 @@ TEST_F(ChannelzRegistryBasedTest, InternalChannelTest) {
TEST(ChannelzServerTest, BasicServerAPIFunctionality) {
grpc_core::ExecCtx exec_ctx;
ServerFixture server(10);
ServerNode* channelz_server = grpc_server_get_channelz_node(server.server());
ServerNode* channelz_server = server.server()->core_server->channelz_node();
channelz_server->RecordCallStarted();
channelz_server->RecordCallFailed();
channelz_server->RecordCallSucceeded();

@ -52,8 +52,8 @@ static void server_setup_transport(void* ts, grpc_transport* transport) {
grpc_core::ExecCtx exec_ctx;
grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data);
grpc_endpoint_add_to_pollset(sfd->server, grpc_cq_pollset(f->cq));
grpc_server_setup_transport(f->server, transport, nullptr,
grpc_server_get_channel_args(f->server), nullptr);
f->server->core_server->SetupTransport(
transport, nullptr, f->server->core_server->channel_args(), nullptr);
}
typedef struct {

@ -46,8 +46,8 @@ static void server_setup_transport(void* ts, grpc_transport* transport) {
grpc_core::ExecCtx exec_ctx;
grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data);
grpc_endpoint_add_to_pollset(sfd->server, grpc_cq_pollset(f->cq));
grpc_server_setup_transport(f->server, transport, nullptr,
grpc_server_get_channel_args(f->server), nullptr);
f->server->core_server->SetupTransport(
transport, nullptr, f->server->core_server->channel_args(), nullptr);
}
typedef struct {

@ -46,8 +46,8 @@ static void server_setup_transport(void* ts, grpc_transport* transport) {
grpc_core::ExecCtx exec_ctx;
grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data);
grpc_endpoint_add_to_pollset(sfd->server, grpc_cq_pollset(f->cq));
grpc_server_setup_transport(f->server, transport, nullptr,
grpc_server_get_channel_args(f->server), nullptr);
f->server->core_server->SetupTransport(
transport, nullptr, f->server->core_server->channel_args(), nullptr);
}
typedef struct {

@ -58,7 +58,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_server_start(server);
grpc_transport* transport =
grpc_create_chttp2_transport(nullptr, mock_endpoint, false);
grpc_server_setup_transport(server, transport, nullptr, nullptr, nullptr);
server->core_server->SetupTransport(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_call* call1 = nullptr;

@ -214,7 +214,7 @@ static void test_channelz(grpc_end2end_test_config config) {
GPR_ASSERT(channelz_channel != nullptr);
grpc_core::channelz::ServerNode* channelz_server =
grpc_server_get_channelz_node(f.server);
f.server->core_server->channelz_node();
GPR_ASSERT(channelz_server != nullptr);
std::string json = channelz_channel->RenderJsonString();
@ -275,7 +275,7 @@ static void test_channelz_with_channel_trace(grpc_end2end_test_config config) {
GPR_ASSERT(channelz_channel != nullptr);
grpc_core::channelz::ServerNode* channelz_server =
grpc_server_get_channelz_node(f.server);
f.server->core_server->channelz_node();
GPR_ASSERT(channelz_server != nullptr);
run_one_request(config, f, true);

@ -174,17 +174,17 @@ class EndpointPairFixture : public BaseFixture {
* */
{
const grpc_channel_args* server_args =
grpc_server_get_channel_args(server_->c_server());
server_->c_server()->core_server->channel_args();
server_transport_ = grpc_create_chttp2_transport(
server_args, endpoints.server, false /* is_client */);
for (grpc_pollset* pollset :
grpc_server_get_pollsets(server_->c_server())) {
server_->c_server()->core_server->pollsets()) {
grpc_endpoint_add_to_pollset(endpoints.server, pollset);
}
grpc_server_setup_transport(server_->c_server(), server_transport_,
nullptr, server_args, nullptr);
server_->c_server()->core_server->SetupTransport(
server_transport_, nullptr, server_args, nullptr);
grpc_chttp2_transport_start_reading(server_transport_, nullptr, nullptr);
}

@ -71,17 +71,17 @@ class EndpointPairFixture {
/* add server endpoint to server_ */
{
const grpc_channel_args* server_args =
grpc_server_get_channel_args(server_->c_server());
server_->c_server()->core_server->channel_args();
grpc_transport* transport = grpc_create_chttp2_transport(
server_args, endpoints.server, false /* is_client */);
for (grpc_pollset* pollset :
grpc_server_get_pollsets(server_->c_server())) {
server_->c_server()->core_server->pollsets()) {
grpc_endpoint_add_to_pollset(endpoints.server, pollset);
}
grpc_server_setup_transport(server_->c_server(), transport, nullptr,
server_args, nullptr);
server_->c_server()->core_server->SetupTransport(transport, nullptr,
server_args, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}

Loading…
Cancel
Save