Add the option of creating a non-listening server completion queue. This makes writing certain test cases (like the hybrid_end2end tests) easier

pull/6149/head
Sree Kuchibhotla 9 years ago
parent 020cdb8b21
commit 1f5e262589
  1. include/grpc++/impl/codegen/completion_queue.h (11)
  2. include/grpc++/server_builder.h (10)
  3. include/grpc/grpc.h (9)
  4. src/core/lib/surface/completion_queue.c (11)
  5. src/core/lib/surface/completion_queue.h (2)
  6. src/core/lib/surface/server.c (44)
  7. src/cpp/server/server_builder.cc (18)
  8. test/cpp/end2end/hybrid_end2end_test.cc (2)

@@ -222,9 +222,18 @@ class CompletionQueue : private GrpcLibraryCodegen {
/// A specific type of completion queue used by the processing of notifications
/// by servers. Instantiated by \a ServerBuilder.
class ServerCompletionQueue : public CompletionQueue {
 public:
  bool IsFrequentlyPolled() { return is_frequently_polled_; }

 private:
  bool is_frequently_polled_;
  friend class ServerBuilder;
  ServerCompletionQueue() {}

  /// \param is_frequently_polled Informs the GRPC library about whether the
  /// server completion queue would be actively polled (by calling Next() or
  /// AsyncNext()). By default all server completion queues are assumed to be
  /// frequently polled.
  ServerCompletionQueue(bool is_frequently_polled = true)
      : is_frequently_polled_(is_frequently_polled) {}
};

}  // namespace grpc

@@ -102,7 +102,15 @@ class ServerBuilder {
  /// Add a completion queue for handling asynchronous services.
  /// Caller is required to keep this completion queue live until
  /// the server is destroyed.
  std::unique_ptr<ServerCompletionQueue> AddCompletionQueue();
  ///
  /// \param is_frequently_polled This is an optional parameter to inform the
  /// GRPC library about whether this completion queue would be frequently
  /// polled (i.e. by calling Next() or AsyncNext()). The default value is
  /// 'true' and is the recommended setting. Setting this to 'false' (i.e. not
  /// polling the completion queue frequently) will have a significantly
  /// negative performance impact and hence should not be used in production
  /// use cases.
  std::unique_ptr<ServerCompletionQueue> AddCompletionQueue(
      bool is_frequently_polled = true);

  /// Return a running server which is ready for processing calls.
  std::unique_ptr<Server> BuildAndStart();
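
For orientation, here is a minimal sketch of how the new parameter might be used from application code. Everything below (the address, the variable names, and the fact that no services are registered) is illustrative; the only API introduced by this change is the is_frequently_polled argument to AddCompletionQueue.

#include <memory>

#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>

int main() {
  grpc::ServerBuilder builder;
  builder.AddListeningPort("localhost:50051",
                           grpc::InsecureServerCredentials());

  // A queue that worker threads will poll continuously (the default).
  std::unique_ptr<grpc::ServerCompletionQueue> polled_cq =
      builder.AddCompletionQueue();

  // A queue that will only be polled occasionally (e.g. by a test helper).
  // Passing 'false' tells the library not to rely on it for incoming channels.
  std::unique_ptr<grpc::ServerCompletionQueue> rarely_polled_cq =
      builder.AddCompletionQueue(false);

  std::unique_ptr<grpc::Server> server = builder.BuildAndStart();

  // ... normally: register async services above and drive polled_cq->Next() ...

  server->Shutdown();
  // Every server completion queue must still be shut down and drained.
  void* tag;
  bool ok;
  polled_cq->Shutdown();
  rarely_polled_cq->Shutdown();
  while (polled_cq->Next(&tag, &ok)) {
  }
  while (rarely_polled_cq->Next(&tag, &ok)) {
  }
  return 0;
}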

@@ -334,6 +334,15 @@ GRPCAPI void grpc_server_register_completion_queue(grpc_server *server,
                                                    grpc_completion_queue *cq,
                                                    void *reserved);

/** Register a non-listening completion queue with the server. This API is
    similar to grpc_server_register_completion_queue except that the server
    will not use this completion_queue to listen to any incoming channels.

    Registering a non-listening completion queue will have a negative
    performance impact and hence this API is not recommended for production
    use cases. */
GRPCAPI void grpc_server_register_non_listening_completion_queue(
    grpc_server *server, grpc_completion_queue *q, void *reserved);

/** Add an HTTP2 over plaintext over tcp listener.
    Returns bound port number on success, 0 on failure.
    REQUIRES: server not started */
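
At the core-surface level, the new registration call is used the same way as the existing one. A rough sketch against the C API shown above; the function name, address, and lack of error handling are illustrative only.

#include <grpc/grpc.h>

static void start_server_with_idle_cq(void) {
  grpc_server *server = grpc_server_create(NULL, NULL);

  grpc_completion_queue *listening_cq = grpc_completion_queue_create(NULL);
  grpc_completion_queue *idle_cq = grpc_completion_queue_create(NULL);

  /* Usual registration: this queue may be used to listen for new channels. */
  grpc_server_register_completion_queue(server, listening_cq, NULL);

  /* New in this change: a queue the application will rarely poll, so the
     server must not depend on it for incoming channels. */
  grpc_server_register_non_listening_completion_queue(server, idle_cq, NULL);

  grpc_server_add_insecure_http2_port(server, "localhost:50051");
  grpc_server_start(server);

  /* ... poll listening_cq with grpc_completion_queue_next() ... */
}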

@@ -70,6 +70,8 @@ struct grpc_completion_queue {
  int shutdown;
  int shutdown_called;
  int is_server_cq;
  /** Can the server cq accept incoming channels */
  int is_non_listening_server_cq;
  int num_pluckers;
  plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
  grpc_closure pollset_shutdown_done;
@@ -149,6 +151,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
  cc->shutdown = 0;
  cc->shutdown_called = 0;
  cc->is_server_cq = 0;
  cc->is_non_listening_server_cq = 0;
  cc->num_pluckers = 0;
#ifndef NDEBUG
  cc->outstanding_tag_count = 0;
@@ -507,6 +510,14 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
  return POLLSET_FROM_CQ(cc);
}

void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) {
  cc->is_non_listening_server_cq = 1;
}

bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
  return (cc->is_non_listening_server_cq == 1);
}

void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }

int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }

@@ -82,6 +82,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc);
bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc);

@@ -895,23 +895,45 @@ const grpc_channel_filter grpc_server_top_filter = {
    "server",
};

void grpc_server_register_completion_queue(grpc_server *server,
                                           grpc_completion_queue *cq,
                                           void *reserved) {
static void register_completion_queue(grpc_server *server,
                                      grpc_completion_queue *cq,
                                      bool is_non_listening, void *reserved) {
  size_t i, n;
  GRPC_API_TRACE(
      "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
      (server, cq, reserved));
  GPR_ASSERT(!reserved);
  for (i = 0; i < server->cq_count; i++) {
    if (server->cqs[i] == cq) return;
  }
  GRPC_CQ_INTERNAL_REF(cq, "server");
  grpc_cq_mark_server_cq(cq);
  n = server->cq_count++;
  server->cqs = gpr_realloc(server->cqs,
                            server->cq_count * sizeof(grpc_completion_queue *));
  server->cqs[n] = cq;

  /* Non-listening completion queues are not added to server->cqs */
  if (is_non_listening) {
    grpc_cq_mark_non_listening_server_cq(cq);
  } else {
    GRPC_CQ_INTERNAL_REF(cq, "server");
    n = server->cq_count++;
    server->cqs = gpr_realloc(
        server->cqs, server->cq_count * sizeof(grpc_completion_queue *));
    server->cqs[n] = cq;
  }
}

void grpc_server_register_completion_queue(grpc_server *server,
                                           grpc_completion_queue *cq,
                                           void *reserved) {
  GRPC_API_TRACE(
      "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
      (server, cq, reserved));
  register_completion_queue(server, cq, false, reserved);
}

void grpc_server_register_non_listening_completion_queue(
    grpc_server *server, grpc_completion_queue *cq, void *reserved) {
  GRPC_API_TRACE(
      "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, "
      "reserved=%p)",
      3, (server, cq, reserved));
  register_completion_queue(server, cq, true, reserved);
}

grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {

@@ -46,8 +46,9 @@ ServerBuilder::ServerBuilder()
  grpc_compression_options_init(&compression_options_);
}

std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
  ServerCompletionQueue* cq = new ServerCompletionQueue();
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
    bool is_frequently_polled) {
  ServerCompletionQueue* cq = new ServerCompletionQueue(is_frequently_polled);
  cqs_.push_back(cq);
  return std::unique_ptr<ServerCompletionQueue>(cq);
}
@@ -105,8 +106,17 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
  std::unique_ptr<Server> server(
      new Server(thread_pool.release(), true, max_message_size_, &args));
  for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
    grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
                                          nullptr);
    // A completion queue that is not polled frequently (by calling Next() or
    // AsyncNext()) is not safe to use for listening to incoming channels.
    // Register all such completion queues as non-listening completion queues
    // with the GRPC core library.
    if ((*cq)->IsFrequentlyPolled()) {
      grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
                                            nullptr);
    } else {
      grpc_server_register_non_listening_completion_queue(server->server_,
                                                          (*cq)->cq(), nullptr);
    }
  }
  for (auto service = services_.begin(); service != services_.end();
       service++) {

@@ -216,7 +216,7 @@ class HybridEnd2endTest : public ::testing::Test {
    }
    // Create a separate cq for each potential handler.
    for (int i = 0; i < 5; i++) {
      cqs_.push_back(builder.AddCompletionQueue());
      cqs_.push_back(builder.AddCompletionQueue(false));
    }
    server_ = builder.BuildAndStart();
  }
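
With this in place, the test can hand out one queue per potential handler without spinning up a polling thread for the queues that end up unused. The queues still have to be shut down and drained when the fixture is torn down; a sketch of the usual pattern follows (the member names mirror the snippet above, the rest is illustrative, not the file's actual code):

  void TearDown() override {
    if (server_) {
      server_->Shutdown();
    }
    void* ignored_tag;
    bool ignored_ok;
    // Drain every queue, including the ones that were never polled while the
    // test ran. Next() returns false once a shut-down queue is fully drained.
    for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
      (*it)->Shutdown();
      while ((*it)->Next(&ignored_tag, &ignored_ok)) {
      }
    }
  }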
