diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 56864d6d536..d489a90c69c 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -222,9 +222,18 @@ class CompletionQueue : private GrpcLibraryCodegen { /// A specific type of completion queue used by the processing of notifications /// by servers. Instantiated by \a ServerBuilder. class ServerCompletionQueue : public CompletionQueue { + public: + bool IsFrequentlyPolled() { return is_frequently_polled_; } + private: + bool is_frequently_polled_; friend class ServerBuilder; - ServerCompletionQueue() {} + /// \param is_frequently_polled Informs the GPRC library about whether the + /// server completion queue would be actively polled (by calling Next() or + /// AsyncNext()). By default all server completion queues are assumed to be + /// frequently polled. + ServerCompletionQueue(bool is_frequently_polled = true) + : is_frequently_polled_(is_frequently_polled) {} }; } // namespace grpc diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 86c7fecef59..85af9aa57fb 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -102,7 +102,15 @@ class ServerBuilder { /// Add a completion queue for handling asynchronous services /// Caller is required to keep this completion queue live until /// the server is destroyed. - std::unique_ptr AddCompletionQueue(); + /// + /// \param is_frequently_polled This is an optional parameter to inform GRPC + /// library about whether this completion queue would be frequently polled + /// (i.e by calling Next() or AsyncNext()). The default value is 'true' and is + /// the recommended setting. Setting this to 'false' (i.e not polling the + /// completion queue frequently) will have a significantly negative + /// performance impact and hence should not be used in production use cases. + std::unique_ptr AddCompletionQueue( + bool is_frequently_polled = true); /// Return a running server which is ready for processing calls. std::unique_ptr BuildAndStart(); diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 5c868aece37..059bd2ebc74 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -334,6 +334,15 @@ GRPCAPI void grpc_server_register_completion_queue(grpc_server *server, grpc_completion_queue *cq, void *reserved); +/** Register a non-listening completion queue with the server. This API is + similar to grpc_server_register_completion_queue except that the server will + not use this completion_queue to listen to any incoming channels. + + Registering a non-listening completion queue will have negative performance + impact and hence this API is not recommended for production use cases. */ +GRPCAPI void grpc_server_register_non_listening_completion_queue( + grpc_server *server, grpc_completion_queue *q, void *reserved); + /** Add a HTTP2 over plaintext over tcp listener. Returns bound port number on success, 0 on failure. REQUIRES: server not started */ diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 5ec8808b508..f6f7ac880cc 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -70,6 +70,8 @@ struct grpc_completion_queue { int shutdown; int shutdown_called; int is_server_cq; + /** Can the server cq accept incoming channels */ + int is_non_listening_server_cq; int num_pluckers; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; grpc_closure pollset_shutdown_done; @@ -149,6 +151,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { cc->shutdown = 0; cc->shutdown_called = 0; cc->is_server_cq = 0; + cc->is_non_listening_server_cq = 0; cc->num_pluckers = 0; #ifndef NDEBUG cc->outstanding_tag_count = 0; @@ -507,6 +510,14 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { return POLLSET_FROM_CQ(cc); } +void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) { + cc->is_non_listening_server_cq = 1; +} + +bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) { + return (cc->is_non_listening_server_cq == 1); +} + void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index eef82cf0148..ee3e0448401 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -82,6 +82,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); +void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc); +bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc); void grpc_cq_mark_server_cq(grpc_completion_queue *cc); int grpc_cq_is_server_cq(grpc_completion_queue *cc); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index cbfd2458741..c34ec04d2d3 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -895,23 +895,45 @@ const grpc_channel_filter grpc_server_top_filter = { "server", }; -void grpc_server_register_completion_queue(grpc_server *server, - grpc_completion_queue *cq, - void *reserved) { +static void register_completion_queue(grpc_server *server, + grpc_completion_queue *cq, + bool is_non_listening, void *reserved) { size_t i, n; - GRPC_API_TRACE( - "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3, - (server, cq, reserved)); GPR_ASSERT(!reserved); for (i = 0; i < server->cq_count; i++) { if (server->cqs[i] == cq) return; } - GRPC_CQ_INTERNAL_REF(cq, "server"); + grpc_cq_mark_server_cq(cq); - n = server->cq_count++; - server->cqs = gpr_realloc(server->cqs, - server->cq_count * sizeof(grpc_completion_queue *)); - server->cqs[n] = cq; + + /* Non-listening completion queues are not added to server->cqs */ + if (is_non_listening) { + grpc_cq_mark_non_listening_server_cq(cq); + } else { + GRPC_CQ_INTERNAL_REF(cq, "server"); + n = server->cq_count++; + server->cqs = gpr_realloc( + server->cqs, server->cq_count * sizeof(grpc_completion_queue *)); + server->cqs[n] = cq; + } +} + +void grpc_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq, + void *reserved) { + GRPC_API_TRACE( + "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3, + (server, cq, reserved)); + register_completion_queue(server, cq, false, reserved); +} + +void grpc_server_register_non_listening_completion_queue( + grpc_server *server, grpc_completion_queue *cq, void *reserved) { + GRPC_API_TRACE( + "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, " + "reserved=%p)", + 3, (server, cq, reserved)); + register_completion_queue(server, cq, true, reserved); } grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 68cc38258cf..5445d3e13bc 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -46,8 +46,9 @@ ServerBuilder::ServerBuilder() grpc_compression_options_init(&compression_options_); } -std::unique_ptr ServerBuilder::AddCompletionQueue() { - ServerCompletionQueue* cq = new ServerCompletionQueue(); +std::unique_ptr ServerBuilder::AddCompletionQueue( + bool is_frequently_polled) { + ServerCompletionQueue* cq = new ServerCompletionQueue(is_frequently_polled); cqs_.push_back(cq); return std::unique_ptr(cq); } @@ -105,8 +106,17 @@ std::unique_ptr ServerBuilder::BuildAndStart() { std::unique_ptr server( new Server(thread_pool.release(), true, max_message_size_, &args)); for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { - grpc_server_register_completion_queue(server->server_, (*cq)->cq(), - nullptr); + // A completion queue that is not polled frequently (by calling Next() or + // AsyncNext()) is not safe to use for listening to incoming channels. + // Register all such completion queues as non-listening completion queues + // with the GRPC core library. + if ((*cq)->IsFrequentlyPolled()) { + grpc_server_register_completion_queue(server->server_, (*cq)->cq(), + nullptr); + } else { + grpc_server_register_non_listening_completion_queue(server->server_, + (*cq)->cq(), nullptr); + } } for (auto service = services_.begin(); service != services_.end(); service++) { diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index 02043a89d3a..0423448154d 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -216,7 +216,7 @@ class HybridEnd2endTest : public ::testing::Test { } // Create a separate cq for each potential handler. for (int i = 0; i < 5; i++) { - cqs_.push_back(builder.AddCompletionQueue()); + cqs_.push_back(builder.AddCompletionQueue(false)); } server_ = builder.BuildAndStart(); }