Get rid of second api for marking non-listening cqs

pull/10584/head
Craig Tiller 8 years ago
parent 58aa706aaf
commit 11c5832b3e
  1. 9
      include/grpc/grpc.h
  2. 12
      src/core/lib/surface/completion_queue.c
  3. 5
      src/core/lib/surface/completion_queue.h
  4. 23
      src/core/lib/surface/server.c
  5. 18
      src/cpp/server/server_builder.cc

@ -376,15 +376,6 @@ GRPCAPI void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
void *reserved);
/** Register a non-listening completion queue with the server. This API is
similar to grpc_server_register_completion_queue except that the server will
not use this completion_queue to listen to any incoming channels.
Registering a non-listening completion queue will have negative performance
impact and hence this API is not recommended for production use cases. */
GRPCAPI void grpc_server_register_non_listening_completion_queue(
grpc_server *server, grpc_completion_queue *q, void *reserved);
/** Add a HTTP2 over plaintext over tcp listener.
Returns bound port number on success, 0 on failure.
REQUIRES: server not started */

@ -62,6 +62,7 @@ typedef struct {
typedef struct {
bool can_get_pollset;
bool can_listen;
size_t (*size)(void);
void (*init)(grpc_pollset *pollset, gpr_mu **mu);
grpc_error *(*kick)(grpc_pollset *pollset,
@ -172,6 +173,7 @@ static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx,
static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
/* GRPC_CQ_DEFAULT_POLLING */
{.can_get_pollset = true,
.can_listen = true,
.size = grpc_pollset_size,
.init = grpc_pollset_init,
.kick = grpc_pollset_kick,
@ -180,6 +182,7 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
.destroy = grpc_pollset_destroy},
/* GRPC_CQ_NON_LISTENING */
{.can_get_pollset = true,
.can_listen = false,
.size = grpc_pollset_size,
.init = grpc_pollset_init,
.kick = grpc_pollset_kick,
@ -188,6 +191,7 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
.destroy = grpc_pollset_destroy},
/* GRPC_CQ_NON_POLLING */
{.can_get_pollset = false,
.can_listen = false,
.size = non_polling_poller_size,
.init = non_polling_poller_init,
.kick = non_polling_poller_kick,
@ -863,4 +867,10 @@ bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
bool grpc_cq_is_server_cq(grpc_completion_queue *cc) {
return cc->is_server_cq;
}
bool grpc_cq_can_listen(grpc_completion_queue *cc) {
return cc->poller_vtable->can_listen;
}

@ -94,10 +94,9 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps);
void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc);
bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc);
bool grpc_cq_is_server_cq(grpc_completion_queue *cc);
bool grpc_cq_can_listen(grpc_completion_queue *cc);
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);

@ -974,7 +974,7 @@ const grpc_channel_filter grpc_server_top_filter = {
static void register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
bool is_non_listening, void *reserved) {
void *reserved) {
size_t i, n;
GPR_ASSERT(!reserved);
for (i = 0; i < server->cq_count; i++) {
@ -983,10 +983,6 @@ static void register_completion_queue(grpc_server *server,
grpc_cq_mark_server_cq(cq);
if (is_non_listening) {
grpc_cq_mark_non_listening_server_cq(cq);
}
GRPC_CQ_INTERNAL_REF(cq, "server");
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs,
@ -1009,16 +1005,7 @@ void grpc_server_register_completion_queue(grpc_server *server,
calls grpc_completion_queue_pluck() on server completion queues */
}
register_completion_queue(server, cq, false, reserved);
}
void grpc_server_register_non_listening_completion_queue(
grpc_server *server, grpc_completion_queue *cq, void *reserved) {
GRPC_API_TRACE(
"grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, "
"reserved=%p)",
3, (server, cq, reserved));
register_completion_queue(server, cq, true, reserved);
register_completion_queue(server, cq, reserved);
}
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
@ -1101,9 +1088,9 @@ void grpc_server_start(grpc_server *server) {
server->requested_calls_per_cq =
gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) {
grpc_pollset *pollset = grpc_cq_pollset(server->cqs[i]);
if (pollset != NULL) server->pollsets[server->pollset_count++] = pollset;
if (grpc_cq_can_listen(server->cqs[i])) {
server->pollsets[server->pollset_count++] =
grpc_cq_pollset(server->cqs[i]);
}
server->request_freelist_per_cq[i] =
gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);

@ -284,13 +284,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
// 2. cqs_: Completion queues added via AddCompletionQueue() call
for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
if (is_hybrid_server) {
grpc_server_register_non_listening_completion_queue(server->server_,
(*it)->cq(), nullptr);
} else {
grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
}
grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
num_frequently_polled_cqs++;
}
@ -300,13 +295,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
// listening to incoming channels. Such completion queues must be registered
// as non-listening queues
for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
if ((*it)->IsFrequentlyPolled()) {
grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
} else {
grpc_server_register_non_listening_completion_queue(server->server_,
(*it)->cq(), nullptr);
}
grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
}
if (num_frequently_polled_cqs == 0) {

Loading…
Cancel
Save