One RPCMgr instance per CQ

pull/7466/head
Sree Kuchibhotla 9 years ago
parent 3ea9e247e0
commit aabada97a1
  1. include/grpc++/server.h (58 changes)
  2. include/grpc++/server_builder.h (9 changes)
  3. src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c (9 changes)
  4. src/core/lib/surface/server.c (6 changes)
  5. src/core/lib/surface/server.h (3 changes)
  6. src/cpp/rpcmanager/grpc_rpc_manager.cc (37 changes)
  7. src/cpp/rpcmanager/grpc_rpc_manager.h (59 changes)
  8. src/cpp/server/server.cc (190 changes)
  9. src/cpp/server/server_builder.cc (72 changes)
  10. src/cpp/server/server_posix.cc (3 changes)
  11. test/core/end2end/fixtures/h2_fd.c (2 changes)

@@ -66,9 +66,7 @@ class ThreadPoolInterface;
/// Models a gRPC server.
///
/// Servers are configured and started via \a grpc::ServerBuilder.
class Server GRPC_FINAL : public ServerInterface,
private GrpcLibraryCodegen,
public GrpcRpcManager {
class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
public:
~Server();
@@ -100,13 +98,6 @@ class Server GRPC_FINAL : public ServerInterface,
// Returns a \em raw pointer to the underlying grpc_server instance.
grpc_server* c_server();
// Returns a \em raw pointer to the underlying CompletionQueue.
CompletionQueue* completion_queue();
/// GRPC RPC Manager functions
void PollForWork(bool& is_work_found, void** tag) GRPC_OVERRIDE;
void DoWork(void* tag) GRPC_OVERRIDE;
private:
friend class AsyncGenericService;
friend class ServerBuilder;
@@ -116,19 +107,37 @@ class Server GRPC_FINAL : public ServerInterface,
class AsyncRequest;
class ShutdownRequest;
/// SyncRequestManager is an implementation of GrpcRpcManager. This class is
/// responsible for polling for incoming RPCs and calling the RPC handlers.
/// This is only used in case of a Sync server (i.e a server exposing a sync
/// interface)
class SyncRequestManager;
class UnimplementedAsyncRequestContext;
class UnimplementedAsyncRequest;
class UnimplementedAsyncResponse;
/// Server constructors. To be used by \a ServerBuilder only.
///
/// \param has_sync_methods Does this Server have any synchronous methods.
/// This information is useful to the server in creating some internal data
/// structures (completion queues / thread pools etc) to handle the incoming
/// RPCs corresponding to those sync methods
/// \param sync_server_cqs The completion queues to use if the server is a
/// synchronous server (or a hybrid server). The server polls for new RPCs on
/// these queues
///
/// \param max_message_size Maximum message length that the channel can
/// receive.
Server(bool has_sync_methods, int max_message_size, ChannelArguments* args);
///
/// \param args The channel args
///
/// \param min_pollers The minimum number of polling threads per server
/// completion queue (in param sync_server_cqs) to use for listening to
/// incoming requests (used only in case of sync server)
///
/// \param max_pollers The maximum number of polling threads per server
/// completion queue (in param sync_server_cqs) to use for listening to
/// incoming requests (used only in case of sync server)
Server(std::shared_ptr<std::vector<ServerCompletionQueue>> sync_server_cqs,
int max_message_size, ChannelArguments* args, int min_pollers,
int max_pollers);
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
@@ -181,11 +190,13 @@ class Server GRPC_FINAL : public ServerInterface,
const int max_message_size_;
// The following completion queues used ONLY if the server has any services
// with sync methods. The queues are used as notification_cqs to get notified
// of the incoming RPCs
// std::vector<std::unique_ptr<CompletionQueue>> notification_cqs_;
CompletionQueue cq_;
/// The following completion queues are ONLY used in case of Sync API i.e if
/// the server has any services with sync methods. The server uses these
/// completion queues to poll for new RPCs
std::shared_ptr<std::vector<ServerCompletionQueue>> sync_server_cqs_;
/// List of GrpcRpcManager instances (one for each cq in the sync_server_cqs)
std::vector<std::unique_ptr<SyncRequestManager>> sync_req_mgrs_;
// Server status
grpc::mutex mu_;
@@ -193,6 +204,9 @@ class Server GRPC_FINAL : public ServerInterface,
bool shutdown_;
bool shutdown_notified_;
/// The completion queue to use for server shutdown completion notification
CompletionQueue shutdown_cq_;
// TODO (sreek) : Remove num_running_cb_ and callback_cv_;
// The number of threads which are running callbacks.
// int num_running_cb_;
@@ -202,12 +216,10 @@ class Server GRPC_FINAL : public ServerInterface,
std::shared_ptr<GlobalCallbacks> global_callbacks_;
std::list<SyncRequest>* sync_methods_;
std::vector<grpc::string> services_;
std::unique_ptr<RpcServiceMethod> unknown_method_;
bool has_generic_service_;
// Pointer to the c grpc server.
// Pointer to the c core's grpc server.
grpc_server* server_;
std::unique_ptr<ServerInitializer> server_initializer_;

@@ -153,6 +153,12 @@ class ServerBuilder {
private:
friend class ::grpc::testing::ServerBuilderPluginTest;
// TODO (sreek) Make these configurable
// The default minimum and maximum number of polling threads needed per
// completion queue. These are only used in case of a Sync server
const int kDefaultMinPollers = 1;
const int kDefaultMaxPollers = -1; // Unlimited
struct Port {
grpc::string addr;
std::shared_ptr<ServerCredentials> creds;
@@ -172,7 +178,10 @@ class ServerBuilder {
std::vector<std::unique_ptr<ServerBuilderOption>> options_;
std::vector<std::unique_ptr<NamedService>> services_;
std::vector<Port> ports_;
/* List of completion queues added via AddCompletionQueue() method */
std::vector<ServerCompletionQueue*> cqs_;
std::shared_ptr<ServerCredentials> creds_;
std::vector<std::unique_ptr<ServerBuilderPlugin>> plugins_;
AsyncGenericService* generic_service_;

@@ -65,6 +65,15 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
const grpc_channel_args *server_args = grpc_server_get_channel_args(server);
grpc_transport *transport = grpc_create_chttp2_transport(
&exec_ctx, server_args, server_endpoint, 0 /* is_client */);
grpc_pollset **pollsets;
size_t num_pollsets = 0;
grpc_server_get_pollsets(server, &pollsets, &num_pollsets);
for (size_t i = 0; i < num_pollsets; i++) {
grpc_endpoint_add_to_pollset(&exec_ctx, server_endpoint, pollsets[i]);
}
grpc_endpoint_add_to_pollset(&exec_ctx, server_endpoint, grpc_cq_pollset(cq));
grpc_server_setup_transport(&exec_ctx, server, transport, NULL, server_args);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL);

@@ -1098,6 +1098,12 @@ void grpc_server_start(grpc_server *server) {
grpc_exec_ctx_finish(&exec_ctx);
}
void grpc_server_get_pollsets(grpc_server *server, grpc_pollset ***pollsets,
size_t *pollset_count) {
*pollset_count = server->cq_count;
*pollsets = server->pollsets;
}
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
grpc_transport *transport,
grpc_pollset *accepting_pollset,

@@ -60,4 +60,7 @@ const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);
int grpc_server_has_open_connections(grpc_server *server);
void grpc_server_get_pollsets(grpc_server *server, grpc_pollset ***pollsets,
size_t *pollset_count);
#endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */

@@ -54,14 +54,12 @@ GrpcRpcManager::GrpcRpcManagerThread::~GrpcRpcManagerThread() {
thd_.reset();
}
GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers,
int max_threads)
GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers)
: shutdown_(false),
num_pollers_(0),
min_pollers_(min_pollers),
max_pollers_(max_pollers),
num_threads_(0),
max_threads_(max_threads) {}
num_threads_(0) {}
GrpcRpcManager::~GrpcRpcManager() {
std::unique_lock<grpc::mutex> lock(mu_);
@@ -84,6 +82,11 @@ void GrpcRpcManager::ShutdownRpcManager() {
shutdown_ = true;
}
bool GrpcRpcManager::IsShutdown() {
std::unique_lock<grpc::mutex> lock(mu_);
return shutdown_;
}
void GrpcRpcManager::MarkAsCompleted(GrpcRpcManagerThread* thd) {
std::unique_lock<grpc::mutex> lock(list_mu_);
completed_threads_.push_back(thd);
@@ -108,8 +111,7 @@ void GrpcRpcManager::Initialize() {
// below the maximum threshold, we can let the current thread continue as poller
bool GrpcRpcManager::MaybeContinueAsPoller() {
std::unique_lock<grpc::mutex> lock(mu_);
if (shutdown_ || num_pollers_ > max_pollers_ ||
num_threads_ >= max_threads_) {
if (shutdown_ || num_pollers_ > max_pollers_) {
return false;
}
@@ -122,8 +124,7 @@ bool GrpcRpcManager::MaybeContinueAsPoller() {
// min_pollers_) and the total number of threads is below the maximum threshold
void GrpcRpcManager::MaybeCreatePoller() {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (!shutdown_ && num_pollers_ < min_pollers_ &&
num_threads_ < max_threads_) {
if (!shutdown_ && num_pollers_ < min_pollers_) {
num_pollers_++;
num_threads_++;
@@ -133,28 +134,38 @@ void GrpcRpcManager::MaybeCreatePoller() {
}
void GrpcRpcManager::MainWorkLoop() {
bool is_work_found = false;
void* tag;
bool ok;
/*
1. Poll for work (i.e PollForWork())
2. After returning from PollForWork, reduce the number of pollers by 1
2. After returning from PollForWork, reduce the number of pollers by 1. If
PollForWork() returned a TIMEOUT, then it may indicate that we have more
polling threads than needed. Check if the number of pollers is greater
than min_pollers and if so, terminate the thread.
3. Since we are short of one poller now, see if a new poller has to be
created (i.e see MaybeCreatePoller() for more details)
4. Do the actual work (DoWork())
5. After doing the work, see if this thread can resume polling work (i.e
see MaybeContinueAsPoller() for more details) */
do {
PollForWork(is_work_found, &tag);
WorkStatus work_status = PollForWork(&tag, &ok);
{
grpc::unique_lock<grpc::mutex> lock(mu_);
num_pollers_--;
if (work_status == TIMEOUT && num_pollers_ > min_pollers_) {
break;
}
}
if (is_work_found) {
// TODO (sreek) See if we need to check for shutdown here and quit
// Note that MaybeCreatePoller does check for shutdown and creates a new
// thread only if GrpcRpcManager is not shutdown
if (work_status == WORK_FOUND) {
MaybeCreatePoller();
DoWork(tag);
DoWork(tag, ok);
}
} while (MaybeContinueAsPoller());

@@ -44,17 +44,56 @@ namespace grpc {
class GrpcRpcManager {
public:
explicit GrpcRpcManager(int min_pollers, int max_pollers, int max_threads);
explicit GrpcRpcManager(int min_pollers, int max_pollers);
virtual ~GrpcRpcManager();
// This function MUST be called before using the object
void Initialize();
virtual void PollForWork(bool& is_work_found, void **tag) = 0;
virtual void DoWork(void *tag) = 0;
enum WorkStatus { WORK_FOUND, SHUTDOWN, TIMEOUT };
// "Polls" for new work.
// If the return value is WORK_FOUND:
// - The implementaion of PollForWork() MAY set some opaque identifier to
// (identify the work item found) via the '*tag' parameter
// - The implementaion MUST set the value of 'ok' to 'true' or 'false'. A
// value of 'false' indicates some implemenation specific error (that is
// neither SHUTDOWN nor TIMEOUT)
// - GrpcRpcManager does not interpret the values of 'tag' and 'ok'
// - GrpcRpcManager WILL call DoWork() and pass '*tag' and 'ok' as input to
// DoWork()
//
// If the return value is SHUTDOWN:,
// - GrpcManager WILL NOT call DoWork() and terminates the thead
//
// If the return value is TIMEOUT:,
// - GrpcManager WILL NOT call DoWork()
// - GrpcManager MAY terminate the thread depending on the current number of
// active poller threads and mix_pollers/max_pollers settings
// - Also, the value of timeout is specific to the derived class
// implementation
virtual WorkStatus PollForWork(void** tag, bool* ok) = 0;
// The implementation of DoWork() is supposed to perform the work found by
// PollForWork(). The tag and ok parameters are the same as returned by
// PollForWork()
//
// The implementation of DoWork() should also do any setup needed to ensure
// that the next call to PollForWork() (not necessarily by the current thread)
// actually finds some work
virtual void DoWork(void* tag, bool ok) = 0;
// Mark the GrpcRpcManager as shutdown and begin draining the work.
// This is a non-blocking call and the caller should call Wait(), a blocking
// call which returns only once the shutdown is complete
void ShutdownRpcManager();
// Has ShutdownRpcManager() been called?
bool IsShutdown();
// A blocking call that returns only after the GrpcRpcManager has shutdown and
// all the threads have drained all the outstanding work
void Wait();
void ShutdownRpcManager();
private:
// Helper wrapper class around std::thread. This takes a GrpcRpcManager object
@@ -63,8 +102,6 @@ class GrpcRpcManager {
// The Run() function calls GrpcRpcManager::MainWorkLoop() and once that
// completes, it marks the GrpcRpcManagerThread completed by calling
// GrpcRpcManager::MarkAsCompleted()
// TODO: sreek - Consider using a separate threadpool rather than implementing
// one in this class
class GrpcRpcManagerThread {
public:
GrpcRpcManagerThread(GrpcRpcManager* rpc_mgr);
@@ -83,13 +120,11 @@ class GrpcRpcManager {
void MainWorkLoop();
// Create a new poller if the number of current pollers is less than the
// minimum number of pollers needed (i.e min_pollers) and the total number of
// threads are less than the max number of threads (i.e max_threads)
// minimum number of pollers needed (i.e min_pollers).
void MaybeCreatePoller();
// Returns true if the current thread can resume as a poller. i.e if the
// current number of pollers is less than the max_pollers AND the total number
// of threads is less than max_threads
// current number of pollers is less than the max_pollers.
bool MaybeContinueAsPoller();
void MarkAsCompleted(GrpcRpcManagerThread* thd);
@@ -113,10 +148,6 @@ class GrpcRpcManager {
// currently polling i.e num_pollers_)
int num_threads_;
// The maximum number of threads that can be active (This is a soft limit and
// the actual number of threads may sometimes be briefly above this number)
int max_threads_;
grpc::mutex list_mu_;
std::list<GrpcRpcManagerThread*> completed_threads_;
};
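The comments above fully specify the PollForWork()/DoWork() contract that SyncRequestManager (in server.cc below) implements against a ServerCompletionQueue. As an illustration only, here is a minimal, self-contained sketch of the same contract and thread lifecycle; ToyRpcManager, its in-memory work queue, and the use of std::thread/std::mutex (instead of grpc::thread/grpc::mutex and a completion queue) are hypothetical stand-ins, not part of this commit.

#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical analogue of GrpcRpcManager: same WorkStatus values and the
// same MainWorkLoop() shape, but "polling" an in-memory queue.
class ToyRpcManager {
 public:
  enum WorkStatus { WORK_FOUND, SHUTDOWN, TIMEOUT };

  ToyRpcManager(int min_pollers, int max_pollers)
      : shutdown_(false), num_pollers_(0),
        min_pollers_(min_pollers), max_pollers_(max_pollers) {}

  // Equivalent of Initialize(): spawn the minimum number of pollers.
  void Initialize() {
    std::lock_guard<std::mutex> lock(mu_);
    for (int i = 0; i < min_pollers_; i++) {
      num_pollers_++;
      threads_.emplace_back(&ToyRpcManager::MainWorkLoop, this);
    }
  }

  // Wait up to 500ms for a work item (cf. kRpcPollingTimeoutMsec below).
  WorkStatus PollForWork(void** tag, bool* ok) {
    std::unique_lock<std::mutex> lock(mu_);
    if (!cv_.wait_for(lock, std::chrono::milliseconds(500),
                      [this] { return shutdown_ || !work_.empty(); })) {
      return TIMEOUT;
    }
    if (shutdown_) return SHUTDOWN;
    *tag = reinterpret_cast<void*>(work_.front());
    work_.pop();
    *ok = true;  // no transport-level failures in this toy
    return WORK_FOUND;
  }

  void DoWork(void* tag, bool ok) {
    if (ok) std::printf("handled item %zu\n", reinterpret_cast<size_t>(tag));
  }

  void AddWork(size_t item) {
    std::lock_guard<std::mutex> lock(mu_);
    work_.push(item);
    cv_.notify_one();
  }

  // ShutdownRpcManager() and Wait() rolled into one for brevity.
  void ShutdownAndWait() {
    { std::lock_guard<std::mutex> lock(mu_); shutdown_ = true; }
    cv_.notify_all();
    for (auto& t : threads_) t.join();
  }

 private:
  // The five steps documented in MainWorkLoop() above: poll, decrement the
  // poller count (idle TIMEOUT threads above min_pollers exit here), top up
  // the pollers, do the work, then maybe resume polling.
  void MainWorkLoop() {
    void* tag;
    bool ok;
    do {
      WorkStatus status = PollForWork(&tag, &ok);
      {
        std::lock_guard<std::mutex> lock(mu_);
        num_pollers_--;
        if (status == TIMEOUT && num_pollers_ > min_pollers_) break;
      }
      if (status == WORK_FOUND) {
        MaybeCreatePoller();
        DoWork(tag, ok);
      }
    } while (MaybeContinueAsPoller());
  }

  void MaybeCreatePoller() {
    std::lock_guard<std::mutex> lock(mu_);
    if (!shutdown_ && num_pollers_ < min_pollers_) {
      num_pollers_++;
      threads_.emplace_back(&ToyRpcManager::MainWorkLoop, this);
    }
  }

  bool MaybeContinueAsPoller() {
    std::lock_guard<std::mutex> lock(mu_);
    if (shutdown_ || num_pollers_ > max_pollers_) return false;
    num_pollers_++;
    return true;
  }

  std::mutex mu_;
  std::condition_variable cv_;
  bool shutdown_;
  int num_pollers_;
  const int min_pollers_, max_pollers_;
  std::queue<size_t> work_;
  std::vector<std::thread> threads_;
};

A caller constructs e.g. ToyRpcManager(1, 2), calls Initialize() before anything else (as the header requires), feeds AddWork() from another thread, and finally calls ShutdownAndWait().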

@@ -275,15 +275,99 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_;
};
class Server::SyncRequestManager : public GrpcRpcManager {
public:
SyncRequestManager(Server* server, CompletionQueue* server_cq,
std::shared_ptr<GlobalCallbacks> global_callbacks,
int min_pollers, int max_pollers)
: GrpcRpcManager(min_pollers, max_pollers),
server_(server),
server_cq_(server_cq),
global_callbacks_(global_callbacks) {}
static const int kRpcPollingTimeoutMsec = 500;
WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE {
*tag = nullptr;
gpr_timespec deadline =
gpr_time_from_millis(kRpcPollingTimeoutMsec, GPR_TIMESPAN);
switch (server_cq_->AsyncNext(tag, ok, deadline)) {
case CompletionQueue::TIMEOUT:
return TIMEOUT;
case CompletionQueue::SHUTDOWN:
return SHUTDOWN;
case CompletionQueue::GOT_EVENT:
return WORK_FOUND;
}
GPR_UNREACHABLE_CODE(return TIMEOUT);
}
void DoWork(void* tag, bool ok) GRPC_OVERRIDE {
SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
if (ok && sync_req) {
SyncRequest::CallData cd(server_, sync_req);
{
sync_req->SetupRequest();
if (!IsShutdown()) {
sync_req->Request(server_->c_server(), server_cq_->cq());
} else {
sync_req->TeardownRequest();
}
}
GPR_TIMER_SCOPE("cd.Run()", 0);
cd.Run(global_callbacks_);
}
// TODO (sreek): If ok == false, log an error
}
void AddSyncMethod(RpcServiceMethod* method, void* tag) {
sync_methods_.emplace_back(method, tag);
}
void AddUnknownSyncMethod() {
// TODO (sreek) - Check if !sync_methods_.empty() is really needed here
if (!sync_methods_.empty()) {
unknown_method_.reset(new RpcServiceMethod(
"unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
// Use of emplace_back with just constructor arguments is not accepted
// here by gcc-4.4 because it can't match the anonymous nullptr with a
// proper constructor implicitly. Construct the object and use push_back.
sync_methods_.push_back(SyncRequest(unknown_method_.get(), nullptr));
}
}
void Start() {
if (!sync_methods_.empty()) {
for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) {
m->SetupRequest();
m->Request(server_->c_server(), server_cq_->cq());
}
GrpcRpcManager::Initialize();
}
}
private:
Server* server_;
CompletionQueue* server_cq_;
std::vector<SyncRequest> sync_methods_;
std::unique_ptr<RpcServiceMethod> unknown_method_;
std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
};
static internal::GrpcLibraryInitializer g_gli_initializer;
Server::Server(bool has_sync_methods, int max_message_size,
ChannelArguments* args)
: GrpcRpcManager(3, 5, 8),
max_message_size_(max_message_size),
Server::Server(
std::shared_ptr<std::vector<ServerCompletionQueue>> sync_server_cqs,
int max_message_size, ChannelArguments* args, int min_pollers,
int max_pollers)
: max_message_size_(max_message_size),
sync_server_cqs_(sync_server_cqs),
started_(false),
shutdown_(false),
shutdown_notified_(false),
sync_methods_(new std::list<SyncRequest>),
has_generic_service_(false),
server_(nullptr),
server_initializer_(new ServerInitializer(this)) {
@@ -291,16 +375,17 @@ Server::Server(bool has_sync_methods, int max_message_size,
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
global_callbacks_ = g_callbacks;
global_callbacks_->UpdateArguments(args);
for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
it++) {
sync_req_mgrs_.emplace_back(new SyncRequestManager(
this, &(*it), global_callbacks_, min_pollers, max_pollers));
}
grpc_channel_args channel_args;
args->SetChannelArgs(&channel_args);
server_ = grpc_server_create(&channel_args, nullptr);
if (!has_sync_methods) {
grpc_server_register_non_listening_completion_queue(server_, cq_.cq(),
nullptr);
} else {
grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
}
server_ = grpc_server_create(&channel_args, nullptr);
}
Server::~Server() {
@@ -310,15 +395,20 @@ Server::~Server() {
lock.unlock();
Shutdown();
} else if (!started_) {
// TODO (sreek): Shutdown all cqs
/*
cq_.Shutdown();
*/
}
}
// TODO(sreek) Do this for all cqs?
/*
void* got_tag;
bool ok;
GPR_ASSERT(!cq_.Next(&got_tag, &ok));
*/
grpc_server_destroy(server_);
delete sync_methods_;
}
void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
@@ -329,8 +419,6 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
grpc_server* Server::c_server() { return server_; }
CompletionQueue* Server::completion_queue() { return &cq_; }
static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
RpcServiceMethod* method) {
switch (method->method_type()) {
@@ -351,6 +439,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
"Can only register an asynchronous service against one server.");
service->server_ = this;
}
const char* method_name = nullptr;
for (auto it = service->methods_.begin(); it != service->methods_.end();
++it) {
@@ -369,7 +458,9 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
if (method->handler() == nullptr) {
method->set_server_tag(tag);
} else {
sync_methods_->emplace_back(method, tag);
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->AddSyncMethod(method, tag);
}
}
method_name = method->name();
}
@@ -405,13 +496,8 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
grpc_server_start(server_);
if (!has_generic_service_) {
if (!sync_methods_->empty()) {
unknown_method_.reset(new RpcServiceMethod(
"unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
// Use of emplace_back with just constructor arguments is not accepted
// here by gcc-4.4 because it can't match the anonymous nullptr with a
// proper constructor implicitly. Construct the object and use push_back.
sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->AddUnknownSyncMethod();
}
for (size_t i = 0; i < num_cqs; i++) {
@@ -421,6 +507,12 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
}
}
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Start();
}
/* TODO (Sreek) - Do this for all cqs */
/*
// Start processing rpcs.
if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
@@ -430,26 +522,73 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
GrpcRpcManager::Initialize();
}
*/
return true;
}
// TODO (sreek) - Reimplement this
void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
int shutdown_tag = 0; // Dummy shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq_.cq(), &shutdown_tag);
// Shutdown all RpcManagers. This will try to gracefully stop all the
// threads in the RpcManagers (once they process any inflight requests)
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->ShutdownRpcManager();
}
shutdown_cq_.Shutdown();
void* tag;
bool ok;
CompletionQueue::NextStatus status =
shutdown_cq_.AsyncNext(&tag, &ok, deadline);
// If this timed out, it means we are done with the grace-period for
// a clean shutdown. We should force a shutdown now by cancelling all
// inflight calls
if (status == CompletionQueue::NextStatus::TIMEOUT) {
grpc_server_cancel_all_calls(server_);
}
// Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
// successfully shutdown
// Wait for threads in all RpcManagers to terminate
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Wait();
}
// Shutdown the completion queues
// TODO (sreek) Move this into SyncRequestManager
for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
it++) {
(*it).Shutdown();
}
/*
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
lock.unlock();
*/
// TODO (sreek) Delete this
/*
GrpcRpcManager::ShutdownRpcManager();
GrpcRpcManager::Wait();
*/
// Spin, eating requests until the completion queue is completely shutdown.
// If the deadline expires then cancel anything that's pending and keep
// spinning forever until the work is actually drained.
// Since nothing else needs to touch state guarded by mu_, holding it
// through this loop is fine.
//
/*
SyncRequest* request;
bool ok;
while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
@@ -461,6 +600,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
}
}
lock.lock();
*/
/* TODO (sreek) - Remove this block */
// Wait for running callbacks to finish.
@@ -642,6 +782,8 @@ void Server::RunRpc() {
*/
}
/* TODO (sreek) Move this to SyncRequestManager */
/*
void Server::PollForWork(bool& is_work_found, void** tag) {
is_work_found = true;
*tag = nullptr;
@@ -651,6 +793,7 @@ void Server::PollForWork(bool& is_work_found, void** tag) {
}
}
void Server::DoWork(void* tag) {
auto* mrd = static_cast<SyncRequest*>(tag);
if (mrd) {
@@ -669,6 +812,7 @@ void Server::DoWork(void* tag) {
cd.Run(global_callbacks_);
}
}
*/
ServerInitializer* Server::initializer() { return server_initializer_.get(); }
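From the application's side, the grace-period behavior that ShutdownInternal() implements above is driven through the existing public Shutdown(deadline) API. A usage sketch against the public grpc++ surface; the service, address, and 5-second grace period are placeholder choices:

#include <chrono>
#include <memory>

#include <grpc++/grpc++.h>

void RunAndShutdown(grpc::Service* service) {
  grpc::ServerBuilder builder;
  builder.AddListeningPort("0.0.0.0:50051",
                           grpc::InsecureServerCredentials());
  builder.RegisterService(service);
  std::unique_ptr<grpc::Server> server = builder.BuildAndStart();

  // ... serve traffic ...

  // In-flight sync RPCs get up to 5 seconds; past that deadline the
  // TIMEOUT branch above calls grpc_server_cancel_all_calls(), and the
  // SyncRequestManager threads are then joined before Shutdown() returns.
  server->Shutdown(std::chrono::system_clock::now() +
                   std::chrono::seconds(5));
}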

@@ -93,7 +93,7 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
gpr_log(GPR_ERROR,
"Adding multiple AsyncGenericService is unsupported for now. "
"Dropping the service %p",
service);
(void *) service);
} else {
generic_service_ = service;
}
@@ -138,7 +138,6 @@ ServerBuilder& ServerBuilder::AddListeningPort(
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
// == Determine if the server has any synchronous methods ==
bool has_sync_methods = false;
for (auto it = services_.begin(); it != services_.end(); ++it) {
@@ -157,6 +156,35 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
}
}
// If this is a Sync server, i.e a server exposing a sync API, then the server
// needs to create some completion queues to listen for incoming requests.
// 'sync_server_cqs' are those internal completion queues.
//
// This is different from the completion queues added to the server via
// ServerBuilder's AddCompletionQueue() method (those completion queues
// are in 'cqs_' member variable of ServerBuilder object)
std::shared_ptr<std::vector<ServerCompletionQueue>> sync_server_cqs(
new std::vector<ServerCompletionQueue>());
if (has_sync_methods) {
// If the server has synchronous methods, it will need completion queues to
// handle those methods. Create one cq per core (or create 4 if number of
// cores is less than 4 or unavailable)
//
// TODO (sreek) - The default number 4 is just a guess. Check if a lower or
// higher number makes sense
int num_cqs = gpr_cpu_num_cores();
num_cqs = GPR_MAX(num_cqs, 4);
for (int i = 0; i < num_cqs; i++) {
// emplace_back() would have been ideal here but doesn't work since the
// ServerCompletionQueue's constructor is private. With emplace_back, the
// constructor is called from somewhere within the library; so making
// ServerBuilder class a friend of ServerCompletionQueue won't help.
sync_server_cqs->push_back(ServerCompletionQueue());
}
}
// == Channel args ==
ChannelArguments args;
for (auto option = options_.begin(); option != options_.end(); ++option) {
@@ -178,28 +206,38 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
maybe_default_compression_algorithm_.algorithm);
}
std::unique_ptr<Server> server(
new Server(has_sync_methods, max_message_size_, &args));
// TODO (sreek) Make the number of pollers configurable
std::unique_ptr<Server> server(new Server(sync_server_cqs, max_message_size_,
&args, kDefaultMinPollers,
kDefaultMaxPollers));
ServerInitializer* initializer = server->initializer();
// If the server has atleast one sync methods, we know that this is a Sync
// server or a Hybrid server and the completion queue (server->cq_) would be
// frequently polled.
int num_frequently_polled_cqs = has_sync_methods ? 1 : 0;
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
// A completion queue that is not polled frequently (by calling Next() or
// AsyncNext()) is not safe to use for listening to incoming channels.
// Register all such completion queues as non-listening completion queues
// with the GRPC core library.
if ((*cq)->IsFrequentlyPolled()) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
// Register all the completion queues with the server. i.e
// 1. sync_server_cqs: internal completion queues created IF this is a sync
// server
// 2. cqs_: Completion queues added via AddCompletionQueue() call
// All sync cqs (if any) are frequently polled by the GrpcRpcManager
int num_frequently_polled_cqs = sync_server_cqs->size();
for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
grpc_server_register_completion_queue(server->server_, it->cq(), nullptr);
}
// cqs_ contains the completion queues added by calling the ServerBuilder's
// AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
// calling Next() or AsyncNext()) and hence are not safe to be used for
// listening to incoming channels. Such completion queues must be registered
// as non-listening queues
for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
if ((*it)->IsFrequentlyPolled()) {
grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
num_frequently_polled_cqs++;
} else {
grpc_server_register_non_listening_completion_queue(server->server_,
(*cq)->cq(), nullptr);
(*it)->cq(), nullptr);
}
}
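Putting the two registration loops above in application terms: a hybrid server ends up with both kinds of queue. The sketch below is illustrative only; the address and the two Service pointers are placeholders, and it assumes the public grpc++ umbrella header:

#include <memory>

#include <grpc++/grpc++.h>

std::unique_ptr<grpc::Server> BuildHybrid(
    grpc::Service* sync_service, grpc::Service* async_service,
    std::unique_ptr<grpc::ServerCompletionQueue>* async_cq) {
  grpc::ServerBuilder builder;
  builder.AddListeningPort("0.0.0.0:50051",
                           grpc::InsecureServerCredentials());
  // Served on the internal sync_server_cqs created in BuildAndStart():
  // one queue per core (at least 4), each with its own SyncRequestManager.
  builder.RegisterService(sync_service);
  // Served on a queue the application polls itself; this one lands in cqs_
  // and is registered as frequently polled (or non-listening if not).
  builder.RegisterService(async_service);
  *async_cq = builder.AddCompletionQueue();
  return builder.BuildAndStart();
}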

@@ -40,8 +40,7 @@ namespace grpc {
#ifdef GPR_SUPPORT_CHANNELS_FROM_FD
void AddInsecureChannelFromFd(Server* server, int fd) {
grpc_server_add_insecure_channel_from_fd(
server->c_server(), server->completion_queue()->cq(), fd);
grpc_server_add_insecure_channel_from_fd(server->c_server(), NULL, fd);
}
#endif // GPR_SUPPORT_CHANNELS_FROM_FD
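Since the completion-queue argument is now gone from the C API call, the C++ wrapper needs nothing beyond the server itself; the core attaches the new endpoint to every server pollset (the server_chttp2_posix.c change above). A POSIX-only usage sketch; the socketpair wiring is a placeholder scenario:

#include <sys/socket.h>

#include <grpc++/grpc++.h>
#include <grpc++/server_posix.h>

// Hand one end of a socketpair to an already-started server; the other
// end could back an in-process client channel.
void AttachSocketpair(grpc::Server* server, int* client_fd) {
  int fds[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return;
  grpc::AddInsecureChannelFromFd(server, fds[0]);
  *client_fd = fds[1];
}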

@@ -95,7 +95,7 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
grpc_server_register_completion_queue(f->server, f->cq, NULL);
grpc_server_start(f->server);
grpc_server_add_insecure_channel_from_fd(f->server, f->cq, sfd->fd_pair[1]);
grpc_server_add_insecure_channel_from_fd(f->server, NULL, sfd->fd_pair[1]);
grpc_exec_ctx_finish(&exec_ctx);
}
