Merge pull request #22670 from vjpai/request_matcher_interface

Virtualize RequestMatcher to enable customized matchers
Vijay Pai authored 5 years ago · committed by GitHub
commit 0093accaca
1 changed file: src/core/lib/surface/server.cc (327 changed lines)

@@ -132,7 +132,54 @@ enum call_state {
   ZOMBIED
 };
 
-struct request_matcher;
 struct call_data;
 
+// RPCs that come in from the transport must be matched against RPC requests
+// from the application. An incoming request from the application can be matched
+// to an RPC that has already arrived or can be queued up for later use.
+// Likewise, an RPC coming in from the transport can either be matched to a
+// request that already arrived from the application or can be queued up for
+// later use (marked pending). If there is a match, the request's tag is posted
+// on the request's notification CQ.
+//
+// RequestMatcherInterface is the base class to provide this functionality.
+class RequestMatcherInterface {
+ public:
+  virtual ~RequestMatcherInterface() {}
+
+  // Unref the calls associated with any incoming RPCs in the pending queue (not
+  // yet matched to an application-requested RPC).
+  virtual void ZombifyPending() = 0;
+
+  // Mark all application-requested RPCs failed if they have not been matched to
+  // an incoming RPC. The error parameter indicates why the RPCs are being
+  // failed (always server shutdown in all current implementations).
+  virtual void KillRequests(grpc_error* error) = 0;
+
+  // How many request queues are supported by this matcher. This is an abstract
+  // concept that essentially maps to gRPC completion queues.
+  virtual size_t request_queue_count() const = 0;
+
+  // This function is invoked when the application requests a new RPC whose
+  // information is in the call parameter. The request_queue_index marks the
+  // queue onto which to place this RPC, and is typically associated with a gRPC
+  // CQ. If there are pending RPCs waiting to be matched, publish one (match it
+  // and notify the CQ).
+  virtual void RequestCallWithPossiblePublish(size_t request_queue_index,
+                                              requested_call* call) = 0;
+
+  // This function is invoked on an incoming RPC, represented by the calld
+  // object. The RequestMatcher will try to match it against an
+  // application-requested RPC if possible or will place it in the pending queue
+  // otherwise. To enable some measure of fairness between server CQs, the match
+  // is done starting at the start_request_queue_index parameter in a cyclic
+  // order rather than always starting at 0.
+  virtual void MatchOrQueue(size_t start_request_queue_index,
+                            call_data* calld) = 0;
+
+  // Returns the server associated with this request matcher.
+  virtual grpc_server* server() const = 0;
+};
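
To make the contract concrete, here is a minimal sketch of the kind of customized matcher this interface now permits (illustrative only, not part of this commit; AlwaysFailMatcher is a hypothetical name, and the sketch reuses fail_call, kill_zombie, and the call-state machinery defined elsewhere in this file):

// Sketch only: a matcher that refuses to queue anything. Application
// requests are failed immediately, and unmatched incoming RPCs are
// zombified so their calls get unref'd.
class AlwaysFailMatcher : public RequestMatcherInterface {
 public:
  explicit AlwaysFailMatcher(grpc_server* server) : server_(server) {}
  void ZombifyPending() override {}  // nothing is ever left pending
  void KillRequests(grpc_error* error) override { GRPC_ERROR_UNREF(error); }
  size_t request_queue_count() const override { return 0; }
  void RequestCallWithPossiblePublish(size_t request_queue_index,
                                      requested_call* call) override {
    // No queue to hold application requests: fail each one right away.
    fail_call(server_, request_queue_index, call,
              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Matcher refuses requests"));
  }
  void MatchOrQueue(size_t /*start_request_queue_index*/,
                    call_data* calld) override {
    // Never match an incoming RPC: zombify it so it is cleaned up.
    gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
    GRPC_CLOSURE_INIT(
        &calld->kill_zombie_closure, kill_zombie,
        grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
        grpc_schedule_on_exec_ctx);
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
                            GRPC_ERROR_NONE);
  }
  grpc_server* server() const override { return server_; }

 private:
  grpc_server* const server_;
};

A matcher like this would let a server reject work outright instead of queueing it; the RealRequestMatcher introduced below implements the full matching protocol.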
struct call_data {
call_data(grpc_call_element* elem, const grpc_call_element_args& args)
@@ -175,7 +222,7 @@ struct call_data {
   grpc_metadata_array initial_metadata =
       grpc_metadata_array();  // Zero-initialize the C struct.
 
-  request_matcher* matcher = nullptr;
+  RequestMatcherInterface* matcher = nullptr;
   grpc_byte_buffer* payload = nullptr;
 
   grpc_closure got_initial_metadata;
@@ -194,20 +241,15 @@ struct call_data {
   grpc_core::CallCombiner* call_combiner;
 };
 
-struct request_matcher {
-  grpc_server* server;
-  call_data* pending_head;
-  call_data* pending_tail;
-  LockedMultiProducerSingleConsumerQueue* requests_per_cq;
-};
-
 struct registered_method {
   char* method;
   char* host;
   grpc_server_register_method_payload_handling payload_handling;
   uint32_t flags;
-  /* one request matcher per method */
-  request_matcher matcher;
+  // TODO(vjpai): Move this to a unique_ptr once this has a real
+  // constructor/destructor
+  RequestMatcherInterface* matcher = nullptr;
   registered_method* next;
 };
@@ -245,7 +287,9 @@ struct grpc_server {
   registered_method* registered_methods;
   /** one request matcher for unregistered methods */
-  request_matcher unregistered_request_matcher;
+  // TODO(vjpai): Convert to a std::unique_ptr once grpc_server has a real
+  // constructor and destructor.
+  RequestMatcherInterface* unregistered_request_matcher;
 
   gpr_atm shutdown_flag;
   uint8_t shutdown_published;
@@ -268,13 +312,19 @@ struct grpc_server {
   (((channel_data*)(elem)->channel_data)->server)
 
 namespace {
 void publish_new_rpc(void* arg, grpc_error* error);
+void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
+                  requested_call* rc);
 void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
                grpc_error* error);
 /* Before calling maybe_finish_shutdown, we must hold mu_global and not
    hold mu_call */
 void maybe_finish_shutdown(grpc_server* server);
 
+void kill_zombie(void* elem, grpc_error* /*error*/) {
+  grpc_call_unref(
+      grpc_call_from_top_element(static_cast<grpc_call_element*>(elem)));
+}
+
 /*
  * channel broadcaster
  */
@@ -347,33 +397,26 @@ void channel_broadcaster_shutdown(channel_broadcaster* cb, bool send_goaway,
  * request_matcher
  */
 
-void request_matcher_init(request_matcher* rm, grpc_server* server) {
-  rm->server = server;
-  rm->pending_head = rm->pending_tail = nullptr;
-  rm->requests_per_cq = static_cast<LockedMultiProducerSingleConsumerQueue*>(
-      gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count));
-  for (size_t i = 0; i < server->cq_count; i++) {
-    new (&rm->requests_per_cq[i]) LockedMultiProducerSingleConsumerQueue();
-  }
-}
-
-void request_matcher_destroy(request_matcher* rm) {
-  for (size_t i = 0; i < rm->server->cq_count; i++) {
-    GPR_ASSERT(rm->requests_per_cq[i].Pop() == nullptr);
-    rm->requests_per_cq[i].~LockedMultiProducerSingleConsumerQueue();
-  }
-  gpr_free(rm->requests_per_cq);
-}
-
-void kill_zombie(void* elem, grpc_error* /*error*/) {
-  grpc_call_unref(
-      grpc_call_from_top_element(static_cast<grpc_call_element*>(elem)));
-}
-
-void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
-  while (rm->pending_head) {
-    call_data* calld = rm->pending_head;
-    rm->pending_head = calld->pending_next;
+// The RealRequestMatcher is an implementation of RequestMatcherInterface that
+// actually uses all the features of RequestMatcherInterface: expecting the
+// application to explicitly request RPCs and then matching those to incoming
+// RPCs, along with a slow path by which incoming RPCs are put on a locked
+// pending list if they aren't able to be matched to an application request.
+class RealRequestMatcher : public RequestMatcherInterface {
+ public:
+  explicit RealRequestMatcher(grpc_server* server)
+      : server_(server), requests_per_cq_(server->cq_count) {}
+
+  ~RealRequestMatcher() override {
+    for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
+      GPR_ASSERT(queue.Pop() == nullptr);
+    }
+  }
+
+  void ZombifyPending() override {
+    while (pending_head_ != nullptr) {
+      call_data* calld = pending_head_;
+      pending_head_ = calld->pending_next;
       gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
       GRPC_CLOSURE_INIT(
           &calld->kill_zombie_closure, kill_zombie,
@@ -384,18 +427,109 @@ void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
     }
   }
 
-void request_matcher_kill_requests(grpc_server* server, request_matcher* rm,
-                                   grpc_error* error) {
-  requested_call* rc;
-  for (size_t i = 0; i < server->cq_count; i++) {
-    while ((rc = reinterpret_cast<requested_call*>(
-                rm->requests_per_cq[i].Pop())) != nullptr) {
-      fail_call(server, i, rc, GRPC_ERROR_REF(error));
-    }
-  }
-  GRPC_ERROR_UNREF(error);
-}
+  void KillRequests(grpc_error* error) override {
+    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
+      requested_call* rc;
+      while ((rc = reinterpret_cast<requested_call*>(
+                  requests_per_cq_[i].Pop())) != nullptr) {
+        fail_call(server_, i, rc, GRPC_ERROR_REF(error));
+      }
+    }
+    GRPC_ERROR_UNREF(error);
+  }
+
+  size_t request_queue_count() const override {
+    return requests_per_cq_.size();
+  }
+
+  void RequestCallWithPossiblePublish(size_t request_queue_index,
+                                      requested_call* call) override {
+    if (requests_per_cq_[request_queue_index].Push(call->mpscq_node.get())) {
+      /* this was the first queued request: we need to lock and start
+         matching calls */
+      gpr_mu_lock(&server_->mu_call);
+      call_data* calld;
+      while ((calld = pending_head_) != nullptr) {
+        requested_call* rc = reinterpret_cast<requested_call*>(
+            requests_per_cq_[request_queue_index].Pop());
+        if (rc == nullptr) break;
+        pending_head_ = calld->pending_next;
+        gpr_mu_unlock(&server_->mu_call);
+        if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
+          // Zombied Call
+          GRPC_CLOSURE_INIT(
+              &calld->kill_zombie_closure, kill_zombie,
+              grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
+              grpc_schedule_on_exec_ctx);
+          grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
+                                  GRPC_ERROR_NONE);
+        } else {
+          publish_call(server_, calld, request_queue_index, rc);
+        }
+        gpr_mu_lock(&server_->mu_call);
+      }
+      gpr_mu_unlock(&server_->mu_call);
+    }
+  }
+
+  void MatchOrQueue(size_t start_request_queue_index,
+                    call_data* calld) override {
+    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
+      size_t cq_idx =
+          (start_request_queue_index + i) % requests_per_cq_.size();
+      requested_call* rc = reinterpret_cast<requested_call*>(
+          requests_per_cq_[cq_idx].TryPop());
+      if (rc == nullptr) {
+        continue;
+      } else {
+        GRPC_STATS_INC_SERVER_CQS_CHECKED(i);
+        gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
+        publish_call(server_, calld, cq_idx, rc);
+        return; /* early out */
+      }
+    }
+
+    /* no cq to take the request found: queue it on the slow list */
+    GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();
+    gpr_mu_lock(&server_->mu_call);
+
+    // We need to ensure that all the queues are empty. We do this under
+    // the server mu_call lock to ensure that if something is added to
+    // an empty request queue, it will block until the call is actually
+    // added to the pending list.
+    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
+      size_t cq_idx =
+          (start_request_queue_index + i) % requests_per_cq_.size();
+      requested_call* rc =
+          reinterpret_cast<requested_call*>(requests_per_cq_[cq_idx].Pop());
+      if (rc == nullptr) {
+        continue;
+      } else {
+        gpr_mu_unlock(&server_->mu_call);
+        GRPC_STATS_INC_SERVER_CQS_CHECKED(i + requests_per_cq_.size());
+        gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
+        publish_call(server_, calld, cq_idx, rc);
+        return; /* early out */
+      }
+    }
+
+    gpr_atm_no_barrier_store(&calld->state, PENDING);
+    if (pending_head_ == nullptr) {
+      pending_tail_ = pending_head_ = calld;
+    } else {
+      pending_tail_->pending_next = calld;
+      pending_tail_ = calld;
+    }
+    gpr_mu_unlock(&server_->mu_call);
+  }
+
+  grpc_server* server() const override { return server_; }
+
+ private:
+  grpc_server* const server_;
+  call_data* pending_head_ = nullptr;
+  call_data* pending_tail_ = nullptr;
+  std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
+};
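
The fairness MatchOrQueue promises comes from where its scan starts: the request queues are probed in cyclic order from start_request_queue_index, first lock-free via TryPop and then once more under mu_call via Pop before the call is parked on the pending list. A standalone sketch of just the probe order (the queue count and start index are made-up values):

#include <cstddef>
#include <cstdio>

int main() {
  const std::size_t n = 3;      // request_queue_count(), assumed
  const std::size_t start = 2;  // start_request_queue_index for this channel
  for (std::size_t i = 0; i < n; i++) {
    // Wraps around instead of always restarting at queue 0.
    std::size_t cq_idx = (start + i) % n;
    std::printf("probe request queue %zu\n", cq_idx);
  }
  // Prints 2, 0, 1: every CQ eventually gets first crack at an incoming RPC.
}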
/*
* server proper
*/
@@ -412,16 +546,12 @@ void server_delete(grpc_server* server) {
   gpr_cv_destroy(&server->starting_cv);
   while ((rm = server->registered_methods) != nullptr) {
     server->registered_methods = rm->next;
-    if (server->started) {
-      request_matcher_destroy(&rm->matcher);
-    }
+    delete rm->matcher;
     gpr_free(rm->method);
     gpr_free(rm->host);
     gpr_free(rm);
   }
-  if (server->started) {
-    request_matcher_destroy(&server->unregistered_request_matcher);
-  }
+  delete server->unregistered_request_matcher;
   for (i = 0; i < server->cq_count; i++) {
     GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
   }
@@ -512,8 +642,8 @@ void publish_new_rpc(void* arg, grpc_error* error) {
   grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);
   call_data* calld = static_cast<call_data*>(call_elem->call_data);
   channel_data* chand = static_cast<channel_data*>(call_elem->channel_data);
-  request_matcher* rm = calld->matcher;
-  grpc_server* server = rm->server;
+  RequestMatcherInterface* rm = calld->matcher;
+  grpc_server* server = rm->server();
 
   if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
     gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
@@ -526,55 +656,11 @@ void publish_new_rpc(void* arg, grpc_error* error) {
     return;
   }
 
-  for (size_t i = 0; i < server->cq_count; i++) {
-    size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
-    requested_call* rc =
-        reinterpret_cast<requested_call*>(rm->requests_per_cq[cq_idx].TryPop());
-    if (rc == nullptr) {
-      continue;
-    } else {
-      GRPC_STATS_INC_SERVER_CQS_CHECKED(i);
-      gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
-      publish_call(server, calld, cq_idx, rc);
-      return; /* early out */
-    }
-  }
-
-  /* no cq to take the request found: queue it on the slow list */
-  GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();
-  gpr_mu_lock(&server->mu_call);
-
-  // We need to ensure that all the queues are empty. We do this under
-  // the server mu_call lock to ensure that if something is added to
-  // an empty request queue, it will block until the call is actually
-  // added to the pending list.
-  for (size_t i = 0; i < server->cq_count; i++) {
-    size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
-    requested_call* rc =
-        reinterpret_cast<requested_call*>(rm->requests_per_cq[cq_idx].Pop());
-    if (rc == nullptr) {
-      continue;
-    } else {
-      gpr_mu_unlock(&server->mu_call);
-      GRPC_STATS_INC_SERVER_CQS_CHECKED(i + server->cq_count);
-      gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
-      publish_call(server, calld, cq_idx, rc);
-      return; /* early out */
-    }
-  }
-
-  gpr_atm_no_barrier_store(&calld->state, PENDING);
-  if (rm->pending_head == nullptr) {
-    rm->pending_tail = rm->pending_head = calld;
-  } else {
-    rm->pending_tail->pending_next = calld;
-    rm->pending_tail = calld;
-  }
-  gpr_mu_unlock(&server->mu_call);
+  rm->MatchOrQueue(chand->cq_idx, calld);
 }
 
 void finish_start_new_rpc(
-    grpc_server* server, grpc_call_element* elem, request_matcher* rm,
+    grpc_server* server, grpc_call_element* elem, RequestMatcherInterface* rm,
     grpc_server_register_method_payload_handling payload_handling) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
@@ -632,7 +718,7 @@ void start_new_rpc(grpc_call_element* elem) {
                             GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
         continue;
       }
-      finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
+      finish_start_new_rpc(server, elem, rm->server_registered_method->matcher,
                            rm->server_registered_method->payload_handling);
       return;
     }
@@ -649,12 +735,12 @@ void start_new_rpc(grpc_call_element* elem) {
                             GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
         continue;
       }
-      finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
+      finish_start_new_rpc(server, elem, rm->server_registered_method->matcher,
                            rm->server_registered_method->payload_handling);
       return;
     }
   }
-  finish_start_new_rpc(server, elem, &server->unregistered_request_matcher,
+  finish_start_new_rpc(server, elem, server->unregistered_request_matcher,
                        GRPC_SRM_PAYLOAD_NONE);
 }
@@ -683,15 +769,12 @@ int num_channels(grpc_server* server) {
 
 void kill_pending_work_locked(grpc_server* server, grpc_error* error) {
   if (server->started) {
-    request_matcher_kill_requests(server, &server->unregistered_request_matcher,
-                                  GRPC_ERROR_REF(error));
-    request_matcher_zombify_all_pending_calls(
-        &server->unregistered_request_matcher);
+    server->unregistered_request_matcher->KillRequests(GRPC_ERROR_REF(error));
+    server->unregistered_request_matcher->ZombifyPending();
     for (registered_method* rm = server->registered_methods; rm;
          rm = rm->next) {
-      request_matcher_kill_requests(server, &rm->matcher,
-                                    GRPC_ERROR_REF(error));
-      request_matcher_zombify_all_pending_calls(&rm->matcher);
+      rm->matcher->KillRequests(GRPC_ERROR_REF(error));
+      rm->matcher->ZombifyPending();
     }
   }
   GRPC_ERROR_UNREF(error);
@@ -1007,45 +1090,21 @@ void listener_destroy_done(void* s, grpc_error* /*error*/) {
 grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
                                    requested_call* rc) {
-  call_data* calld = nullptr;
-  request_matcher* rm = nullptr;
   if (gpr_atm_acq_load(&server->shutdown_flag)) {
     fail_call(server, cq_idx, rc,
               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
     return GRPC_CALL_OK;
   }
+  RequestMatcherInterface* rm;
   switch (rc->type) {
     case BATCH_CALL:
-      rm = &server->unregistered_request_matcher;
+      rm = server->unregistered_request_matcher;
       break;
     case REGISTERED_CALL:
-      rm = &rc->data.registered.method->matcher;
+      rm = rc->data.registered.method->matcher;
       break;
   }
-  if (rm->requests_per_cq[cq_idx].Push(rc->mpscq_node.get())) {
-    /* this was the first queued request: we need to lock and start
-       matching calls */
-    gpr_mu_lock(&server->mu_call);
-    while ((calld = rm->pending_head) != nullptr) {
-      rc = reinterpret_cast<requested_call*>(rm->requests_per_cq[cq_idx].Pop());
-      if (rc == nullptr) break;
-      rm->pending_head = calld->pending_next;
-      gpr_mu_unlock(&server->mu_call);
-      if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
-        // Zombied Call
-        GRPC_CLOSURE_INIT(
-            &calld->kill_zombie_closure, kill_zombie,
-            grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
-            grpc_schedule_on_exec_ctx);
-        grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
-                                GRPC_ERROR_NONE);
-      } else {
-        publish_call(server, calld, cq_idx, rc);
-      }
-      gpr_mu_lock(&server->mu_call);
-    }
-    gpr_mu_unlock(&server->mu_call);
-  }
+  rm->RequestCallWithPossiblePublish(cq_idx, rc);
   return GRPC_CALL_OK;
 }
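
The Push-then-drain loop that used to live here (now RequestCallWithPossiblePublish above) hinges on a compare-and-swap of the call state: whichever side moves a call from PENDING to ACTIVATED gets to publish it, and a side that loses because shutdown already marked the call ZOMBIED must clean it up instead. A standalone sketch of that hand-off, substituting std::atomic for gpr_atm:

#include <atomic>
#include <cstdio>

enum CallState { PENDING, ACTIVATED, ZOMBIED };

// Try to claim a call for publication, as the matcher does.
void TryActivate(std::atomic<int>& state) {
  int expected = PENDING;
  if (state.compare_exchange_strong(expected, ACTIVATED)) {
    std::printf("won the race: publish_call would run\n");
  } else {
    std::printf("lost the race (state=%d): kill_zombie would run\n", expected);
  }
}

int main() {
  std::atomic<int> live{PENDING};
  TryActivate(live);  // succeeds: PENDING -> ACTIVATED

  std::atomic<int> dead{PENDING};
  dead.store(ZOMBIED);  // shutdown zombified the call first
  TryActivate(dead);    // fails: the caller must kill the zombie instead
}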
@@ -1191,9 +1250,9 @@ void grpc_server_start(grpc_server* server) {
       grpc_cq_pollset(server->cqs[i]);
     }
   }
-  request_matcher_init(&server->unregistered_request_matcher, server);
+  server->unregistered_request_matcher = new RealRequestMatcher(server);
   for (registered_method* rm = server->registered_methods; rm; rm = rm->next) {
-    request_matcher_init(&rm->matcher, server);
+    rm->matcher = new RealRequestMatcher(server);
   }
 
   gpr_mu_lock(&server->mu_global);
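
Once the TODOs above are resolved and grpc_server/registered_method gain real constructors and destructors, these raw pointers could become owning ones; a sketch of that hypothetical follow-up (not part of this commit):

// Hypothetical owning field, replacing the raw pointer:
//   std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher;
server->unregistered_request_matcher.reset(new RealRequestMatcher(server));
// server_delete would then drop its explicit `delete` calls.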
