Merge pull request #22674 from vjpai/unrequested

Allocating request matcher to support C++ callback API
pull/22744/head
Vijay Pai 5 years ago committed by GitHub
commit 6a477958c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      include/grpcpp/server_impl.h
  2. 329
      src/core/lib/surface/server.cc
  3. 32
      src/core/lib/surface/server.h
  4. 267
      src/cpp/server/server_cc.cc

@ -291,6 +291,13 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
grpc_impl::ServerInitializer* initializer();
// Functions to manage the server shutdown ref count. Things that increase
// the ref count are the running state of the server (take a ref at start and
// drop it at shutdown) and each running callback RPC.
void Ref();
void UnrefWithPossibleNotify() /* LOCKS_EXCLUDED(mu_) */;
void UnrefAndWaitLocked() /* EXCLUSIVE_LOCKS_REQUIRED(mu_) */;
std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
acceptors_;
@ -315,16 +322,6 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
/// the \a sync_server_cqs)
std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_;
// Outstanding unmatched callback requests, indexed by method.
// NOTE: Using a gpr_atm rather than atomic_int because atomic_int isn't
// copyable or movable and thus will cause compilation errors. We
// actually only want to extend the vector before the threaded use
// starts, but this is still a limitation.
std::vector<gpr_atm> callback_unmatched_reqs_count_;
// List of callback requests to start when server actually starts.
std::list<CallbackRequestBase*> callback_reqs_to_start_;
#ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL
// For registering experimental callback generic service; remove when that
// method longer experimental
@ -336,25 +333,18 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
bool started_;
bool shutdown_;
bool shutdown_notified_; // Was notify called on the shutdown_cv_
grpc::internal::CondVar shutdown_done_cv_;
bool shutdown_done_ = false;
std::atomic_int shutdown_refs_outstanding_{1};
grpc::internal::CondVar shutdown_cv_;
// It is ok (but not required) to nest callback_reqs_mu_ under mu_ .
// Incrementing callback_reqs_outstanding_ is ok without a lock but it must be
// decremented under the lock in case it is the last request and enables the
// server shutdown. The increment is performance-critical since it happens
// during periods of increasing load; the decrement happens only when memory
// is maxed out, during server shutdown, or (possibly in a future version)
// during decreasing load, so it is less performance-critical.
grpc::internal::Mutex callback_reqs_mu_;
grpc::internal::CondVar callback_reqs_done_cv_;
std::atomic<intptr_t> callback_reqs_outstanding_{0};
std::shared_ptr<GlobalCallbacks> global_callbacks_;
std::vector<grpc::string> services_;
bool has_async_generic_service_{false};
bool has_callback_generic_service_{false};
bool has_async_generic_service_ = false;
bool has_callback_generic_service_ = false;
bool has_callback_methods_ = false;
// Pointer to the wrapped grpc_server.
grpc_server* server_;
@ -383,8 +373,7 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
// with this server (if any). It is set on the first call to CallbackCQ().
// It is _not owned_ by the server; ownership belongs with its internal
// shutdown callback tag (invoked when the CQ is fully shutdown).
// It is protected by mu_
CompletionQueue* callback_cq_ = nullptr;
CompletionQueue* callback_cq_ /* GUARDED_BY(mu_) */ = nullptr;
// List of CQs passed in by user that must be Shutdown only after Server is
// Shutdown. Even though this is only used with NDEBUG, instantiate it in all

@ -72,15 +72,39 @@ enum requested_call_type { BATCH_CALL, REGISTERED_CALL };
struct registered_method;
struct requested_call {
grpc_core::ManualConstructor<
grpc_core::MultiProducerSingleConsumerQueue::Node>
mpscq_node;
requested_call_type type;
void* tag;
grpc_completion_queue* cq_bound_to_call;
grpc_call** call;
requested_call(void* tag_arg, grpc_completion_queue* call_cq,
grpc_call** call_arg, grpc_metadata_array* initial_md,
grpc_call_details* details)
: type(BATCH_CALL),
tag(tag_arg),
cq_bound_to_call(call_cq),
call(call_arg),
initial_metadata(initial_md) {
details->reserved = nullptr;
data.batch.details = details;
}
requested_call(void* tag_arg, grpc_completion_queue* call_cq,
grpc_call** call_arg, grpc_metadata_array* initial_md,
registered_method* rm, gpr_timespec* deadline,
grpc_byte_buffer** optional_payload)
: type(REGISTERED_CALL),
tag(tag_arg),
cq_bound_to_call(call_cq),
call(call_arg),
initial_metadata(initial_md) {
data.registered.method = rm;
data.registered.deadline = deadline;
data.registered.optional_payload = optional_payload;
}
grpc_core::MultiProducerSingleConsumerQueue::Node mpscq_node;
const requested_call_type type;
void* const tag;
grpc_completion_queue* const cq_bound_to_call;
grpc_call** const call;
grpc_cq_completion completion;
grpc_metadata_array* initial_metadata;
grpc_metadata_array* const initial_metadata;
union {
struct {
grpc_call_details* details;
@ -134,6 +158,10 @@ enum call_state {
struct call_data;
grpc_call_error ValidateServerRequest(
grpc_completion_queue* cq_for_notification, void* tag,
grpc_byte_buffer** optional_payload, registered_method* rm);
// RPCs that come in from the transport must be matched against RPC requests
// from the application. An incoming request from the application can be matched
// to an RPC that has already arrived or can be queued up for later use.
@ -242,14 +270,26 @@ struct call_data {
};
struct registered_method {
char* method;
char* host;
grpc_server_register_method_payload_handling payload_handling;
uint32_t flags;
registered_method(
const char* method_arg, const char* host_arg,
grpc_server_register_method_payload_handling payload_handling_arg,
uint32_t flags_arg)
: method(gpr_strdup(method_arg)),
host(gpr_strdup(host_arg)),
payload_handling(payload_handling_arg),
flags(flags_arg) {}
~registered_method() {
gpr_free(method);
gpr_free(host);
}
char* const method;
char* const host;
const grpc_server_register_method_payload_handling payload_handling;
const uint32_t flags;
/* one request matcher per method */
// TODO(vjpai): Move this to a unique_ptr once this has a real
// constructor/destructor
RequestMatcherInterface* matcher = nullptr;
std::unique_ptr<RequestMatcherInterface> matcher;
registered_method* next;
};
@ -285,6 +325,8 @@ struct grpc_server {
bool starting;
gpr_cv starting_cv;
// TODO(vjpai): Convert from a linked-list head pointer to a std::vector once
// grpc_server has a real constructor/destructor
registered_method* registered_methods;
/** one request matcher for unregistered methods */
// TODO(vjpai): Convert to a std::unique_ptr once grpc_server has a real
@ -444,7 +486,7 @@ class RealRequestMatcher : public RequestMatcherInterface {
void RequestCallWithPossiblePublish(size_t request_queue_index,
requested_call* call) override {
if (requests_per_cq_[request_queue_index].Push(call->mpscq_node.get())) {
if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) {
/* this was the first queued request: we need to lock and start
matching calls */
gpr_mu_lock(&server_->mu_call);
@ -530,6 +572,103 @@ class RealRequestMatcher : public RequestMatcherInterface {
std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
};
// AllocatingRequestMatchers don't allow the application to request an RPC in
// advance or queue up any incoming RPC for later match. Instead, MatchOrQueue
// will call out to an allocation function passed in at the construction of the
// object. These request matchers are designed for the C++ callback API, so they
// only support 1 completion queue (passed in at the constructor).
class AllocatingRequestMatcherBase : public RequestMatcherInterface {
public:
AllocatingRequestMatcherBase(grpc_server* server, grpc_completion_queue* cq)
: server_(server), cq_(cq) {
size_t idx;
for (idx = 0; idx < server->cq_count; idx++) {
if (server->cqs[idx] == cq) {
break;
}
}
GPR_ASSERT(idx < server->cq_count);
cq_idx_ = idx;
}
void ZombifyPending() override {}
void KillRequests(grpc_error* error) override { GRPC_ERROR_UNREF(error); }
size_t request_queue_count() const override { return 0; }
void RequestCallWithPossiblePublish(size_t /*request_queue_index*/,
requested_call* /*call*/) final {
GPR_ASSERT(false);
}
grpc_server* server() const override { return server_; }
// Supply the completion queue related to this request matcher
grpc_completion_queue* cq() const { return cq_; }
// Supply the completion queue's index relative to the server.
size_t cq_idx() const { return cq_idx_; }
private:
grpc_server* const server_;
grpc_completion_queue* const cq_;
size_t cq_idx_;
};
// An allocating request matcher for non-registered methods (used for generic
// API and unimplemented RPCs).
class AllocatingRequestMatcherBatch : public AllocatingRequestMatcherBase {
public:
AllocatingRequestMatcherBatch(
grpc_server* server, grpc_completion_queue* cq,
std::function<grpc_core::ServerBatchCallAllocation()> allocator)
: AllocatingRequestMatcherBase(server, cq),
allocator_(std::move(allocator)) {}
void MatchOrQueue(size_t /*start_request_queue_index*/,
call_data* calld) override {
grpc_core::ServerBatchCallAllocation call_info = allocator_();
GPR_ASSERT(ValidateServerRequest(cq(), static_cast<void*>(call_info.tag),
nullptr, nullptr) == GRPC_CALL_OK);
requested_call* rc = new requested_call(
static_cast<void*>(call_info.tag), cq(), call_info.call,
call_info.initial_metadata, call_info.details);
gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
publish_call(server(), calld, cq_idx(), rc);
}
private:
std::function<grpc_core::ServerBatchCallAllocation()> allocator_;
};
// An allocating request matcher for registered methods.
class AllocatingRequestMatcherRegistered : public AllocatingRequestMatcherBase {
public:
AllocatingRequestMatcherRegistered(
grpc_server* server, grpc_completion_queue* cq, registered_method* rm,
std::function<grpc_core::ServerRegisteredCallAllocation()> allocator)
: AllocatingRequestMatcherBase(server, cq),
registered_method_(rm),
allocator_(std::move(allocator)) {}
void MatchOrQueue(size_t /*start_request_queue_index*/,
call_data* calld) override {
grpc_core::ServerRegisteredCallAllocation call_info = allocator_();
GPR_ASSERT(ValidateServerRequest(cq(), static_cast<void*>(call_info.tag),
call_info.optional_payload,
registered_method_) == GRPC_CALL_OK);
requested_call* rc = new requested_call(
static_cast<void*>(call_info.tag), cq(), call_info.call,
call_info.initial_metadata, registered_method_, call_info.deadline,
call_info.optional_payload);
gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
publish_call(server(), calld, cq_idx(), rc);
}
private:
registered_method* const registered_method_;
std::function<grpc_core::ServerRegisteredCallAllocation()> allocator_;
};
/*
* server proper
*/
@ -546,10 +685,7 @@ void server_delete(grpc_server* server) {
gpr_cv_destroy(&server->starting_cv);
while ((rm = server->registered_methods) != nullptr) {
server->registered_methods = rm->next;
delete rm->matcher;
gpr_free(rm->method);
gpr_free(rm->host);
gpr_free(rm);
delete rm;
}
delete server->unregistered_request_matcher;
for (i = 0; i < server->cq_count; i++) {
@ -603,7 +739,9 @@ void destroy_channel(channel_data* chand) {
op);
}
void done_request_event(void* req, grpc_cq_completion* /*c*/) { gpr_free(req); }
void done_request_event(void* req, grpc_cq_completion* /*c*/) {
delete static_cast<requested_call*>(req);
}
void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
requested_call* rc) {
@ -718,7 +856,8 @@ void start_new_rpc(grpc_call_element* elem) {
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
continue;
}
finish_start_new_rpc(server, elem, rm->server_registered_method->matcher,
finish_start_new_rpc(server, elem,
rm->server_registered_method->matcher.get(),
rm->server_registered_method->payload_handling);
return;
}
@ -735,7 +874,8 @@ void start_new_rpc(grpc_call_element* elem) {
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
continue;
}
finish_start_new_rpc(server, elem, rm->server_registered_method->matcher,
finish_start_new_rpc(server, elem,
rm->server_registered_method->matcher.get(),
rm->server_registered_method->payload_handling);
return;
}
@ -1101,7 +1241,7 @@ grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
rm = server->unregistered_request_matcher;
break;
case REGISTERED_CALL:
rm = rc->data.registered.method->matcher;
rm = rc->data.registered.method->matcher.get();
break;
}
rm->RequestCallWithPossiblePublish(cq_idx, rc);
@ -1119,6 +1259,26 @@ void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
}
} // namespace
namespace grpc_core {
void SetServerRegisteredMethodAllocator(
grpc_server* server, grpc_completion_queue* cq, void* method_tag,
std::function<ServerRegisteredCallAllocation()> allocator) {
registered_method* rm = static_cast<registered_method*>(method_tag);
rm->matcher.reset(new AllocatingRequestMatcherRegistered(
server, cq, rm, std::move(allocator)));
}
void SetServerBatchMethodAllocator(
grpc_server* server, grpc_completion_queue* cq,
std::function<ServerBatchCallAllocation()> allocator) {
GPR_DEBUG_ASSERT(server->unregistered_request_matcher == nullptr);
server->unregistered_request_matcher =
new AllocatingRequestMatcherBatch(server, cq, std::move(allocator));
}
}; // namespace grpc_core
const grpc_channel_filter grpc_server_top_filter = {
server_start_transport_stream_op_batch,
grpc_channel_next_op,
@ -1224,12 +1384,8 @@ void* grpc_server_register_method(
flags);
return nullptr;
}
m = static_cast<registered_method*>(gpr_zalloc(sizeof(registered_method)));
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m = new registered_method(method, host, payload_handling, flags);
m->next = server->registered_methods;
m->payload_handling = payload_handling;
m->flags = flags;
server->registered_methods = m;
return m;
}
@ -1250,9 +1406,13 @@ void grpc_server_start(grpc_server* server) {
grpc_cq_pollset(server->cqs[i]);
}
}
server->unregistered_request_matcher = new RealRequestMatcher(server);
if (server->unregistered_request_matcher == nullptr) {
server->unregistered_request_matcher = new RealRequestMatcher(server);
}
for (registered_method* rm = server->registered_methods; rm; rm = rm->next) {
rm->matcher = new RealRequestMatcher(server);
if (rm->matcher == nullptr) {
rm->matcher.reset(new RealRequestMatcher(server));
}
}
gpr_mu_lock(&server->mu_global);
@ -1523,15 +1683,51 @@ void grpc_server_add_listener(
server->listeners = l;
}
namespace {
grpc_call_error ValidateServerRequest(
grpc_completion_queue* cq_for_notification, void* tag,
grpc_byte_buffer** optional_payload, registered_method* rm) {
if ((rm == nullptr && optional_payload != nullptr) ||
((rm != nullptr) && ((optional_payload == nullptr) !=
(rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)))) {
return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
}
if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
}
return GRPC_CALL_OK;
}
grpc_call_error ValidateServerRequestAndCq(
size_t* cq_idx, grpc_server* server,
grpc_completion_queue* cq_for_notification, void* tag,
grpc_byte_buffer** optional_payload, registered_method* rm) {
size_t idx;
for (idx = 0; idx < server->cq_count; idx++) {
if (server->cqs[idx] == cq_for_notification) {
break;
}
}
if (idx == server->cq_count) {
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
grpc_call_error error =
ValidateServerRequest(cq_for_notification, tag, optional_payload, rm);
if (error != GRPC_CALL_OK) {
return error;
}
*cq_idx = idx;
return GRPC_CALL_OK;
}
} // namespace
grpc_call_error grpc_server_request_call(
grpc_server* server, grpc_call** call, grpc_call_details* details,
grpc_metadata_array* initial_metadata,
grpc_completion_queue* cq_bound_to_call,
grpc_completion_queue* cq_for_notification, void* tag) {
grpc_call_error error;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
GRPC_API_TRACE(
"grpc_server_request_call("
@ -1540,33 +1736,17 @@ grpc_call_error grpc_server_request_call(
7,
(server, call, details, initial_metadata, cq_bound_to_call,
cq_for_notification, tag));
size_t cq_idx;
for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
if (server->cqs[cq_idx] == cq_for_notification) {
break;
}
}
if (cq_idx == server->cq_count) {
gpr_free(rc);
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
grpc_call_error error = ValidateServerRequestAndCq(
&cq_idx, server, cq_for_notification, tag, nullptr, nullptr);
if (error != GRPC_CALL_OK) {
return error;
}
if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
gpr_free(rc);
error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
goto done;
}
details->reserved = nullptr;
rc->type = BATCH_CALL;
rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call;
rc->call = call;
rc->data.batch.details = details;
rc->initial_metadata = initial_metadata;
error = queue_call_request(server, cq_idx, rc);
done:
return error;
requested_call* rc = new requested_call(tag, cq_bound_to_call, call,
initial_metadata, details);
return queue_call_request(server, cq_idx, rc);
}
grpc_call_error grpc_server_request_registered_call(
@ -1577,7 +1757,6 @@ grpc_call_error grpc_server_request_registered_call(
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
registered_method* rm = static_cast<registered_method*>(rmp);
GRPC_API_TRACE(
"grpc_server_request_registered_call("
@ -1589,33 +1768,15 @@ grpc_call_error grpc_server_request_registered_call(
cq_bound_to_call, cq_for_notification, tag));
size_t cq_idx;
for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
if (server->cqs[cq_idx] == cq_for_notification) {
break;
}
}
if (cq_idx == server->cq_count) {
gpr_free(rc);
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
if ((optional_payload == nullptr) !=
(rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {
gpr_free(rc);
return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
grpc_call_error error = ValidateServerRequestAndCq(
&cq_idx, server, cq_for_notification, tag, optional_payload, rm);
if (error != GRPC_CALL_OK) {
return error;
}
if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
gpr_free(rc);
return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
}
rc->type = REGISTERED_CALL;
rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call;
rc->call = call;
rc->data.registered.method = rm;
rc->data.registered.deadline = deadline;
rc->initial_metadata = initial_metadata;
rc->data.registered.optional_payload = optional_payload;
requested_call* rc =
new requested_call(tag, cq_bound_to_call, call, initial_metadata, rm,
deadline, optional_payload);
return queue_call_request(server, cq_idx, rc);
}

@ -64,4 +64,36 @@ int grpc_server_has_open_connections(grpc_server* server);
void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
size_t* pollset_count);
namespace grpc_core {
// An object to represent the most relevant characteristics of a newly-allocated
// call object when using an AllocatingRequestMatcherBatch
struct ServerBatchCallAllocation {
grpc_experimental_completion_queue_functor* tag;
grpc_call** call;
grpc_metadata_array* initial_metadata;
grpc_call_details* details;
};
// An object to represent the most relevant characteristics of a newly-allocated
// call object when using an AllocatingRequestMatcherRegistered
struct ServerRegisteredCallAllocation {
grpc_experimental_completion_queue_functor* tag;
grpc_call** call;
grpc_metadata_array* initial_metadata;
gpr_timespec* deadline;
grpc_byte_buffer** optional_payload;
};
// Functions to specify that a specific registered method or the unregistered
// collection should use a specific allocator for request matching.
void SetServerRegisteredMethodAllocator(
grpc_server* server, grpc_completion_queue* cq, void* method_tag,
std::function<ServerRegisteredCallAllocation()> allocator);
void SetServerBatchMethodAllocator(
grpc_server* server, grpc_completion_queue* cq,
std::function<ServerBatchCallAllocation()> allocator);
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */

@ -47,6 +47,7 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/server.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/cpp/server/external_connection_acceptor_impl.h"
#include "src/cpp/server/health/default_health_check_service.h"
@ -62,17 +63,6 @@ namespace {
// max-threads set) to the server builder.
#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
// How many callback requests of each method should we pre-register at start
#define DEFAULT_CALLBACK_REQS_PER_METHOD 512
// What is the (soft) limit for outstanding requests in the server
#define SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING 30000
// If the number of unmatched requests for a method drops below this amount, try
// to allocate extra unless it pushes the total number of callbacks above the
// soft maximum
#define SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD 128
class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
public:
~DefaultGlobalCallbacks() override {}
@ -544,74 +534,61 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
grpc_completion_queue* cq_;
};
class Server::CallbackRequestBase : public grpc::internal::CompletionQueueTag {
public:
virtual ~CallbackRequestBase() {}
virtual bool Request() = 0;
};
template <class ServerContextType>
class Server::CallbackRequest final : public Server::CallbackRequestBase {
class Server::CallbackRequest final
: public grpc::internal::CompletionQueueTag {
public:
static_assert(
std::is_base_of<grpc::CallbackServerContext, ServerContextType>::value,
"ServerContextType must be derived from CallbackServerContext");
// The constructor needs to know the server for this callback request and its
// index in the server's request count array to allow for proper dynamic
// requesting of incoming RPCs. For codegen services, the values of method and
// method_tag represent the defined characteristics of the method being
// requested. For generic services, method and method_tag are nullptr since
// these services don't have pre-defined methods or method registration tags.
CallbackRequest(Server* server, size_t method_idx,
grpc::internal::RpcServiceMethod* method, void* method_tag)
// For codegen services, the value of method represents the defined
// characteristics of the method being requested. For generic services, method
// is nullptr since these services don't have pre-defined methods.
CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method,
grpc::CompletionQueue* cq,
grpc_core::ServerRegisteredCallAllocation* data)
: server_(server),
method_index_(method_idx),
method_(method),
method_tag_(method_tag),
has_request_payload_(
method_ != nullptr &&
(method->method_type() == grpc::internal::RpcMethod::NORMAL_RPC ||
method->method_type() ==
grpc::internal::RpcMethod::SERVER_STREAMING)),
cq_(server->CallbackCQ()),
has_request_payload_(method->method_type() ==
grpc::internal::RpcMethod::NORMAL_RPC ||
method->method_type() ==
grpc::internal::RpcMethod::SERVER_STREAMING),
cq_(cq),
tag_(this) {
server_->callback_reqs_outstanding_++;
server->Ref();
Setup();
data->tag = &tag_;
data->call = &call_;
data->initial_metadata = &request_metadata_;
data->deadline = &deadline_;
data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr;
}
~CallbackRequest() {
Clear();
// The counter of outstanding requests must be decremented
// under a lock in case it causes the server shutdown.
grpc::internal::MutexLock l(&server_->callback_reqs_mu_);
if (--server_->callback_reqs_outstanding_ == 0) {
server_->callback_reqs_done_cv_.Signal();
// For generic services, method is nullptr since these services don't have
// pre-defined methods.
CallbackRequest(Server* server, grpc::CompletionQueue* cq,
grpc_core::ServerBatchCallAllocation* data)
: server_(server),
method_(nullptr),
has_request_payload_(false),
cq_(cq),
tag_(this) {
server->Ref();
Setup();
data->tag = &tag_;
data->call = &call_;
data->initial_metadata = &request_metadata_;
if (!call_details_) {
call_details_ = new grpc_call_details;
grpc_call_details_init(call_details_);
}
data->details = call_details_;
}
bool Request() override {
if (method_tag_) {
if (grpc_server_request_registered_call(
server_->c_server(), method_tag_, &call_, &deadline_,
&request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq_->cq(),
cq_->cq(), static_cast<void*>(&tag_)) != GRPC_CALL_OK) {
return false;
}
} else {
if (!call_details_) {
call_details_ = new grpc_call_details;
grpc_call_details_init(call_details_);
}
if (grpc_server_request_call(server_->c_server(), &call_, call_details_,
&request_metadata_, cq_->cq(), cq_->cq(),
static_cast<void*>(&tag_)) != GRPC_CALL_OK) {
return false;
}
}
return true;
~CallbackRequest() {
Clear();
server_->UnrefWithPossibleNotify();
}
// Needs specialization to account for different processing of metadata
@ -655,12 +632,6 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase {
GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
GPR_ASSERT(ignored == req_);
int count =
static_cast<int>(gpr_atm_no_barrier_fetch_add(
&req_->server_
->callback_unmatched_reqs_count_[req_->method_index_],
-1)) -
1;
if (!ok) {
// The call has been shutdown.
// Delete its contents to free up the request.
@ -668,24 +639,6 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase {
return;
}
// If this was the last request in the list or it is below the soft
// minimum and there are spare requests available, set up a new one.
if (count == 0 || (count < SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD &&
req_->server_->callback_reqs_outstanding_ <
SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING)) {
auto* new_req = new CallbackRequest<ServerContextType>(
req_->server_, req_->method_index_, req_->method_,
req_->method_tag_);
if (!new_req->Request()) {
// The server must have just decided to shutdown.
gpr_atm_no_barrier_fetch_add(
&new_req->server_
->callback_unmatched_reqs_count_[new_req->method_index_],
-1);
delete new_req;
}
}
// Bind the call, deadline, and metadata from what we got
req_->ctx_.set_call(req_->call_);
req_->ctx_.cq_ = req_->cq_;
@ -740,28 +693,7 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase {
: req_->server_->generic_handler_.get();
handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
call_, &req_->ctx_, req_->request_, req_->request_status_,
req_->handler_data_, [this] {
// Recycle this request if there aren't too many outstanding.
// Note that we don't have to worry about a case where there
// are no requests waiting to match for this method since that
// is already taken care of when binding a request to a call.
// TODO(vjpai): Also don't recycle this request if the dynamic
// load no longer justifies it. Consider measuring
// dynamic load and setting a target accordingly.
if (req_->server_->callback_reqs_outstanding_ <
SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING) {
req_->Clear();
req_->Setup();
} else {
// We can free up this request because there are too many
delete req_;
return;
}
if (!req_->Request()) {
// The server must have just decided to shutdown.
delete req_;
}
}));
req_->handler_data_, [this] { delete req_; }));
}
};
@ -779,8 +711,6 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase {
}
void Setup() {
gpr_atm_no_barrier_fetch_add(
&server_->callback_unmatched_reqs_count_[method_index_], 1);
grpc_metadata_array_init(&request_metadata_);
ctx_.Setup(gpr_inf_future(GPR_CLOCK_REALTIME));
request_payload_ = nullptr;
@ -790,9 +720,7 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase {
}
Server* const server_;
const size_t method_index_;
grpc::internal::RpcServiceMethod* const method_;
void* const method_tag_;
const bool has_request_payload_;
grpc_byte_buffer* request_payload_;
void* request_;
@ -1055,13 +983,6 @@ Server::~Server() {
}
grpc_server_destroy(server_);
for (auto& per_method_count : callback_unmatched_reqs_count_) {
// There should be no more unmatched callbacks for any method
// as each request is failed by Shutdown. Check that this actually
// happened
GPR_ASSERT(static_cast<int>(gpr_atm_no_barrier_load(&per_method_count)) ==
0);
}
}
void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
@ -1139,17 +1060,16 @@ bool Server::RegisterService(const grpc::string* host, grpc::Service* service) {
value->AddSyncMethod(method.get(), method_registration_tag);
}
} else {
// a callback method. Register at least some callback requests
callback_unmatched_reqs_count_.push_back(0);
auto method_index = callback_unmatched_reqs_count_.size() - 1;
// TODO(vjpai): Register these dynamically based on need
for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) {
callback_reqs_to_start_.push_back(
new CallbackRequest<grpc::CallbackServerContext>(
this, method_index, method.get(), method_registration_tag));
}
// Enqueue it so that it will be Request'ed later after all request
// matchers are created at core server startup
has_callback_methods_ = true;
grpc::internal::RpcServiceMethod* method_value = method.get();
grpc::CompletionQueue* cq = CallbackCQ();
grpc_core::SetServerRegisteredMethodAllocator(
server_, cq->cq(), method_registration_tag, [this, cq, method_value] {
grpc_core::ServerRegisteredCallAllocation result;
new CallbackRequest<grpc::CallbackServerContext>(this, method_value,
cq, &result);
return result;
});
}
method_name = method->name();
@ -1183,14 +1103,12 @@ void Server::RegisterCallbackGenericService(
has_callback_generic_service_ = true;
generic_handler_.reset(service->Handler());
callback_unmatched_reqs_count_.push_back(0);
auto method_index = callback_unmatched_reqs_count_.size() - 1;
// TODO(vjpai): Register these dynamically based on need
for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) {
callback_reqs_to_start_.push_back(
new CallbackRequest<grpc::GenericCallbackServerContext>(
this, method_index, nullptr, nullptr));
}
grpc::CompletionQueue* cq = CallbackCQ();
grpc_core::SetServerBatchMethodAllocator(server_, cq->cq(), [this, cq] {
grpc_core::ServerBatchCallAllocation result;
new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result);
return result;
});
}
int Server::AddListeningPort(const grpc::string& addr,
@ -1201,6 +1119,31 @@ int Server::AddListeningPort(const grpc::string& addr,
return port;
}
void Server::Ref() {
shutdown_refs_outstanding_.fetch_add(1, std::memory_order_relaxed);
}
void Server::UnrefWithPossibleNotify() {
if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub(
1, std::memory_order_acq_rel) == 1)) {
// No refs outstanding means that shutdown has been initiated and no more
// callback requests are outstanding.
grpc::internal::MutexLock lock(&mu_);
GPR_ASSERT(shutdown_);
shutdown_done_ = true;
shutdown_done_cv_.Signal();
}
}
void Server::UnrefAndWaitLocked() {
if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub(
1, std::memory_order_acq_rel) == 1)) {
shutdown_done_ = true;
return; // no need to wait on CV since done condition already set
}
shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; });
}
void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
GPR_ASSERT(!started_);
global_callbacks_->PreServerStart(this);
@ -1236,7 +1179,7 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
// If this server uses callback methods, then create a callback generic
// service to handle any unimplemented methods using the default reactor
// creator
if (!callback_reqs_to_start_.empty() && !has_callback_generic_service_) {
if (has_callback_methods_ && !has_callback_generic_service_) {
unimplemented_service_.reset(new grpc::CallbackGenericService);
RegisterCallbackGenericService(unimplemented_service_.get());
}
@ -1276,11 +1219,6 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
value->Start();
}
for (auto* cbreq : callback_reqs_to_start_) {
GPR_ASSERT(cbreq->Request());
}
callback_reqs_to_start_.clear();
if (default_health_check_service_impl != nullptr) {
default_health_check_service_impl->StartServingThread();
}
@ -1333,23 +1271,8 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
value->Wait();
}
// Wait for all outstanding callback requests to complete
// (whether waiting for a match or already active).
// We know that no new requests will be created after this point
// because they are only created at server startup time or when
// we have a successful match on a request. During the shutdown phase,
// requests that have not yet matched will be failed rather than
// allowed to succeed, which will cause the server to delete the
// request and decrement the count. Possibly a request will match before
// the shutdown but then find that shutdown has already started by the
// time it tries to register a new request. In that case, the registration
// will report a failure, indicating a shutdown and again we won't end
// up incrementing the counter.
{
grpc::internal::MutexLock cblock(&callback_reqs_mu_);
callback_reqs_done_cv_.WaitUntil(
&callback_reqs_mu_, [this] { return callback_reqs_outstanding_ == 0; });
}
// Drop the shutdown ref and wait for all other refs to drop as well.
UnrefAndWaitLocked();
// Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it
// will delete itself at true shutdown.
@ -1421,15 +1344,17 @@ grpc::CompletionQueue* Server::CallbackCQ() {
// TODO(vjpai): Consider using a single global CQ for the default CQ
// if there is no explicit per-server CQ registered
grpc::internal::MutexLock l(&mu_);
if (callback_cq_ == nullptr) {
auto* shutdown_callback = new grpc::ShutdownCallback;
callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
shutdown_callback});
// Transfer ownership of the new cq to its own shutdown callback
shutdown_callback->TakeCQ(callback_cq_);
if (callback_cq_ != nullptr) {
return callback_cq_;
}
auto* shutdown_callback = new grpc::ShutdownCallback;
callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
shutdown_callback});
// Transfer ownership of the new cq to its own shutdown callback
shutdown_callback->TakeCQ(callback_cq_);
return callback_cq_;
}

Loading…
Cancel
Save