Merge branch 'server_channel_affinity' of github.com:sreecha/grpc into affine

pull/6645/head
Craig Tiller 9 years ago
commit f7a670fe6d
  1. grpc.def (1 line changed)
  2. include/grpc++/impl/codegen/completion_queue.h (13 lines changed)
  3. include/grpc++/server_builder.h (12 lines changed)
  4. include/grpc/grpc.h (11 lines changed)
  5. src/core/lib/iomgr/ev_poll_and_epoll_posix.c (76 lines changed)
  6. src/core/lib/iomgr/ev_posix.c (5 lines changed)
  7. src/core/lib/iomgr/ev_posix.h (6 lines changed)
  8. src/core/lib/iomgr/tcp_server_posix.c (16 lines changed)
  9. src/core/lib/surface/completion_queue.c (13 lines changed)
  10. src/core/lib/surface/completion_queue.h (4 lines changed)
  11. src/core/lib/surface/server.c (53 lines changed)
  12. src/cpp/server/server_builder.cc (37 lines changed)
  13. src/python/grpcio/grpc/_cython/imports.generated.c (2 lines changed)
  14. src/python/grpcio/grpc/_cython/imports.generated.h (3 lines changed)
  15. src/ruby/ext/grpc/rb_grpc_imports.generated.c (2 lines changed)
  16. src/ruby/ext/grpc/rb_grpc_imports.generated.h (3 lines changed)
  17. test/core/end2end/fixtures/h2_sockpair+trace.c (3 lines changed)
  18. test/core/end2end/fixtures/h2_sockpair.c (3 lines changed)
  19. test/core/end2end/fixtures/h2_sockpair_1byte.c (3 lines changed)
  20. test/core/iomgr/fd_posix_test.c (130 lines changed)
  21. test/cpp/end2end/hybrid_end2end_test.cc (2 lines changed)

@ -77,6 +77,7 @@ EXPORTS
grpc_server_request_registered_call
grpc_server_create
grpc_server_register_completion_queue
grpc_server_register_non_listening_completion_queue
grpc_server_add_insecure_http2_port
grpc_server_start
grpc_server_shutdown_and_notify

@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -222,9 +222,18 @@ class CompletionQueue : private GrpcLibraryCodegen {
/// A specific type of completion queue used by the processing of notifications
/// by servers. Instantiated by \a ServerBuilder.
class ServerCompletionQueue : public CompletionQueue {
public:
bool IsFrequentlyPolled() { return is_frequently_polled_; }
private:
bool is_frequently_polled_;
friend class ServerBuilder;
ServerCompletionQueue() {}
/// \param is_frequently_polled Informs the gRPC library about whether the
/// server completion queue will be actively polled (by calling Next() or
/// AsyncNext()). By default, all server completion queues are assumed to be
/// frequently polled.
ServerCompletionQueue(bool is_frequently_polled = true)
: is_frequently_polled_(is_frequently_polled) {}
};
} // namespace grpc

@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -108,7 +108,15 @@ class ServerBuilder {
/// Add a completion queue for handling asynchronous services
/// Caller is required to keep this completion queue live until
/// the server is destroyed.
std::unique_ptr<ServerCompletionQueue> AddCompletionQueue();
///
/// \param is_frequently_polled This is an optional parameter to inform the
/// gRPC library about whether this completion queue will be frequently polled
/// (i.e. by calling Next() or AsyncNext()). The default value is 'true' and is
/// the recommended setting. Setting this to 'false' (i.e. not polling the
/// completion queue frequently) has a significant negative performance
/// impact and hence should not be used in production use cases.
std::unique_ptr<ServerCompletionQueue> AddCompletionQueue(
bool is_frequently_polled = true);
/// Return a running server which is ready for processing calls.
std::unique_ptr<Server> BuildAndStart();
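
To illustrate the new parameter, here is a minimal, hypothetical usage sketch (the function name, address, and service pointer are placeholders; the service is assumed to have synchronous methods so the builder's internal queue stays frequently polled; shutdown and queue draining are omitted):

    #include <memory>
    #include <grpc++/security/server_credentials.h>
    #include <grpc++/server.h>
    #include <grpc++/server_builder.h>

    void RunHybridServer(grpc::Service* service) {
      grpc::ServerBuilder builder;
      builder.AddListeningPort("0.0.0.0:50051",
                               grpc::InsecureServerCredentials());
      builder.RegisterService(service);
      // This queue is only polled occasionally by the application, so mark it
      // as not frequently polled; BuildAndStart() then registers it with the
      // core as a non-listening completion queue.
      std::unique_ptr<grpc::ServerCompletionQueue> rare_cq =
          builder.AddCompletionQueue(/* is_frequently_polled = */ false);
      std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
      server->Wait();
      // rare_cq must outlive the server, per the comment above.
    }

If every queue added to the builder is marked as not frequently polled and the server has no synchronous methods, BuildAndStart() logs an error and returns nullptr (see the server_builder.cc hunk below).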

@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -334,6 +334,15 @@ GRPCAPI void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
void *reserved);
/** Register a non-listening completion queue with the server. This API is
similar to grpc_server_register_completion_queue except that the server will
not use this completion queue to listen for incoming channels.
Registering a non-listening completion queue has a negative performance
impact and hence this API is not recommended for production use cases. */
GRPCAPI void grpc_server_register_non_listening_completion_queue(
grpc_server *server, grpc_completion_queue *q, void *reserved);
/** Add a HTTP2 over plaintext over tcp listener.
Returns bound port number on success, 0 on failure.
REQUIRES: server not started */
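
At the C core level, the same setup looks roughly like this (a sketch under the usual core setup assumptions; the function name and port are placeholders, and error handling, shutdown, and the polling loops are omitted):

    #include <grpc/grpc.h>

    void start_server_with_rarely_polled_cq(void) {
      grpc_server *server = grpc_server_create(NULL, NULL);
      grpc_completion_queue *main_cq = grpc_completion_queue_create(NULL);
      grpc_completion_queue *rare_cq = grpc_completion_queue_create(NULL);

      /* main_cq will be polled in a tight loop, so register it normally. */
      grpc_server_register_completion_queue(server, main_cq, NULL);
      /* rare_cq is polled only occasionally; keep it off the listening path. */
      grpc_server_register_non_listening_completion_queue(server, rare_cq, NULL);

      grpc_server_add_insecure_http2_port(server, "0.0.0.0:50051");
      grpc_server_start(server);
    }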

@ -126,6 +126,9 @@ struct grpc_fd {
grpc_closure *on_done_closure;
grpc_iomgr_object iomgr_object;
/* The pollset that last noticed and notified that the fd is readable */
grpc_pollset *read_notifier_pollset;
};
/* Begin polling on an fd.
@ -147,7 +150,8 @@ static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
if got_read or got_write are 1, also does the become_{readable,writable} as
appropriate. */
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
int got_read, int got_write);
int got_read, int got_write,
grpc_pollset *read_notifier_pollset);
/* Return 1 if this fd is orphaned, 0 otherwise */
static bool fd_is_orphaned(grpc_fd *fd);
@ -342,6 +346,7 @@ static grpc_fd *alloc_fd(int fd) {
r->on_done_closure = NULL;
r->closed = 0;
r->released = 0;
r->read_notifier_pollset = NULL;
gpr_mu_unlock(&r->mu);
return r;
}
@ -545,6 +550,11 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
}
static void set_read_notifier_pollset_locked(
grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) {
fd->read_notifier_pollset = read_notifier_pollset;
}
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_lock(&fd->mu);
GPR_ASSERT(!fd->shutdown);
@ -568,6 +578,18 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
gpr_mu_unlock(&fd->mu);
}
/* Return the read-notifier pollset */
static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
grpc_fd *fd) {
grpc_pollset *notifier = NULL;
gpr_mu_lock(&fd->mu);
notifier = fd->read_notifier_pollset;
gpr_mu_unlock(&fd->mu);
return notifier;
}
static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
grpc_pollset_worker *worker, uint32_t read_mask,
uint32_t write_mask, grpc_fd_watcher *watcher) {
@ -620,7 +642,8 @@ static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
}
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
int got_read, int got_write) {
int got_read, int got_write,
grpc_pollset *read_notifier_pollset) {
int was_polling = 0;
int kick = 0;
grpc_fd *fd = watcher->fd;
@ -656,6 +679,10 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
kick = 1;
}
if (read_notifier_pollset != NULL) {
set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
}
}
if (got_write) {
if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
@ -756,9 +783,14 @@ static void pollset_kick_ext(grpc_pollset *p,
specific_worker = pop_front_worker(p);
if (specific_worker != NULL) {
if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
/* Prefer not to kick self. Push the worker to the end of the list and
* pop one from the front */
GPR_TIMER_MARK("kick_anonymous_not_self", 0);
push_back_worker(p, specific_worker);
specific_worker = pop_front_worker(p);
/* If there was only one worker on the pollset, we would get back the same
* worker we just pushed (the one stored in the current thread's TLS). If so,
* kick it only if the GRPC_POLLSET_CAN_KICK_SELF flag is set */
if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
gpr_tls_get(&g_current_thread_worker) ==
(intptr_t)specific_worker) {
@ -1201,11 +1233,11 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
}
if (fd) {
fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
}
} else if (r == 0) {
if (fd) {
fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
}
} else {
if (pfd[0].revents & POLLIN_CHECK) {
@ -1216,9 +1248,9 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
}
if (nfds > 2) {
fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
pfd[2].revents & POLLOUT_CHECK);
pfd[2].revents & POLLOUT_CHECK, pollset);
} else if (fd) {
fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
}
}
@ -1354,11 +1386,11 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
}
for (i = 2; i < pfd_count; i++) {
fd_end_poll(exec_ctx, &watchers[i], 0, 0);
fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
}
} else if (r == 0) {
for (i = 2; i < pfd_count; i++) {
fd_end_poll(exec_ctx, &watchers[i], 0, 0);
fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
@ -1369,11 +1401,11 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
}
for (i = 2; i < pfd_count; i++) {
if (watchers[i].fd == NULL) {
fd_end_poll(exec_ctx, &watchers[i], 0, 0);
fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
continue;
}
fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
pfds[i].revents & POLLOUT_CHECK);
pfds[i].revents & POLLOUT_CHECK, pollset);
}
}
@ -1449,20 +1481,31 @@ static void poll_become_multipoller(grpc_exec_ctx *exec_ctx,
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st,
grpc_pollset *read_notifier_pollset) {
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
gpr_mu_lock(&fd->mu);
set_ready_locked(exec_ctx, fd, st);
/* A non-NULL read_notifier_pollset means that the fd is readable. */
if (read_notifier_pollset != NULL) {
/* Note: Since the fd might be a part of multiple pollsets, this might be
* called multiple times (for each time the fd becomes readable) and it is
* okay to set the fd's read-notifier pollset to any one of these pollsets */
set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
}
gpr_mu_unlock(&fd->mu);
}
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
set_ready(exec_ctx, fd, &fd->read_closure);
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier_pollset) {
set_ready(exec_ctx, fd, &fd->read_closure, notifier_pollset);
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
set_ready(exec_ctx, fd, &fd->write_closure);
set_ready(exec_ctx, fd, &fd->write_closure, NULL);
}
struct epoll_fd_list {
@ -1554,7 +1597,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
}
}
fd_end_poll(exec_ctx, &watcher, 0, 0);
fd_end_poll(exec_ctx, &watcher, 0, 0, NULL);
}
static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
@ -1668,7 +1711,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
} else {
if (read_ev || cancel) {
fd_become_readable(exec_ctx, fd);
fd_become_readable(exec_ctx, fd, pollset);
}
if (write_ev || cancel) {
fd_become_writable(exec_ctx, fd);
@ -1897,6 +1940,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_shutdown = fd_shutdown,
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,

@ -163,6 +163,11 @@ void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
g_event_engine->fd_notify_on_write(exec_ctx, fd, closure);
}
grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
grpc_fd *fd) {
return g_event_engine->fd_get_read_notifier_pollset(exec_ctx, fd);
}
size_t grpc_pollset_size(void) { return g_event_engine->pollset_size; }
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {

@ -55,6 +55,8 @@ typedef struct grpc_event_engine_vtable {
grpc_closure *closure);
void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx,
grpc_fd *fd);
void (*pollset_init)(grpc_pollset *pollset, gpr_mu **mu);
void (*pollset_shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@ -137,6 +139,10 @@ void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
/* Return the read notifier pollset from the fd */
grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
grpc_fd *fd);
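
As an illustration of how this accessor is meant to be consumed (a hypothetical helper that mirrors the tcp_server_posix.c change below; it is not code from this commit): when a listening fd becomes readable, the accepted connection's fd can be added to just the pollset that noticed the event, instead of to every server pollset.

    /* Hypothetical helper: bind a freshly accepted fd only to the pollset that
       was polling the listening fd when it became readable. */
    static void add_fd_to_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                grpc_fd *listening_fd,
                                                grpc_fd *accepted_fd) {
      grpc_pollset *notifier =
          grpc_fd_get_read_notifier_pollset(exec_ctx, listening_fd);
      if (notifier != NULL) {
        grpc_pollset_add_fd(exec_ctx, notifier, accepted_fd);
      }
    }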
/* pollset_posix functions */
/* Add an fd to a pollset */

@ -310,13 +310,15 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
sp->fd_index};
grpc_pollset *read_notifier_pollset = NULL;
grpc_fd *fdobj;
size_t i;
if (!success) {
goto error;
}
read_notifier_pollset = grpc_fd_get_read_notifier_pollset(exec_ctx, sp->emfd);
/* loop until accept4 returns EAGAIN, and then re-arm notification */
for (;;) {
struct sockaddr_storage addr;
@ -349,12 +351,14 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
}
fdobj = grpc_fd_create(fd, name);
/* TODO(ctiller): revise this when we have server-side sharding
of channels -- we certainly should not be automatically adding every
incoming channel to every pollset owned by the server */
for (i = 0; i < sp->server->pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, sp->server->pollsets[i], fdobj);
if (read_notifier_pollset == NULL) {
gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
goto error;
}
grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),

@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -70,6 +70,8 @@ struct grpc_completion_queue {
int shutdown;
int shutdown_called;
int is_server_cq;
/** Non-zero if this server cq must not be used to listen for incoming channels */
int is_non_listening_server_cq;
int num_pluckers;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
grpc_closure pollset_shutdown_done;
@ -149,6 +151,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc->shutdown = 0;
cc->shutdown_called = 0;
cc->is_server_cq = 0;
cc->is_non_listening_server_cq = 0;
cc->num_pluckers = 0;
#ifndef NDEBUG
cc->outstanding_tag_count = 0;
@ -511,6 +514,14 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
return POLLSET_FROM_CQ(cc);
}
void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) {
cc->is_non_listening_server_cq = 1;
}
bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
return (cc->is_non_listening_server_cq == 1);
}
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }

@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -82,6 +82,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc);
bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc);

@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -895,23 +895,45 @@ const grpc_channel_filter grpc_server_top_filter = {
"server",
};
void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
void *reserved) {
static void register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
bool is_non_listening, void *reserved) {
size_t i, n;
GRPC_API_TRACE(
"grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
(server, cq, reserved));
GPR_ASSERT(!reserved);
for (i = 0; i < server->cq_count; i++) {
if (server->cqs[i] == cq) return;
}
GRPC_CQ_INTERNAL_REF(cq, "server");
grpc_cq_mark_server_cq(cq);
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs,
server->cq_count * sizeof(grpc_completion_queue *));
server->cqs[n] = cq;
/* Non-listening completion queues are not added to server->cqs */
if (is_non_listening) {
grpc_cq_mark_non_listening_server_cq(cq);
} else {
GRPC_CQ_INTERNAL_REF(cq, "server");
n = server->cq_count++;
server->cqs = gpr_realloc(
server->cqs, server->cq_count * sizeof(grpc_completion_queue *));
server->cqs[n] = cq;
}
}
void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
void *reserved) {
GRPC_API_TRACE(
"grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
(server, cq, reserved));
register_completion_queue(server, cq, false, reserved);
}
void grpc_server_register_non_listening_completion_queue(
grpc_server *server, grpc_completion_queue *cq, void *reserved) {
GRPC_API_TRACE(
"grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, "
"reserved=%p)",
3, (server, cq, reserved));
register_completion_queue(server, cq, true, reserved);
}
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
@ -1018,7 +1040,6 @@ void grpc_server_start(grpc_server *server) {
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
grpc_transport *transport,
const grpc_channel_args *args) {
size_t i;
size_t num_registered_methods;
size_t alloc;
registered_method *rm;
@ -1033,12 +1054,6 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
uint32_t max_probes = 0;
grpc_transport_op op;
for (i = 0; i < s->cq_count; i++) {
memset(&op, 0, sizeof(op));
op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
grpc_transport_perform_op(exec_ctx, transport, &op);
}
channel =
grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport);
chand = (channel_data *)grpc_channel_stack_element(

@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -60,8 +60,9 @@ ServerBuilder::ServerBuilder()
}
}
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
ServerCompletionQueue* cq = new ServerCompletionQueue();
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
bool is_frequently_polled) {
ServerCompletionQueue* cq = new ServerCompletionQueue(is_frequently_polled);
cqs_.push_back(cq);
return std::unique_ptr<ServerCompletionQueue>(cq);
}
@ -99,8 +100,11 @@ void ServerBuilder::AddListeningPort(const grpc::string& addr,
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
std::unique_ptr<ThreadPoolInterface> thread_pool;
// Does this server have at least one sync method
bool has_sync_methods = false;
for (auto it = services_.begin(); it != services_.end(); ++it) {
if ((*it)->service->has_synchronous_methods()) {
has_sync_methods = true;
if (thread_pool == nullptr) {
thread_pool.reset(CreateDefaultThreadPool());
break;
@ -128,10 +132,33 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
std::unique_ptr<Server> server(
new Server(thread_pool.release(), true, max_message_size_, &args));
ServerInitializer* initializer = server->initializer();
// If the server has at least one sync method, we know that this is a Sync
// server or a Hybrid server and that the completion queue (server->cq_) will
// be frequently polled.
int num_frequently_polled_cqs = has_sync_methods ? 1 : 0;
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
nullptr);
// A completion queue that is not polled frequently (by calling Next() or
// AsyncNext()) is not safe to use to listen for incoming channels.
// Register all such completion queues as non-listening completion queues
// with the gRPC core library.
if ((*cq)->IsFrequentlyPolled()) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
nullptr);
num_frequently_polled_cqs++;
} else {
grpc_server_register_non_listening_completion_queue(server->server_,
(*cq)->cq(), nullptr);
}
}
if (num_frequently_polled_cqs == 0) {
gpr_log(GPR_ERROR,
"At least one of the completion queues must be frequently polled");
return nullptr;
}
for (auto service = services_.begin(); service != services_.end();
service++) {
if (!server->RegisterService((*service)->host.get(), (*service)->service)) {

@ -115,6 +115,7 @@ grpc_server_register_method_type grpc_server_register_method_import;
grpc_server_request_registered_call_type grpc_server_request_registered_call_import;
grpc_server_create_type grpc_server_create_import;
grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
grpc_server_start_type grpc_server_start_import;
grpc_server_shutdown_and_notify_type grpc_server_shutdown_and_notify_import;
@ -386,6 +387,7 @@ void pygrpc_load_imports(HMODULE library) {
grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call");
grpc_server_create_import = (grpc_server_create_type) GetProcAddress(library, "grpc_server_create");
grpc_server_register_completion_queue_import = (grpc_server_register_completion_queue_type) GetProcAddress(library, "grpc_server_register_completion_queue");
grpc_server_register_non_listening_completion_queue_import = (grpc_server_register_non_listening_completion_queue_type) GetProcAddress(library, "grpc_server_register_non_listening_completion_queue");
grpc_server_add_insecure_http2_port_import = (grpc_server_add_insecure_http2_port_type) GetProcAddress(library, "grpc_server_add_insecure_http2_port");
grpc_server_start_import = (grpc_server_start_type) GetProcAddress(library, "grpc_server_start");
grpc_server_shutdown_and_notify_import = (grpc_server_shutdown_and_notify_type) GetProcAddress(library, "grpc_server_shutdown_and_notify");

@ -296,6 +296,9 @@ extern grpc_server_create_type grpc_server_create_import;
typedef void(*grpc_server_register_completion_queue_type)(grpc_server *server, grpc_completion_queue *cq, void *reserved);
extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
#define grpc_server_register_completion_queue grpc_server_register_completion_queue_import
typedef void(*grpc_server_register_non_listening_completion_queue_type)(grpc_server *server, grpc_completion_queue *q, void *reserved);
extern grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
#define grpc_server_register_non_listening_completion_queue grpc_server_register_non_listening_completion_queue_import
typedef int(*grpc_server_add_insecure_http2_port_type)(grpc_server *server, const char *addr);
extern grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
#define grpc_server_add_insecure_http2_port grpc_server_add_insecure_http2_port_import

@ -115,6 +115,7 @@ grpc_server_register_method_type grpc_server_register_method_import;
grpc_server_request_registered_call_type grpc_server_request_registered_call_import;
grpc_server_create_type grpc_server_create_import;
grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
grpc_server_start_type grpc_server_start_import;
grpc_server_shutdown_and_notify_type grpc_server_shutdown_and_notify_import;
@ -382,6 +383,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call");
grpc_server_create_import = (grpc_server_create_type) GetProcAddress(library, "grpc_server_create");
grpc_server_register_completion_queue_import = (grpc_server_register_completion_queue_type) GetProcAddress(library, "grpc_server_register_completion_queue");
grpc_server_register_non_listening_completion_queue_import = (grpc_server_register_non_listening_completion_queue_type) GetProcAddress(library, "grpc_server_register_non_listening_completion_queue");
grpc_server_add_insecure_http2_port_import = (grpc_server_add_insecure_http2_port_type) GetProcAddress(library, "grpc_server_add_insecure_http2_port");
grpc_server_start_import = (grpc_server_start_type) GetProcAddress(library, "grpc_server_start");
grpc_server_shutdown_and_notify_import = (grpc_server_shutdown_and_notify_type) GetProcAddress(library, "grpc_server_shutdown_and_notify");

@ -296,6 +296,9 @@ extern grpc_server_create_type grpc_server_create_import;
typedef void(*grpc_server_register_completion_queue_type)(grpc_server *server, grpc_completion_queue *cq, void *reserved);
extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
#define grpc_server_register_completion_queue grpc_server_register_completion_queue_import
typedef void(*grpc_server_register_non_listening_completion_queue_type)(grpc_server *server, grpc_completion_queue *q, void *reserved);
extern grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
#define grpc_server_register_non_listening_completion_queue grpc_server_register_non_listening_completion_queue_import
typedef int(*grpc_server_add_insecure_http2_port_type)(grpc_server *server, const char *addr);
extern grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
#define grpc_server_add_insecure_http2_port grpc_server_add_insecure_http2_port_import

@ -50,6 +50,7 @@
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/server.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
@ -60,6 +61,8 @@
static void server_setup_transport(void *ts, grpc_transport *transport) {
grpc_end2end_test_fixture *f = ts;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_endpoint_pair *sfd = f->fixture_data;
grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq));
grpc_server_setup_transport(&exec_ctx, f->server, transport,
grpc_server_get_channel_args(f->server));
grpc_exec_ctx_finish(&exec_ctx);

@ -49,6 +49,7 @@
#include "src/core/lib/iomgr/endpoint_pair.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/server.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
@ -59,6 +60,8 @@
static void server_setup_transport(void *ts, grpc_transport *transport) {
grpc_end2end_test_fixture *f = ts;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_endpoint_pair *sfd = f->fixture_data;
grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq));
grpc_server_setup_transport(&exec_ctx, f->server, transport,
grpc_server_get_channel_args(f->server));
grpc_exec_ctx_finish(&exec_ctx);

@ -49,6 +49,7 @@
#include "src/core/lib/iomgr/endpoint_pair.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/server.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
@ -59,6 +60,8 @@
static void server_setup_transport(void *ts, grpc_transport *transport) {
grpc_end2end_test_fixture *f = ts;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_endpoint_pair *sfd = f->fixture_data;
grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq));
grpc_server_setup_transport(&exec_ctx, f->server, transport,
grpc_server_get_channel_args(f->server));
grpc_exec_ctx_finish(&exec_ctx);

@ -518,6 +518,134 @@ static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) {
grpc_pollset_destroy(p);
}
typedef struct read_notifier_test_fd_context {
grpc_fd *fd;
bool is_cb_called;
} read_notifier_test_fd_context;
static void read_notifier_test_callback(
grpc_exec_ctx *exec_ctx, void *arg /* (read_notifier_test_fd_context *) */,
bool success) {
read_notifier_test_fd_context *fd_context = arg;
grpc_fd *fd = fd_context->fd;
/* Verify that the read notifier pollset is set */
GPR_ASSERT(grpc_fd_get_read_notifier_pollset(exec_ctx, fd) != NULL);
fd_context->is_cb_called = true;
}
/* sv MUST be an array of size 2 */
static void get_socket_pair(int sv[]) {
int flags = 0;
GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
flags = fcntl(sv[0], F_GETFL, 0);
GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
flags = fcntl(sv[1], F_GETFL, 0);
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
}
static grpc_pollset *create_grpc_pollset(gpr_mu **mu) {
grpc_pollset *pollset = gpr_malloc(grpc_pollset_size());
grpc_pollset_init(pollset, mu);
return pollset;
}
static void free_grpc_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
grpc_closure destroyed;
grpc_closure_init(&destroyed, destroy_pollset, pollset);
grpc_pollset_shutdown(exec_ctx, pollset, &destroyed);
grpc_exec_ctx_flush(exec_ctx);
gpr_free(pollset);
}
/* This tests that the read_notifier_pollset field of a grpc_fd is properly
set when the grpc_fd becomes readable
- This tests both basic and multi pollsets
- The parameter register_cb_after_read_event controls whether the on-read
callback registration (i.e. the one done by grpc_fd_notify_on_read()) is
done either before or after the fd becomes readable
*/
static void test_grpc_fd_read_notifier_pollset(
bool register_cb_after_read_event) {
grpc_fd *em_fd[2];
int sv[2][2];
gpr_mu *mu[2];
grpc_pollset *pollset[2];
char data;
ssize_t result;
int i;
grpc_pollset_worker *worker;
read_notifier_test_fd_context fd_context;
grpc_closure on_read_closure;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
for (i = 0; i < 2; i++) {
pollset[i] = create_grpc_pollset(&mu[i]);
get_socket_pair(sv[i]); /* sv[i][0] & sv[i][1] will have the socket pair */
em_fd[i] = grpc_fd_create(sv[i][0], "test_grpc_fd_read_notifier_pollset");
grpc_pollset_add_fd(&exec_ctx, pollset[i], em_fd[i]);
}
/* At this point pollset[0] has em_fd[0] and pollset[1] has em_fd[1] and both
are basic pollsets. Make pollset[1] a multi-pollset by adding em_fd[0] to
it */
grpc_pollset_add_fd(&exec_ctx, pollset[1], em_fd[0]);
grpc_exec_ctx_flush(&exec_ctx);
/* The following tests that the read_notifier_pollset is correctly set on the
grpc_fd structure in both basic pollset and multi pollset cases.
pollset[0] is a basic pollset containing just em_fd[0]
pollset[1] is a multi pollset containing em_fd[0] and em_fd[1] */
for (i = 0; i < 2; i++) {
on_read_closure.cb = read_notifier_test_callback;
fd_context.fd = em_fd[i];
fd_context.is_cb_called = false;
on_read_closure.cb_arg = &fd_context;
if (!register_cb_after_read_event) {
/* Registering the callback BEFORE the fd is readable */
grpc_fd_notify_on_read(&exec_ctx, em_fd[i], &on_read_closure);
}
data = 0;
result = write(sv[i][1], &data, sizeof(data));
GPR_ASSERT(result == 1);
/* grpc_pollset_work requires the caller to hold the pollset mutex */
gpr_mu_lock(mu[i]);
worker = NULL;
grpc_pollset_work(&exec_ctx, pollset[i], &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
gpr_mu_unlock(mu[i]);
grpc_exec_ctx_flush(&exec_ctx);
if (register_cb_after_read_event) {
/* Registering the callback after the fd is readable. In this case, the
callback should be executed right away. */
grpc_fd_notify_on_read(&exec_ctx, em_fd[i], &on_read_closure);
grpc_exec_ctx_flush(&exec_ctx);
}
/* The callback should have been called by now */
GPR_ASSERT(fd_context.is_cb_called);
/* Drain the socket (Not really needed for the test) */
result = read(sv[i][0], &data, 1);
GPR_ASSERT(result == 1);
}
/* Clean up */
for (i = 0; i < 2; i++) {
grpc_fd_orphan(&exec_ctx, em_fd[i], NULL, NULL, "");
close(sv[i][1]);
free_grpc_pollset(&exec_ctx, pollset[i]);
}
grpc_exec_ctx_finish(&exec_ctx);
}
int main(int argc, char **argv) {
grpc_closure destroyed;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -527,6 +655,8 @@ int main(int argc, char **argv) {
grpc_pollset_init(g_pollset, &g_mu);
test_grpc_fd();
test_grpc_fd_change();
test_grpc_fd_read_notifier_pollset(false);
test_grpc_fd_read_notifier_pollset(true);
grpc_closure_init(&destroyed, destroy_pollset, g_pollset);
grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
grpc_exec_ctx_finish(&exec_ctx);

@ -216,7 +216,7 @@ class HybridEnd2endTest : public ::testing::Test {
}
// Create a separate cq for each potential handler.
for (int i = 0; i < 5; i++) {
cqs_.push_back(builder.AddCompletionQueue());
cqs_.push_back(builder.AddCompletionQueue(false));
}
server_ = builder.BuildAndStart();
}
