Use AllocatingRequestMatcher for sync C++ server and simplify

pull/25401/head
Vijay Pai 4 years ago
parent e8400c0059
commit 9cceb28b2e
  1. 14
      src/core/lib/surface/server.cc
  2. 6
      src/core/lib/surface/server.h
  3. 374
      src/cpp/server/server_cc.cc

@ -375,7 +375,7 @@ class Server::AllocatingRequestMatcherBatch
cq(), static_cast<void*>(call_info.tag), nullptr, nullptr) == cq(), static_cast<void*>(call_info.tag), nullptr, nullptr) ==
GRPC_CALL_OK); GRPC_CALL_OK);
RequestedCall* rc = new RequestedCall( RequestedCall* rc = new RequestedCall(
static_cast<void*>(call_info.tag), cq(), call_info.call, static_cast<void*>(call_info.tag), call_info.cq, call_info.call,
call_info.initial_metadata, call_info.details); call_info.initial_metadata, call_info.details);
calld->SetState(CallData::CallState::ACTIVATED); calld->SetState(CallData::CallState::ACTIVATED);
calld->Publish(cq_idx(), rc); calld->Publish(cq_idx(), rc);
@ -399,14 +399,12 @@ class Server::AllocatingRequestMatcherRegistered
void MatchOrQueue(size_t /*start_request_queue_index*/, void MatchOrQueue(size_t /*start_request_queue_index*/,
CallData* calld) override { CallData* calld) override {
RegisteredCallAllocation call_info = allocator_(); RegisteredCallAllocation call_info = allocator_();
GPR_ASSERT( GPR_ASSERT(server()->ValidateServerRequest(
server()->ValidateServerRequest(cq(), static_cast<void*>(call_info.tag), cq(), call_info.tag, call_info.optional_payload,
call_info.optional_payload, registered_method_) == GRPC_CALL_OK);
registered_method_) == GRPC_CALL_OK);
RequestedCall* rc = new RequestedCall( RequestedCall* rc = new RequestedCall(
static_cast<void*>(call_info.tag), cq(), call_info.call, call_info.tag, call_info.cq, call_info.call, call_info.initial_metadata,
call_info.initial_metadata, registered_method_, call_info.deadline, registered_method_, call_info.deadline, call_info.optional_payload);
call_info.optional_payload);
calld->SetState(CallData::CallState::ACTIVATED); calld->SetState(CallData::CallState::ACTIVATED);
calld->Publish(cq_idx(), rc); calld->Publish(cq_idx(), rc);
} }

@ -49,21 +49,23 @@ class Server : public InternallyRefCounted<Server> {
// An object to represent the most relevant characteristics of a // An object to represent the most relevant characteristics of a
// newly-allocated call object when using an AllocatingRequestMatcherBatch. // newly-allocated call object when using an AllocatingRequestMatcherBatch.
struct BatchCallAllocation { struct BatchCallAllocation {
grpc_experimental_completion_queue_functor* tag; void* tag;
grpc_call** call; grpc_call** call;
grpc_metadata_array* initial_metadata; grpc_metadata_array* initial_metadata;
grpc_call_details* details; grpc_call_details* details;
grpc_completion_queue* cq;
}; };
// An object to represent the most relevant characteristics of a // An object to represent the most relevant characteristics of a
// newly-allocated call object when using an // newly-allocated call object when using an
// AllocatingRequestMatcherRegistered. // AllocatingRequestMatcherRegistered.
struct RegisteredCallAllocation { struct RegisteredCallAllocation {
grpc_experimental_completion_queue_functor* tag; void* tag;
grpc_call** call; grpc_call** call;
grpc_metadata_array* initial_metadata; grpc_metadata_array* initial_metadata;
gpr_timespec* deadline; gpr_timespec* deadline;
grpc_byte_buffer** optional_payload; grpc_byte_buffer** optional_payload;
grpc_completion_queue* cq;
}; };
/// Interface for listeners. /// Interface for listeners.

@ -45,6 +45,7 @@
#include "absl/memory/memory.h" #include "absl/memory/memory.h"
#include "src/core/ext/transport/inproc/inproc_transport.h" #include "src/core/ext/transport/inproc/inproc_transport.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/call.h"
@ -336,199 +337,160 @@ class Server::UnimplementedAsyncResponse final
class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
public: public:
SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag) SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method,
: method_(method), grpc_core::Server::RegisteredCallAllocation* data)
method_tag_(method_tag), : SyncRequest(server, method) {
in_flight_(false), CommonSetup(data);
has_request_payload_(method->method_type() == data->deadline = &deadline_;
grpc::internal::RpcMethod::NORMAL_RPC || data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr;
method->method_type() ==
grpc::internal::RpcMethod::SERVER_STREAMING),
call_details_(nullptr),
cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_);
}
~SyncRequest() override {
if (call_details_) {
delete call_details_;
}
grpc_metadata_array_destroy(&request_metadata_);
} }
void SetupRequest() { cq_ = grpc_completion_queue_create_for_pluck(nullptr); } SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method,
grpc_core::Server::BatchCallAllocation* data)
void TeardownRequest() { : SyncRequest(server, method) {
grpc_completion_queue_destroy(cq_); CommonSetup(data);
cq_ = nullptr; call_details_ = new grpc_call_details;
grpc_call_details_init(call_details_);
data->details = call_details_;
} }
void Request(grpc_server* server, grpc_completion_queue* notify_cq) { ~SyncRequest() override {
GPR_ASSERT(cq_ && !in_flight_); if (has_request_payload_ && request_payload_) {
in_flight_ = true; grpc_byte_buffer_destroy(request_payload_);
if (method_tag_) {
if (grpc_server_request_registered_call(
server, method_tag_, &call_, &deadline_, &request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq_,
notify_cq, this) != GRPC_CALL_OK) {
TeardownRequest();
return;
}
} else {
if (!call_details_) {
call_details_ = new grpc_call_details;
grpc_call_details_init(call_details_);
}
if (grpc_server_request_call(server, &call_, call_details_,
&request_metadata_, cq_, notify_cq,
this) != GRPC_CALL_OK) {
TeardownRequest();
return;
}
} }
} wrapped_call_.Destroy();
ctx_.Destroy();
void PostShutdownCleanup() { if (call_details_ != nullptr) {
if (call_) { grpc_call_details_destroy(call_details_);
grpc_call_unref(call_); delete call_details_;
call_ = nullptr;
}
if (cq_) {
grpc_completion_queue_destroy(cq_);
cq_ = nullptr;
} }
grpc_metadata_array_destroy(&request_metadata_);
} }
bool FinalizeResult(void** /*tag*/, bool* status) override { bool FinalizeResult(void** /*tag*/, bool* status) override {
if (!*status) { if (!*status) {
grpc_completion_queue_destroy(cq_); delete this;
cq_ = nullptr; return false;
} }
if (call_details_) { if (call_details_) {
deadline_ = call_details_->deadline; deadline_ = call_details_->deadline;
grpc_call_details_destroy(call_details_);
grpc_call_details_init(call_details_);
} }
return true; return true;
} }
// The CallData class represents a call that is "active" as opposed void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
// to just being requested. It wraps and takes ownership of the cq from bool resources) {
// the call request ctx_.Init(deadline_, &request_metadata_);
class CallData final { wrapped_call_.Init(
public: call_, server_, &cq_, server_->max_receive_message_size(),
explicit CallData(Server* server, SyncRequest* mrd) ctx_->ctx.set_server_rpc_info(method_->name(), method_->method_type(),
: cq_(mrd->cq_), server_->interceptor_creators_));
ctx_(mrd->deadline_, &mrd->request_metadata_), ctx_->ctx.set_call(call_);
has_request_payload_(mrd->has_request_payload_), ctx_->ctx.cq_ = &cq_;
request_payload_(has_request_payload_ ? mrd->request_payload_ request_metadata_.count = 0;
: nullptr),
request_(nullptr), global_callbacks_ = global_callbacks;
method_(mrd->method_), resources_ = resources;
call_(
mrd->call_, server, &cq_, server->max_receive_message_size(), interceptor_methods_.SetCall(&*wrapped_call_);
ctx_.set_server_rpc_info(method_->name(), method_->method_type(), interceptor_methods_.SetReverse();
server->interceptor_creators_)), // Set interception point for RECV INITIAL METADATA
server_(server), interceptor_methods_.AddInterceptionHookPoint(
global_callbacks_(nullptr), grpc::experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
resources_(false) { interceptor_methods_.SetRecvInitialMetadata(&ctx_->ctx.client_metadata_);
ctx_.set_call(mrd->call_);
ctx_.cq_ = &cq_; if (has_request_payload_) {
GPR_ASSERT(mrd->in_flight_); // Set interception point for RECV MESSAGE
mrd->in_flight_ = false; auto* handler = resources_ ? method_->handler()
mrd->request_metadata_.count = 0; : server_->resource_exhausted_handler_.get();
deserialized_request_ = handler->Deserialize(call_, request_payload_,
&request_status_, nullptr);
request_payload_ = nullptr;
interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
interceptor_methods_.SetRecvMessage(deserialized_request_, nullptr);
} }
~CallData() { if (interceptor_methods_.RunInterceptors(
if (has_request_payload_ && request_payload_) { [this]() { ContinueRunAfterInterception(); })) {
grpc_byte_buffer_destroy(request_payload_); ContinueRunAfterInterception();
} } else {
// There were interceptors to be run, so ContinueRunAfterInterception
// will be run when interceptors are done.
} }
}
void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, void ContinueRunAfterInterception() {
bool resources) { {
global_callbacks_ = global_callbacks; ctx_->ctx.BeginCompletionOp(&*wrapped_call_, nullptr, nullptr);
resources_ = resources; global_callbacks_->PreSynchronousRequest(&ctx_->ctx);
auto* handler = resources_ ? method_->handler()
: server_->resource_exhausted_handler_.get();
handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
&*wrapped_call_, &ctx_->ctx, deserialized_request_, request_status_,
nullptr, nullptr));
global_callbacks_->PostSynchronousRequest(&ctx_->ctx);
interceptor_methods_.SetCall(&call_); cq_.Shutdown();
interceptor_methods_.SetReverse();
// Set interception point for RECV INITIAL METADATA
interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::
POST_RECV_INITIAL_METADATA);
interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_);
if (has_request_payload_) { grpc::internal::CompletionQueueTag* op_tag =
// Set interception point for RECV MESSAGE ctx_->ctx.GetCompletionOpTag();
auto* handler = resources_ ? method_->handler() cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
: server_->resource_exhausted_handler_.get();
request_ = handler->Deserialize(call_.call(), request_payload_,
&request_status_, nullptr);
request_payload_ = nullptr; /* Ensure the cq_ is shutdown */
interceptor_methods_.AddInterceptionHookPoint( grpc::PhonyTag ignored_tag;
grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
interceptor_methods_.SetRecvMessage(request_, nullptr);
}
if (interceptor_methods_.RunInterceptors(
[this]() { ContinueRunAfterInterception(); })) {
ContinueRunAfterInterception();
} else {
// There were interceptors to be run, so ContinueRunAfterInterception
// will be run when interceptors are done.
}
} }
delete this;
}
void ContinueRunAfterInterception() { private:
{ SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method)
ctx_.BeginCompletionOp(&call_, nullptr, nullptr); : server_(server),
global_callbacks_->PreSynchronousRequest(&ctx_); method_(method),
auto* handler = resources_ ? method_->handler() has_request_payload_(method->method_type() ==
: server_->resource_exhausted_handler_.get(); grpc::internal::RpcMethod::NORMAL_RPC ||
handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( method->method_type() ==
&call_, &ctx_, request_, request_status_, nullptr, nullptr)); grpc::internal::RpcMethod::SERVER_STREAMING),
request_ = nullptr; cq_(grpc_completion_queue_create_for_pluck(nullptr)) {}
global_callbacks_->PostSynchronousRequest(&ctx_);
cq_.Shutdown();
grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
/* Ensure the cq_ is shutdown */
grpc::PhonyTag ignored_tag;
GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
}
delete this;
}
private: template <class CallAllocation>
grpc::CompletionQueue cq_; void CommonSetup(CallAllocation* data) {
grpc::ServerContext ctx_; grpc_metadata_array_init(&request_metadata_);
const bool has_request_payload_; data->tag = static_cast<void*>(this);
grpc_byte_buffer* request_payload_; data->call = &call_;
void* request_; data->initial_metadata = &request_metadata_;
grpc::Status request_status_; data->cq = cq_.cq();
grpc::internal::RpcServiceMethod* const method_; }
grpc::internal::Call call_;
Server* server_;
std::shared_ptr<GlobalCallbacks> global_callbacks_;
bool resources_;
grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
private: Server* const server_;
grpc::internal::RpcServiceMethod* const method_; grpc::internal::RpcServiceMethod* const method_;
void* const method_tag_;
bool in_flight_;
const bool has_request_payload_; const bool has_request_payload_;
grpc_call* call_; grpc_call* call_;
grpc_call_details* call_details_; grpc_call_details* call_details_ = nullptr;
gpr_timespec deadline_; gpr_timespec deadline_;
grpc_metadata_array request_metadata_; grpc_metadata_array request_metadata_;
grpc_byte_buffer* request_payload_; grpc_byte_buffer* request_payload_;
grpc_completion_queue* cq_; grpc::CompletionQueue cq_;
grpc::Status request_status_;
std::shared_ptr<GlobalCallbacks> global_callbacks_;
bool resources_;
void* deserialized_request_ = nullptr;
grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
// ServerContextWrapper allows ManualConstructor while using a private
// contructor of ServerContext via this friend class.
struct ServerContextWrapper {
ServerContext ctx;
ServerContextWrapper(gpr_timespec deadline, grpc_metadata_array* arr)
: ctx(deadline, arr) {}
};
grpc_core::ManualConstructor<ServerContextWrapper> ctx_;
grpc_core::ManualConstructor<internal::Call> wrapped_call_;
}; };
template <class ServerContextType> template <class ServerContextType>
@ -702,7 +664,7 @@ class Server::CallbackRequest final
void CommonSetup(Server* server, CallAllocation* data) { void CommonSetup(Server* server, CallAllocation* data) {
server->Ref(); server->Ref();
grpc_metadata_array_init(&request_metadata_); grpc_metadata_array_init(&request_metadata_);
data->tag = &tag_; data->tag = static_cast<void*>(&tag_);
data->call = &call_; data->call = &call_;
data->initial_metadata = &request_metadata_; data->initial_metadata = &request_metadata_;
if (ctx_ == nullptr) { if (ctx_ == nullptr) {
@ -711,6 +673,7 @@ class Server::CallbackRequest final
ctx_alloc_by_default_ = true; ctx_alloc_by_default_ = true;
} }
ctx_->set_context_allocator(server->context_allocator()); ctx_->set_context_allocator(server->context_allocator());
data->cq = cq_->cq();
} }
Server* const server_; Server* const server_;
@ -802,42 +765,36 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
void DoWork(void* tag, bool ok, bool resources) override { void DoWork(void* tag, bool ok, bool resources) override {
SyncRequest* sync_req = static_cast<SyncRequest*>(tag); SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
if (!sync_req) { // Under the AllocatingRequestMatcher model we will never see an invalid tag
// No tag. Nothing to work on. This is an unlikley scenario and possibly a // here.
// bug in RPC Manager implementation. GPR_DEBUG_ASSERT(sync_req != nullptr);
gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag"); GPR_DEBUG_ASSERT(ok);
return;
}
if (ok) {
// Calldata takes ownership of the completion queue and interceptors
// inside sync_req
auto* cd = new SyncRequest::CallData(server_, sync_req);
// Prepare for the next request
if (!IsShutdown()) {
sync_req->SetupRequest(); // Create new completion queue for sync_req
sync_req->Request(server_->c_server(), server_cq_->cq());
}
GPR_TIMER_SCOPE("cd.Run()", 0); GPR_TIMER_SCOPE("sync_req->Run()", 0);
cd->Run(global_callbacks_, resources); sync_req->Run(global_callbacks_, resources);
}
// TODO (sreek) If ok is false here (which it isn't in case of
// grpc_request_registered_call), we should still re-queue the request
// object
} }
void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) { void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) {
sync_requests_.emplace_back(new SyncRequest(method, tag)); server_->server()->core_server->SetRegisteredMethodAllocator(
server_cq_->cq(), tag, [this, method] {
grpc_core::Server::RegisteredCallAllocation result;
new SyncRequest(server_, method, &result);
return result;
});
has_sync_method_ = true;
} }
void AddUnknownSyncMethod() { void AddUnknownSyncMethod() {
if (!sync_requests_.empty()) { if (has_sync_method_) {
unknown_method_ = absl::make_unique<grpc::internal::RpcServiceMethod>( unknown_method_ = absl::make_unique<grpc::internal::RpcServiceMethod>(
"unknown", grpc::internal::RpcMethod::BIDI_STREAMING, "unknown", grpc::internal::RpcMethod::BIDI_STREAMING,
new grpc::internal::UnknownMethodHandler); new grpc::internal::UnknownMethodHandler);
sync_requests_.emplace_back( server_->server()->core_server->SetBatchMethodAllocator(
new SyncRequest(unknown_method_.get(), nullptr)); server_cq_->cq(), [this] {
grpc_core::Server::BatchCallAllocation result;
new SyncRequest(server_, unknown_method_.get(), &result);
return result;
});
} }
} }
@ -852,27 +809,13 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
void* tag; void* tag;
bool ok; bool ok;
while (server_cq_->Next(&tag, &ok)) { while (server_cq_->Next(&tag, &ok)) {
if (ok) { GPR_DEBUG_ASSERT(false);
// If a request was pulled off the queue, it means that the thread gpr_log(GPR_ERROR, "SyncRequest seen during shutdown");
// handling the request added it to the completion queue after shutdown
// was called - because the thread had already started and checked the
// shutdown flag before shutdown was called. In this case, we simply
// clean it up here, *after* calling wait on all the worker threads, at
// which point we are certain no in-flight requests will add more to the
// queue. This fixes an intermittent memory leak on shutdown.
SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
sync_req->PostShutdownCleanup();
}
} }
} }
void Start() { void Start() {
if (!sync_requests_.empty()) { if (has_sync_method_) {
for (const auto& value : sync_requests_) {
value->SetupRequest();
value->Request(server_->c_server(), server_cq_->cq());
}
Initialize(); // ThreadManager's Initialize() Initialize(); // ThreadManager's Initialize()
} }
} }
@ -881,7 +824,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
Server* server_; Server* server_;
grpc::CompletionQueue* server_cq_; grpc::CompletionQueue* server_cq_;
int cq_timeout_msec_; int cq_timeout_msec_;
std::vector<std::unique_ptr<SyncRequest>> sync_requests_; bool has_sync_method_ = false;
std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_; std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_;
std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
}; };
@ -1190,13 +1133,27 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
} }
#endif #endif
grpc_server_start(server_); // If we have a generic service, all unmatched method names go there.
// Otherwise, we must provide at least one RPC request for an "unimplemented"
// RPC, which covers any RPC for a method name that isn't matched. If we
// have a sync service, let it be a sync unimplemented RPC, which must be
// registered before server start (to initialize an AllocatingRequestMatcher).
// If we have an AllocatingRequestMatcher, we can't also specify other
// unimplemented RPCs via explicit async requests, so we won't do so. If we
// only have async services, we can specify unimplemented RPCs on each async
// CQ so that some user polling thread will move them along as long as some
// progress is being made on any RPCs in the system.
bool unknown_rpc_needed =
!has_async_generic_service_ && !has_callback_generic_service_;
if (unknown_rpc_needed && !sync_req_mgrs_.empty()) {
sync_req_mgrs_[0]->AddUnknownSyncMethod();
unknown_rpc_needed = false;
}
if (!has_async_generic_service_ && !has_callback_generic_service_) { grpc_server_start(server_);
for (const auto& value : sync_req_mgrs_) {
value->AddUnknownSyncMethod();
}
if (unknown_rpc_needed) {
for (size_t i = 0; i < num_cqs; i++) { for (size_t i = 0; i < num_cqs; i++) {
if (cqs[i]->IsFrequentlyPolled()) { if (cqs[i]->IsFrequentlyPolled()) {
new UnimplementedAsyncRequest(this, cqs[i]); new UnimplementedAsyncRequest(this, cqs[i]);
@ -1205,6 +1162,7 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
if (health_check_cq != nullptr) { if (health_check_cq != nullptr) {
new UnimplementedAsyncRequest(this, health_check_cq); new UnimplementedAsyncRequest(this, health_check_cq);
} }
unknown_rpc_needed = false;
} }
// If this server has any support for synchronous methods (has any sync // If this server has any support for synchronous methods (has any sync

Loading…
Cancel
Save