From db151dc1680bbaf86dd6a0d9741987c8033745ef Mon Sep 17 00:00:00 2001
From: Vijay Pai
Date: Fri, 17 Apr 2020 14:04:11 -0700
Subject: [PATCH] Virtualize RequestMatcher to enable customized matchers

---
 src/core/lib/surface/server.cc | 349 +++++++++++++++++++--------------
 1 file changed, 204 insertions(+), 145 deletions(-)

diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index f2e9d031188..998eb6dd4a9 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -132,7 +132,54 @@ enum call_state {
   ZOMBIED
 };
 
-struct request_matcher;
+struct call_data;
+
+// RPCs that come in from the transport must be matched against RPC requests
+// from the application. An incoming request from the application can be
+// matched to an RPC that has already arrived, or it can be queued up for
+// later use. Likewise, an RPC coming in from the transport can either be
+// matched to a request that already arrived from the application or be
+// queued up for later use (marked pending). If there is a match, the
+// request's tag is posted on the request's notification CQ.
+//
+// RequestMatcherInterface is the base class that provides this functionality.
+class RequestMatcherInterface {
+ public:
+  virtual ~RequestMatcherInterface() {}
+
+  // Unref the calls associated with any incoming RPCs in the pending queue
+  // (those not yet matched to an application-requested RPC).
+  virtual void ZombifyPending() = 0;
+
+  // Mark all application-requested RPCs failed if they have not been matched
+  // to an incoming RPC. The error parameter indicates why the RPCs are being
+  // failed (currently always server shutdown).
+  virtual void KillRequests(grpc_error* error) = 0;
+
+  // How many request queues are supported by this matcher. This is an
+  // abstract concept that essentially maps to gRPC completion queues.
+  virtual size_t request_queue_count() const = 0;
+
+  // This function is invoked when the application requests a new RPC whose
+  // information is in the call parameter. The request_queue_index marks the
+  // queue onto which to place this RPC, and is typically associated with a
+  // gRPC CQ. If there are pending RPCs waiting to be matched, publish one
+  // (match it and notify the CQ).
+  virtual void RequestCallWithPossiblePublish(size_t request_queue_index,
+                                              requested_call* call) = 0;
+
+  // This function is invoked on an incoming RPC, represented by the calld
+  // object. The RequestMatcher will try to match it against an
+  // application-requested RPC if possible, or will place it in the pending
+  // queue otherwise. To enable some measure of fairness between server CQs,
+  // the match is done starting at the start_request_queue_index parameter in
+  // a cyclic order rather than always starting at 0.
+  virtual void MatchOrQueue(size_t start_request_queue_index,
+                            call_data* calld) = 0;
+
+  // Returns the server associated with this request matcher.
+  virtual grpc_server* server() const = 0;
+};
 
 struct call_data {
   call_data(grpc_call_element* elem, const grpc_call_element_args& args)
@@ -175,7 +222,7 @@ struct call_data {
   grpc_metadata_array initial_metadata =
       grpc_metadata_array();  // Zero-initialize the C struct.
-  request_matcher* matcher = nullptr;
+  RequestMatcherInterface* matcher = nullptr;
   grpc_byte_buffer* payload = nullptr;
 
   grpc_closure got_initial_metadata;
@@ -194,20 +241,15 @@ struct call_data {
   grpc_core::CallCombiner* call_combiner;
 };
 
-struct request_matcher {
-  grpc_server* server;
-  call_data* pending_head;
-  call_data* pending_tail;
-  LockedMultiProducerSingleConsumerQueue* requests_per_cq;
-};
-
 struct registered_method {
   char* method;
   char* host;
   grpc_server_register_method_payload_handling payload_handling;
   uint32_t flags;
   /* one request matcher per method */
-  request_matcher matcher;
+  // TODO(vjpai): Move this to a unique_ptr once registered_method has a real
+  // constructor/destructor.
+  RequestMatcherInterface* matcher = nullptr;
   registered_method* next;
 };
 
@@ -245,7 +287,9 @@ struct grpc_server {
   registered_method* registered_methods;
   /** one request matcher for unregistered methods */
-  request_matcher unregistered_request_matcher;
+  // TODO(vjpai): Convert to a std::unique_ptr once grpc_server has a real
+  // constructor and destructor.
+  RequestMatcherInterface* unregistered_request_matcher;
 
   gpr_atm shutdown_flag;
   uint8_t shutdown_published;
@@ -268,13 +312,19 @@ struct grpc_server {
   (((channel_data*)(elem)->channel_data)->server)
 
 namespace {
-void publish_new_rpc(void* arg, grpc_error* error);
+void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
+                  requested_call* rc);
 void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
                grpc_error* error);
 /* Before calling maybe_finish_shutdown, we must hold mu_global and not
    hold mu_call */
 void maybe_finish_shutdown(grpc_server* server);
+
+void kill_zombie(void* elem, grpc_error* /*error*/) {
+  grpc_call_unref(
+      grpc_call_from_top_element(static_cast<grpc_call_element*>(elem)));
+}
+
 /*
  * channel broadcaster
  */
@@ -347,54 +397,138 @@ void channel_broadcaster_shutdown(channel_broadcaster* cb, bool send_goaway,
 /*
  * request_matcher
  */
 
-void request_matcher_init(request_matcher* rm, grpc_server* server) {
-  rm->server = server;
-  rm->pending_head = rm->pending_tail = nullptr;
-  rm->requests_per_cq = static_cast<LockedMultiProducerSingleConsumerQueue*>(
-      gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count));
-  for (size_t i = 0; i < server->cq_count; i++) {
-    new (&rm->requests_per_cq[i]) LockedMultiProducerSingleConsumerQueue();
+// The RealRequestMatcher is an implementation of RequestMatcherInterface that
+// actually uses all the features of RequestMatcherInterface: expecting the
+// application to explicitly request RPCs and then matching those to incoming
+// RPCs, along with a slow path by which incoming RPCs are put on a locked
+// pending list if they cannot be matched to an application request.
+class RealRequestMatcher : public RequestMatcherInterface {
+ public:
+  explicit RealRequestMatcher(grpc_server* server)
+      : server_(server), requests_per_cq_(server->cq_count) {}
+
+  ~RealRequestMatcher() override {
+    for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
+      GPR_ASSERT(queue.Pop() == nullptr);
+    }
   }
-}
 
-void request_matcher_destroy(request_matcher* rm) {
-  for (size_t i = 0; i < rm->server->cq_count; i++) {
-    GPR_ASSERT(rm->requests_per_cq[i].Pop() == nullptr);
-    rm->requests_per_cq[i].~LockedMultiProducerSingleConsumerQueue();
+  void ZombifyPending() override {
+    while (pending_head_ != nullptr) {
+      call_data* calld = pending_head_;
+      pending_head_ = calld->pending_next;
+      gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
+      GRPC_CLOSURE_INIT(
+          &calld->kill_zombie_closure, kill_zombie,
+          grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
+          grpc_schedule_on_exec_ctx);
+      grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
+                              GRPC_ERROR_NONE);
+    }
   }
-  gpr_free(rm->requests_per_cq);
-}
 
-void kill_zombie(void* elem, grpc_error* /*error*/) {
-  grpc_call_unref(
-      grpc_call_from_top_element(static_cast<grpc_call_element*>(elem)));
-}
+  void KillRequests(grpc_error* error) override {
+    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
+      requested_call* rc;
+      while ((rc = reinterpret_cast<requested_call*>(
+                  requests_per_cq_[i].Pop())) != nullptr) {
+        fail_call(server_, i, rc, GRPC_ERROR_REF(error));
+      }
+    }
+    GRPC_ERROR_UNREF(error);
+  }
 
-void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
-  while (rm->pending_head) {
-    call_data* calld = rm->pending_head;
-    rm->pending_head = calld->pending_next;
-    gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
-    GRPC_CLOSURE_INIT(
-        &calld->kill_zombie_closure, kill_zombie,
-        grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
-        grpc_schedule_on_exec_ctx);
-    grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
-                            GRPC_ERROR_NONE);
+  size_t request_queue_count() const override {
+    return requests_per_cq_.size();
+  }
+
+  void RequestCallWithPossiblePublish(size_t request_queue_index,
+                                      requested_call* call) override {
+    if (requests_per_cq_[request_queue_index].Push(call->mpscq_node.get())) {
+      /* this was the first queued request: we need to lock and start
+         matching calls */
+      gpr_mu_lock(&server_->mu_call);
+      call_data* calld;
+      while ((calld = pending_head_) != nullptr) {
+        requested_call* rc = reinterpret_cast<requested_call*>(
+            requests_per_cq_[request_queue_index].Pop());
+        if (rc == nullptr) break;
+        pending_head_ = calld->pending_next;
+        gpr_mu_unlock(&server_->mu_call);
+        if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
+          // Zombied Call
+          GRPC_CLOSURE_INIT(
+              &calld->kill_zombie_closure, kill_zombie,
+              grpc_call_stack_element(grpc_call_get_call_stack(calld->call),
                                      0),
+              grpc_schedule_on_exec_ctx);
+          grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
+                                  GRPC_ERROR_NONE);
+        } else {
+          publish_call(server_, calld, request_queue_index, rc);
+        }
+        gpr_mu_lock(&server_->mu_call);
+      }
+      gpr_mu_unlock(&server_->mu_call);
+    }
   }
-}
 
-void request_matcher_kill_requests(grpc_server* server, request_matcher* rm,
-                                   grpc_error* error) {
-  requested_call* rc;
-  for (size_t i = 0; i < server->cq_count; i++) {
-    while ((rc = reinterpret_cast<requested_call*>(
-                rm->requests_per_cq[i].Pop())) != nullptr) {
-      fail_call(server, i, rc, GRPC_ERROR_REF(error));
+  void MatchOrQueue(size_t start_request_queue_index,
+                    call_data* calld) override {
+    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
+      size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
+      requested_call* rc =
+          reinterpret_cast<requested_call*>(requests_per_cq_[cq_idx].TryPop());
+      if (rc == nullptr) {
+        continue;
+      } else {
+        GRPC_STATS_INC_SERVER_CQS_CHECKED(i);
+        gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
+        publish_call(server_, calld, cq_idx, rc);
+        return; /* early out */
+      }
     }
+
+    /* no cq found to take the request: queue it on the slow list */
+    GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();
+    gpr_mu_lock(&server_->mu_call);
+
+    // We need to ensure that all the queues are empty. We do this under
+    // the server mu_call lock to ensure that if something is added to
+    // an empty request queue, it will block until the call is actually
+    // added to the pending list.
+    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
+      size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
+      requested_call* rc =
+          reinterpret_cast<requested_call*>(requests_per_cq_[cq_idx].Pop());
+      if (rc == nullptr) {
+        continue;
+      } else {
+        gpr_mu_unlock(&server_->mu_call);
+        GRPC_STATS_INC_SERVER_CQS_CHECKED(i + requests_per_cq_.size());
+        gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
+        publish_call(server_, calld, cq_idx, rc);
+        return; /* early out */
+      }
+    }
+
+    gpr_atm_no_barrier_store(&calld->state, PENDING);
+    if (pending_head_ == nullptr) {
+      pending_tail_ = pending_head_ = calld;
+    } else {
+      pending_tail_->pending_next = calld;
+      pending_tail_ = calld;
+    }
+    gpr_mu_unlock(&server_->mu_call);
   }
-  GRPC_ERROR_UNREF(error);
-}
+
+  grpc_server* server() const override { return server_; }
+
+ private:
+  grpc_server* const server_;
+  call_data* pending_head_ = nullptr;
+  call_data* pending_tail_ = nullptr;
+  std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
+};
 
 /*
  * server proper
  */
@@ -412,16 +546,12 @@ void server_delete(grpc_server* server) {
   gpr_cv_destroy(&server->starting_cv);
   while ((rm = server->registered_methods) != nullptr) {
     server->registered_methods = rm->next;
-    if (server->started) {
-      request_matcher_destroy(&rm->matcher);
-    }
+    delete rm->matcher;
     gpr_free(rm->method);
     gpr_free(rm->host);
     gpr_free(rm);
   }
-  if (server->started) {
-    request_matcher_destroy(&server->unregistered_request_matcher);
-  }
+  delete server->unregistered_request_matcher;
   for (i = 0; i < server->cq_count; i++) {
     GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
   }
@@ -512,8 +642,8 @@ void publish_new_rpc(void* arg, grpc_error* error) {
   grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);
   call_data* calld = static_cast<call_data*>(call_elem->call_data);
   channel_data* chand = static_cast<channel_data*>(call_elem->channel_data);
-  request_matcher* rm = calld->matcher;
-  grpc_server* server = rm->server;
+  RequestMatcherInterface* rm = calld->matcher;
+  grpc_server* server = rm->server();
 
   if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
     gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
@@ -526,55 +656,11 @@ void publish_new_rpc(void* arg, grpc_error* error) {
     return;
   }
 
-  for (size_t i = 0; i < server->cq_count; i++) {
-    size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
-    requested_call* rc =
-        reinterpret_cast<requested_call*>(rm->requests_per_cq[cq_idx].TryPop());
-    if (rc == nullptr) {
-      continue;
-    } else {
-      GRPC_STATS_INC_SERVER_CQS_CHECKED(i);
-      gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
-      publish_call(server, calld, cq_idx, rc);
-      return; /* early out */
-    }
-  }
-
-  /* no cq found to take the request: queue it on the slow list */
-  GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();
-  gpr_mu_lock(&server->mu_call);
-
-  // We need to ensure that all the queues are empty. We do this under
-  // the server mu_call lock to ensure that if something is added to
-  // an empty request queue, it will block until the call is actually
-  // added to the pending list.
-  for (size_t i = 0; i < server->cq_count; i++) {
-    size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
-    requested_call* rc =
-        reinterpret_cast<requested_call*>(rm->requests_per_cq[cq_idx].Pop());
-    if (rc == nullptr) {
-      continue;
-    } else {
-      gpr_mu_unlock(&server->mu_call);
-      GRPC_STATS_INC_SERVER_CQS_CHECKED(i + server->cq_count);
-      gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
-      publish_call(server, calld, cq_idx, rc);
-      return; /* early out */
-    }
-  }
-
-  gpr_atm_no_barrier_store(&calld->state, PENDING);
-  if (rm->pending_head == nullptr) {
-    rm->pending_tail = rm->pending_head = calld;
-  } else {
-    rm->pending_tail->pending_next = calld;
-    rm->pending_tail = calld;
-  }
-  gpr_mu_unlock(&server->mu_call);
+  rm->MatchOrQueue(chand->cq_idx, calld);
 }
 
 void finish_start_new_rpc(
-    grpc_server* server, grpc_call_element* elem, request_matcher* rm,
+    grpc_server* server, grpc_call_element* elem, RequestMatcherInterface* rm,
     grpc_server_register_method_payload_handling payload_handling) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
@@ -632,7 +718,7 @@ void start_new_rpc(grpc_call_element* elem) {
             GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
       continue;
     }
-    finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
+    finish_start_new_rpc(server, elem, rm->server_registered_method->matcher,
                          rm->server_registered_method->payload_handling);
     return;
   }
@@ -649,12 +735,12 @@ void start_new_rpc(grpc_call_element* elem) {
             GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
       continue;
     }
-    finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
+    finish_start_new_rpc(server, elem, rm->server_registered_method->matcher,
                          rm->server_registered_method->payload_handling);
     return;
   }
   }
-  finish_start_new_rpc(server, elem, &server->unregistered_request_matcher,
+  finish_start_new_rpc(server, elem, server->unregistered_request_matcher,
                        GRPC_SRM_PAYLOAD_NONE);
 }
@@ -683,15 +769,12 @@ int num_channels(grpc_server* server) {
 
 void kill_pending_work_locked(grpc_server* server, grpc_error* error) {
   if (server->started) {
-    request_matcher_kill_requests(server, &server->unregistered_request_matcher,
-                                  GRPC_ERROR_REF(error));
-    request_matcher_zombify_all_pending_calls(
-        &server->unregistered_request_matcher);
+    server->unregistered_request_matcher->KillRequests(GRPC_ERROR_REF(error));
+    server->unregistered_request_matcher->ZombifyPending();
     for (registered_method* rm = server->registered_methods; rm;
          rm = rm->next) {
-      request_matcher_kill_requests(server, &rm->matcher,
-                                    GRPC_ERROR_REF(error));
-      request_matcher_zombify_all_pending_calls(&rm->matcher);
+      rm->matcher->KillRequests(GRPC_ERROR_REF(error));
+      rm->matcher->ZombifyPending();
     }
   }
   GRPC_ERROR_UNREF(error);
@@ -1007,45 +1090,21 @@ void listener_destroy_done(void* s, grpc_error* /*error*/) {
 
 grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
                                    requested_call* rc) {
-  call_data* calld = nullptr;
-  request_matcher* rm = nullptr;
   if (gpr_atm_acq_load(&server->shutdown_flag)) {
     fail_call(server, cq_idx, rc,
               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
     return GRPC_CALL_OK;
   }
+  RequestMatcherInterface* rm;
   switch (rc->type) {
    case BATCH_CALL:
-      rm = &server->unregistered_request_matcher;
+      rm = server->unregistered_request_matcher;
      break;
    case REGISTERED_CALL:
-      rm = &rc->data.registered.method->matcher;
+      rm = rc->data.registered.method->matcher;
      break;
  }
-  if (rm->requests_per_cq[cq_idx].Push(rc->mpscq_node.get())) {
-    /* this was the first queued request: we need to lock and start
-       matching calls */
-    gpr_mu_lock(&server->mu_call);
-    while ((calld = rm->pending_head) != nullptr) {
-      rc = reinterpret_cast<requested_call*>(rm->requests_per_cq[cq_idx].Pop());
-      if (rc == nullptr) break;
-      rm->pending_head = calld->pending_next;
-      gpr_mu_unlock(&server->mu_call);
-      if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
-        // Zombied Call
-        GRPC_CLOSURE_INIT(
-            &calld->kill_zombie_closure, kill_zombie,
-            grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
-            grpc_schedule_on_exec_ctx);
-        grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
-                                GRPC_ERROR_NONE);
-      } else {
-        publish_call(server, calld, cq_idx, rc);
-      }
-      gpr_mu_lock(&server->mu_call);
-    }
-    gpr_mu_unlock(&server->mu_call);
-  }
+  rm->RequestCallWithPossiblePublish(cq_idx, rc);
   return GRPC_CALL_OK;
 }
@@ -1191,9 +1250,9 @@ void grpc_server_start(grpc_server* server) {
       grpc_cq_pollset(server->cqs[i]);
     }
   }
-  request_matcher_init(&server->unregistered_request_matcher, server);
+  server->unregistered_request_matcher = new RealRequestMatcher(server);
   for (registered_method* rm = server->registered_methods; rm; rm = rm->next) {
-    request_matcher_init(&rm->matcher, server);
+    rm->matcher = new RealRequestMatcher(server);
  }
 
  gpr_mu_lock(&server->mu_global);
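
Note (illustrative, not part of this patch): after this change, grpc_server_start
is the only place where concrete matchers are chosen, so an alternative
RequestMatcherInterface implementation can be swapped in per registered method
or for the unregistered-method path. The sketch below shows the shape such a
matcher could take: a hypothetical SingleQueueRequestMatcher that funnels all
server CQs into one request queue. It reuses only names that appear in this
patch (publish_call, fail_call, requested_call, call_data,
LockedMultiProducerSingleConsumerQueue), all of which are file-local to
server.cc, so a real version would have to live in that file. It also
deliberately omits the pending-list slow path that RealRequestMatcher
implements, as flagged in the comments.

class SingleQueueRequestMatcher : public RequestMatcherInterface {
 public:
  explicit SingleQueueRequestMatcher(grpc_server* server) : server_(server) {}

  // This sketch keeps no pending list, so there is nothing to zombify.
  void ZombifyPending() override {}

  // Fail every queued application request (e.g. at server shutdown),
  // completing them on CQ 0 since this matcher collapses all queues into one.
  void KillRequests(grpc_error* error) override {
    requested_call* rc;
    while ((rc = reinterpret_cast<requested_call*>(requests_.Pop())) !=
           nullptr) {
      fail_call(server_, 0, rc, GRPC_ERROR_REF(error));
    }
    GRPC_ERROR_UNREF(error);
  }

  // Every server CQ maps onto the single queue below.
  size_t request_queue_count() const override { return 1; }

  void RequestCallWithPossiblePublish(size_t /*request_queue_index*/,
                                      requested_call* call) override {
    requests_.Push(call->mpscq_node.get());
  }

  void MatchOrQueue(size_t /*start_request_queue_index*/,
                    call_data* calld) override {
    requested_call* rc =
        reinterpret_cast<requested_call*>(requests_.TryPop());
    if (rc != nullptr) {
      gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
      publish_call(server_, calld, 0, rc);
    }
    // A complete implementation must still park an unmatched RPC somewhere
    // (compare RealRequestMatcher's pending list); elided in this sketch.
  }

  grpc_server* server() const override { return server_; }

 private:
  grpc_server* const server_;
  LockedMultiProducerSingleConsumerQueue requests_;
};

Wiring such a matcher in would then be a one-line change at each construction
site in grpc_server_start, for example:
  server->unregistered_request_matcher = new SingleQueueRequestMatcher(server);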