Merge github.com:grpc/grpc into epex4

pull/10712/head
Craig Tiller 8 years ago
commit 890b584ee9
  1. 1
      grpc.def
  2. 4
      include/grpc++/impl/codegen/client_unary_call.h
  3. 36
      include/grpc++/impl/codegen/completion_queue.h
  4. 9
      include/grpc++/impl/codegen/core_codegen.h
  5. 6
      include/grpc++/impl/codegen/core_codegen_interface.h
  6. 12
      include/grpc++/impl/codegen/sync_stream.h
  7. 58
      include/grpc/grpc.h
  8. 49
      include/grpc/impl/codegen/grpc_types.h
  9. 186
      src/core/lib/surface/completion_queue.c
  10. 6
      src/core/lib/surface/completion_queue.h
  11. 19
      src/core/lib/surface/server.c
  12. 12
      src/cpp/common/core_codegen.cc
  13. 32
      src/cpp/server/server_builder.cc
  14. 8
      src/node/ext/server_generic.cc
  15. 39
      src/proto/grpc/health/v1/BUILD
  16. 3
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  17. 11
      src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
  18. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.c
  19. 3
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  20. 1
      test/cpp/end2end/BUILD
  21. 28
      test/cpp/end2end/async_end2end_test.cc

@ -89,7 +89,6 @@ EXPORTS
grpc_server_request_registered_call
grpc_server_create
grpc_server_register_completion_queue
grpc_server_register_non_listening_completion_queue
grpc_server_add_insecure_http2_port
grpc_server_start
grpc_server_shutdown_and_notify

@ -52,7 +52,9 @@ template <class InputMessage, class OutputMessage>
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const InputMessage& request,
OutputMessage* result) {
CompletionQueue cq(true); // Pluckable completion queue
CompletionQueue cq(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
GRPC_CQ_DEFAULT_POLLING}); // Pluckable completion queue
Call call(channel->CreateCall(method, context, &cq));
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,

@ -102,7 +102,9 @@ class CompletionQueue : private GrpcLibraryCodegen {
public:
/// Default constructor. Implicitly creates a \a grpc_completion_queue
/// instance.
CompletionQueue() : CompletionQueue(false) {}
CompletionQueue()
: CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING}) {}
/// Wrap \a take, taking ownership of the instance.
///
@ -182,6 +184,16 @@ class CompletionQueue : private GrpcLibraryCodegen {
};
void CompleteAvalanching();
protected:
/// Private constructor of CompletionQueue only visible to friend classes
CompletionQueue(const grpc_completion_queue_attributes& attributes) {
cq_ = g_core_codegen_interface->grpc_completion_queue_create(
g_core_codegen_interface->grpc_completion_queue_factory_lookup(
&attributes),
&attributes, NULL);
InitialAvalanching(); // reserve this for the future shutdown
}
private:
// Friend synchronous wrappers so that they can access Pluck(), which is
// a semi-private API geared towards the synchronous implementation.
@ -215,18 +227,6 @@ class CompletionQueue : private GrpcLibraryCodegen {
const InputMessage& request,
OutputMessage* result);
/// Private constructor of CompletionQueue only visible to friend classes
CompletionQueue(bool is_pluck) {
if (is_pluck) {
cq_ = g_core_codegen_interface->grpc_completion_queue_create_for_pluck(
nullptr);
} else {
cq_ = g_core_codegen_interface->grpc_completion_queue_create_for_next(
nullptr);
}
InitialAvalanching(); // reserve this for the future shutdown
}
NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
/// Wraps \a grpc_completion_queue_pluck.
@ -289,17 +289,19 @@ class CompletionQueue : private GrpcLibraryCodegen {
/// by servers. Instantiated by \a ServerBuilder.
class ServerCompletionQueue : public CompletionQueue {
public:
bool IsFrequentlyPolled() { return is_frequently_polled_; }
bool IsFrequentlyPolled() { return polling_type_ != GRPC_CQ_NON_LISTENING; }
private:
bool is_frequently_polled_;
grpc_cq_polling_type polling_type_;
friend class ServerBuilder;
/// \param is_frequently_polled Informs the GRPC library about whether the
/// server completion queue would be actively polled (by calling Next() or
/// AsyncNext()). By default all server completion queues are assumed to be
/// frequently polled.
ServerCompletionQueue(bool is_frequently_polled = true)
: is_frequently_polled_(is_frequently_polled) {}
ServerCompletionQueue(grpc_cq_polling_type polling_type)
: CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, polling_type}),
polling_type_(polling_type) {}
};
} // namespace grpc

@ -44,8 +44,15 @@
namespace grpc {
/// Implementation of the core codegen interface.
class CoreCodegen : public CoreCodegenInterface {
class CoreCodegen final : public CoreCodegenInterface {
private:
virtual const grpc_completion_queue_factory*
grpc_completion_queue_factory_lookup(
const grpc_completion_queue_attributes* attributes) override;
virtual grpc_completion_queue* grpc_completion_queue_create(
const grpc_completion_queue_factory* factory,
const grpc_completion_queue_attributes* attributes,
void* reserved) override;
grpc_completion_queue* grpc_completion_queue_create_for_next(
void* reserved) override;
grpc_completion_queue* grpc_completion_queue_create_for_pluck(

@ -59,6 +59,12 @@ class CoreCodegenInterface {
virtual void assert_fail(const char* failed_assertion, const char* file,
int line) = 0;
virtual const grpc_completion_queue_factory*
grpc_completion_queue_factory_lookup(
const grpc_completion_queue_attributes* attributes) = 0;
virtual grpc_completion_queue* grpc_completion_queue_create(
const grpc_completion_queue_factory* factory,
const grpc_completion_queue_attributes* attributes, void* reserved) = 0;
virtual grpc_completion_queue* grpc_completion_queue_create_for_next(
void* reserved) = 0;
virtual grpc_completion_queue* grpc_completion_queue_create_for_pluck(

@ -156,7 +156,9 @@ class ClientReader final : public ClientReaderInterface<R> {
ClientReader(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const W& request)
: context_(context),
cq_(true), // Pluckable cq
cq_(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose>
@ -230,7 +232,9 @@ class ClientWriter : public ClientWriterInterface<W> {
ClientWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, R* response)
: context_(context),
cq_(true), // Pluckable cq
cq_(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
call_(channel->CreateCall(method, context, &cq_)) {
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
@ -330,7 +334,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context)
: context_(context),
cq_(true), // Pluckable cq
cq_(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
call_(channel->CreateCall(method, context, &cq_)) {
if (!context_->initial_metadata_corked_) {
CallOpSet<CallOpSendInitialMetadata> ops;

@ -93,55 +93,6 @@ GRPCAPI const char *grpc_version_string(void);
/** Return a string specifying what the 'g' in gRPC stands for */
GRPCAPI const char *grpc_g_stands_for(void);
/** Specifies the type of APIs to use to pop events from the completion queue */
typedef enum {
/** Events are popped out by calling grpc_completion_queue_next() API ONLY */
GRPC_CQ_NEXT = 1,
/** Events are popped out by calling grpc_completion_queue_pluck() API ONLY*/
GRPC_CQ_PLUCK
} grpc_cq_completion_type;
/** Completion queues internally MAY maintain a set of file descriptors in a
structure called 'pollset'. This enum specifies if a completion queue has an
associated pollset and any restrictions on the type of file descriptors that
can be present in the pollset.
I/O progress can only be made when grpc_completion_queue_next() or
grpc_completion_queue_pluck() are called on the completion queue (unless the
grpc_cq_polling_type is GRPC_CQ_NON_POLLING) and hence it is very important
to actively call these APIs */
typedef enum {
/** The completion queue will have an associated pollset and there is no
restriction on the type of file descriptors the pollset may contain */
GRPC_CQ_DEFAULT_POLLING,
/** Similar to GRPC_CQ_DEFAULT_POLLING except that the completion queues will
not contain any 'listening file descriptors' (i.e file descriptors used to
listen to incoming channels) */
GRPC_CQ_NON_LISTENING,
/** The completion queue will not have an associated pollset. Note that
grpc_completion_queue_next() or grpc_completion_queue_pluck() MUST still
be called to pop events from the completion queue; it is not required to
call them actively to make I/O progress */
GRPC_CQ_NON_POLLING
} grpc_cq_polling_type;
#define GRPC_CQ_CURRENT_VERSION 1
typedef struct grpc_completion_queue_attributes {
/* The version number of this structure. More fields might be added to this
structure in future. */
int version; /* Set to GRPC_CQ_CURRENT_VERSION */
grpc_cq_completion_type cq_completion_type;
grpc_cq_polling_type cq_polling_type;
} grpc_completion_queue_attributes;
/** The completion queue factory structure is opaque to the callers of grpc */
typedef struct grpc_completion_queue_factory grpc_completion_queue_factory;
/** Returns the completion queue factory based on the attributes. MAY return a
NULL if no factory can be found */
GRPCAPI const grpc_completion_queue_factory *
@ -427,15 +378,6 @@ GRPCAPI void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
void *reserved);
/** Register a non-listening completion queue with the server. This API is
similar to grpc_server_register_completion_queue except that the server will
not use this completion_queue to listen to any incoming channels.
Registering a non-listening completion queue will have negative performance
impact and hence this API is not recommended for production use cases. */
GRPCAPI void grpc_server_register_non_listening_completion_queue(
grpc_server *server, grpc_completion_queue *q, void *reserved);
/** Add a HTTP2 over plaintext over tcp listener.
Returns bound port number on success, 0 on failure.
REQUIRES: server not started */

@ -556,6 +556,55 @@ typedef struct {
typedef struct grpc_resource_quota grpc_resource_quota;
/** Completion queues internally MAY maintain a set of file descriptors in a
structure called 'pollset'. This enum specifies if a completion queue has an
associated pollset and any restrictions on the type of file descriptors that
can be present in the pollset.
I/O progress can only be made when grpc_completion_queue_next() or
grpc_completion_queue_pluck() are called on the completion queue (unless the
grpc_cq_polling_type is GRPC_CQ_NON_POLLING) and hence it is very important
to actively call these APIs */
typedef enum {
/** The completion queue will have an associated pollset and there is no
restriction on the type of file descriptors the pollset may contain */
GRPC_CQ_DEFAULT_POLLING,
/** Similar to GRPC_CQ_DEFAULT_POLLING except that the completion queues will
not contain any 'listening file descriptors' (i.e file descriptors used to
listen to incoming channels) */
GRPC_CQ_NON_LISTENING,
/** The completion queue will not have an associated pollset. Note that
grpc_completion_queue_next() or grpc_completion_queue_pluck() MUST still
be called to pop events from the completion queue; it is not required to
call them actively to make I/O progress */
GRPC_CQ_NON_POLLING
} grpc_cq_polling_type;
/** Specifies the type of APIs to use to pop events from the completion queue */
typedef enum {
/** Events are popped out by calling grpc_completion_queue_next() API ONLY */
GRPC_CQ_NEXT = 1,
/** Events are popped out by calling grpc_completion_queue_pluck() API ONLY*/
GRPC_CQ_PLUCK
} grpc_cq_completion_type;
#define GRPC_CQ_CURRENT_VERSION 1
typedef struct grpc_completion_queue_attributes {
/* The version number of this structure. More fields might be added to this
structure in future. */
int version; /* Set to GRPC_CQ_CURRENT_VERSION */
grpc_cq_completion_type cq_completion_type;
grpc_cq_polling_type cq_polling_type;
} grpc_completion_queue_attributes;
/** The completion queue factory structure is opaque to the callers of grpc */
typedef struct grpc_completion_queue_factory grpc_completion_queue_factory;
#ifdef __cplusplus
}
#endif

@ -60,13 +60,154 @@ typedef struct {
void *tag;
} plucker;
typedef struct {
bool can_get_pollset;
bool can_listen;
size_t (*size)(void);
void (*init)(grpc_pollset *pollset, gpr_mu **mu);
grpc_error *(*kick)(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker);
grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now,
gpr_timespec deadline);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset);
} cq_poller_vtable;
typedef struct non_polling_worker {
gpr_cv cv;
bool kicked;
struct non_polling_worker *next;
struct non_polling_worker *prev;
} non_polling_worker;
typedef struct {
gpr_mu mu;
non_polling_worker *root;
grpc_closure *shutdown;
} non_polling_poller;
static size_t non_polling_poller_size(void) {
return sizeof(non_polling_poller);
}
static void non_polling_poller_init(grpc_pollset *pollset, gpr_mu **mu) {
non_polling_poller *npp = (non_polling_poller *)pollset;
gpr_mu_init(&npp->mu);
*mu = &npp->mu;
}
static void non_polling_poller_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
non_polling_poller *npp = (non_polling_poller *)pollset;
gpr_mu_destroy(&npp->mu);
}
static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
grpc_pollset_worker **worker,
gpr_timespec now,
gpr_timespec deadline) {
non_polling_poller *npp = (non_polling_poller *)pollset;
if (npp->shutdown) return GRPC_ERROR_NONE;
non_polling_worker w;
gpr_cv_init(&w.cv);
if (worker != NULL) *worker = (grpc_pollset_worker *)&w;
if (npp->root == NULL) {
npp->root = w.next = w.prev = &w;
} else {
w.next = npp->root;
w.prev = w.next->prev;
w.next->prev = w.prev->next = &w;
}
w.kicked = false;
while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline))
;
if (&w == npp->root) {
npp->root = w.next;
if (&w == npp->root) {
if (npp->shutdown) {
grpc_closure_sched(exec_ctx, npp->shutdown, GRPC_ERROR_NONE);
}
npp->root = NULL;
}
}
w.next->prev = w.prev;
w.prev->next = w.next;
gpr_cv_destroy(&w.cv);
if (worker != NULL) *worker = NULL;
return GRPC_ERROR_NONE;
}
static grpc_error *non_polling_poller_kick(
grpc_pollset *pollset, grpc_pollset_worker *specific_worker) {
non_polling_poller *p = (non_polling_poller *)pollset;
if (specific_worker == NULL) specific_worker = (grpc_pollset_worker *)p->root;
if (specific_worker != NULL) {
non_polling_worker *w = (non_polling_worker *)specific_worker;
if (!w->kicked) {
w->kicked = true;
gpr_cv_signal(&w->cv);
}
}
return GRPC_ERROR_NONE;
}
static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
grpc_closure *closure) {
non_polling_poller *p = (non_polling_poller *)pollset;
GPR_ASSERT(closure != NULL);
p->shutdown = closure;
if (p->root == NULL) {
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
non_polling_worker *w = p->root;
do {
gpr_cv_signal(&w->cv);
w = w->next;
} while (w != p->root);
}
}
static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
/* GRPC_CQ_DEFAULT_POLLING */
{.can_get_pollset = true,
.can_listen = true,
.size = grpc_pollset_size,
.init = grpc_pollset_init,
.kick = grpc_pollset_kick,
.work = grpc_pollset_work,
.shutdown = grpc_pollset_shutdown,
.destroy = grpc_pollset_destroy},
/* GRPC_CQ_NON_LISTENING */
{.can_get_pollset = true,
.can_listen = false,
.size = grpc_pollset_size,
.init = grpc_pollset_init,
.kick = grpc_pollset_kick,
.work = grpc_pollset_work,
.shutdown = grpc_pollset_shutdown,
.destroy = grpc_pollset_destroy},
/* GRPC_CQ_NON_POLLING */
{.can_get_pollset = false,
.can_listen = false,
.size = non_polling_poller_size,
.init = non_polling_poller_init,
.kick = non_polling_poller_kick,
.work = non_polling_poller_work,
.shutdown = non_polling_poller_shutdown,
.destroy = non_polling_poller_destroy},
};
/* Completion queue structure */
struct grpc_completion_queue {
/** owned by pollset */
gpr_mu *mu;
grpc_cq_completion_type completion_type;
grpc_cq_polling_type polling_type;
const cq_poller_vtable *poller_vtable;
/** completed events */
grpc_cq_completion completed_head;
@ -127,15 +268,18 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
"polling_type=%d)",
2, (completion_type, polling_type));
cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
const cq_poller_vtable *poller_vtable =
&g_poller_vtable_by_poller_type[polling_type];
cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size());
poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->mu);
#ifndef NDEBUG
cc->outstanding_tags = NULL;
cc->outstanding_tag_capacity = 0;
#endif
cc->completion_type = completion_type;
cc->polling_type = polling_type;
cc->poller_vtable = poller_vtable;
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->pending_events, 1);
@ -164,10 +308,6 @@ grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
return cc->completion_type;
}
grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc) {
return cc->polling_type;
}
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) {
@ -196,7 +336,7 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
#endif
if (gpr_unref(&cc->owning_refs)) {
GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head);
grpc_pollset_destroy(exec_ctx, POLLSET_FROM_CQ(cc));
cc->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cc));
#ifndef NDEBUG
gpr_free(cc->outstanding_tags);
#endif
@ -281,7 +421,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
}
}
grpc_error *kick_error =
grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker);
cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker);
gpr_mu_unlock(cc->mu);
if (kick_error != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(kick_error);
@ -296,8 +436,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
&cc->pollset_shutdown_done);
cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
&cc->pollset_shutdown_done);
gpr_mu_unlock(cc->mu);
}
@ -453,8 +593,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_mu_lock(cc->mu);
continue;
} else {
grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL,
now, iteration_deadline);
grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
NULL, now, iteration_deadline);
if (err != GRPC_ERROR_NONE) {
gpr_mu_unlock(cc->mu);
const char *msg = grpc_error_string(err);
@ -645,8 +785,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(cc->mu);
} else {
grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc),
&worker, now, iteration_deadline);
grpc_error *err = cc->poller_vtable->work(
&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline);
if (err != GRPC_ERROR_NONE) {
del_plucker(cc, tag, &worker);
gpr_mu_unlock(cc->mu);
@ -690,8 +830,8 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
if (gpr_unref(&cc->pending_events)) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc),
&cc->pollset_shutdown_done);
cc->poller_vtable->shutdown(&exec_ctx, POLLSET_FROM_CQ(cc),
&cc->pollset_shutdown_done);
}
gpr_mu_unlock(cc->mu);
grpc_exec_ctx_finish(&exec_ctx);
@ -709,7 +849,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
}
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
return POLLSET_FROM_CQ(cc);
return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL;
}
grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
@ -730,4 +870,10 @@ bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
bool grpc_cq_is_server_cq(grpc_completion_queue *cc) {
return cc->is_server_cq;
}
bool grpc_cq_can_listen(grpc_completion_queue *cc) {
return cc->poller_vtable->can_listen;
}

@ -94,13 +94,11 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps);
void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc);
bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc);
bool grpc_cq_is_server_cq(grpc_completion_queue *cc);
bool grpc_cq_can_listen(grpc_completion_queue *cc);
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc);
grpc_completion_queue *grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type);

@ -981,7 +981,7 @@ const grpc_channel_filter grpc_server_top_filter = {
static void register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
bool is_non_listening, void *reserved) {
void *reserved) {
size_t i, n;
GPR_ASSERT(!reserved);
for (i = 0; i < server->cq_count; i++) {
@ -990,10 +990,6 @@ static void register_completion_queue(grpc_server *server,
grpc_cq_mark_server_cq(cq);
if (is_non_listening) {
grpc_cq_mark_non_listening_server_cq(cq);
}
GRPC_CQ_INTERNAL_REF(cq, "server");
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs,
@ -1016,16 +1012,7 @@ void grpc_server_register_completion_queue(grpc_server *server,
calls grpc_completion_queue_pluck() on server completion queues */
}
register_completion_queue(server, cq, false, reserved);
}
void grpc_server_register_non_listening_completion_queue(
grpc_server *server, grpc_completion_queue *cq, void *reserved) {
GRPC_API_TRACE(
"grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, "
"reserved=%p)",
3, (server, cq, reserved));
register_completion_queue(server, cq, true, reserved);
register_completion_queue(server, cq, reserved);
}
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
@ -1121,7 +1108,7 @@ void grpc_server_start(grpc_server *server) {
server->requested_calls_per_cq =
gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) {
if (grpc_cq_can_listen(server->cqs[i])) {
server->pollsets[server->pollset_count++] =
grpc_cq_pollset(server->cqs[i]);
}

@ -54,6 +54,18 @@ struct grpc_byte_buffer;
namespace grpc {
const grpc_completion_queue_factory*
CoreCodegen::grpc_completion_queue_factory_lookup(
const grpc_completion_queue_attributes* attributes) {
return ::grpc_completion_queue_factory_lookup(attributes);
}
grpc_completion_queue* CoreCodegen::grpc_completion_queue_create(
const grpc_completion_queue_factory* factory,
const grpc_completion_queue_attributes* attributes, void* reserved) {
return ::grpc_completion_queue_create(factory, attributes, reserved);
}
grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_next(
void* reserved) {
return ::grpc_completion_queue_create_for_next(reserved);

@ -83,7 +83,8 @@ ServerBuilder::~ServerBuilder() {
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
bool is_frequently_polled) {
ServerCompletionQueue* cq = new ServerCompletionQueue(is_frequently_polled);
ServerCompletionQueue* cq = new ServerCompletionQueue(
is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING);
cqs_.push_back(cq);
return std::unique_ptr<ServerCompletionQueue>(cq);
}
@ -242,6 +243,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
sync_server_cqs(std::make_shared<
std::vector<std::unique_ptr<ServerCompletionQueue>>>());
int num_frequently_polled_cqs = 0;
for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
if ((*it)->IsFrequentlyPolled()) {
num_frequently_polled_cqs++;
}
}
const bool is_hybrid_server =
has_sync_methods && num_frequently_polled_cqs > 0;
if (has_sync_methods) {
// This is a Sync server
gpr_log(GPR_INFO,
@ -251,9 +262,12 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
sync_server_settings_.max_pollers,
sync_server_settings_.cq_timeout_msec);
grpc_cq_polling_type polling_type =
is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING;
// Create completion queues to listen to incoming rpc requests
for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
sync_server_cqs->emplace_back(new ServerCompletionQueue());
sync_server_cqs->emplace_back(new ServerCompletionQueue(polling_type));
}
}
@ -269,12 +283,10 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
// server
// 2. cqs_: Completion queues added via AddCompletionQueue() call
// All sync cqs (if any) are frequently polled by ThreadManager
int num_frequently_polled_cqs = sync_server_cqs->size();
for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
num_frequently_polled_cqs++;
}
// cqs_ contains the completion queue added by calling the ServerBuilder's
@ -283,14 +295,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
// listening to incoming channels. Such completion queues must be registered
// as non-listening queues
for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
if ((*it)->IsFrequentlyPolled()) {
grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
num_frequently_polled_cqs++;
} else {
grpc_server_register_non_listening_completion_queue(server->server_,
(*it)->cq(), nullptr);
}
grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
}
if (num_frequently_polled_cqs == 0) {

@ -44,9 +44,11 @@ namespace grpc {
namespace node {
Server::Server(grpc_server *server) : wrapped_server(server) {
shutdown_queue = grpc_completion_queue_create_for_pluck(NULL);
grpc_server_register_non_listening_completion_queue(server, shutdown_queue,
NULL);
grpc_completion_queue_attributes attrs = {
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_NON_LISTENING};
shutdown_queue = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attrs), &attrs, NULL);
grpc_server_register_completion_queue(server, shutdown_queue, NULL);
}
Server::~Server() {

@ -0,0 +1,39 @@
# Copyright 2017, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
licenses(["notice"]) # 3-clause BSD
package(default_visibility = ["//visibility:public"])
load("//bazel:grpc_build_system.bzl", "grpc_proto_library")
grpc_proto_library(
name = "health_proto",
srcs = ["health.proto"],
)

@ -356,8 +356,6 @@ cdef extern from "grpc/grpc.h":
void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
void *reserved) nogil
void grpc_server_register_non_listening_completion_queue(
grpc_server *server, grpc_completion_queue *cq, void *reserved) nogil
int grpc_server_add_insecure_http2_port(
grpc_server *server, const char *addr) nogil
void grpc_server_start(grpc_server *server) nogil
@ -502,4 +500,3 @@ cdef extern from "grpc/compression.h":
int grpc_compression_options_is_algorithm_enabled(
const grpc_compression_options *opts,
grpc_compression_algorithm algorithm) nogil

@ -82,20 +82,11 @@ cdef class Server:
self.c_server, queue.c_completion_queue, NULL)
self.registered_completion_queues.append(queue)
def register_non_listening_completion_queue(
self, CompletionQueue queue not None):
if self.is_started:
raise ValueError("cannot register completion queues after start")
with nogil:
grpc_server_register_non_listening_completion_queue(
self.c_server, queue.c_completion_queue, NULL)
self.registered_completion_queues.append(queue)
def start(self):
if self.is_started:
raise ValueError("the server has already started")
self.backup_shutdown_queue = CompletionQueue()
self.register_non_listening_completion_queue(self.backup_shutdown_queue)
self.register_completion_queue(self.backup_shutdown_queue)
self.is_started = True
with nogil:
grpc_server_start(self.c_server)

@ -127,7 +127,6 @@ grpc_server_register_method_type grpc_server_register_method_import;
grpc_server_request_registered_call_type grpc_server_request_registered_call_import;
grpc_server_create_type grpc_server_create_import;
grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
grpc_server_start_type grpc_server_start_import;
grpc_server_shutdown_and_notify_type grpc_server_shutdown_and_notify_import;
@ -425,7 +424,6 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call");
grpc_server_create_import = (grpc_server_create_type) GetProcAddress(library, "grpc_server_create");
grpc_server_register_completion_queue_import = (grpc_server_register_completion_queue_type) GetProcAddress(library, "grpc_server_register_completion_queue");
grpc_server_register_non_listening_completion_queue_import = (grpc_server_register_non_listening_completion_queue_type) GetProcAddress(library, "grpc_server_register_non_listening_completion_queue");
grpc_server_add_insecure_http2_port_import = (grpc_server_add_insecure_http2_port_type) GetProcAddress(library, "grpc_server_add_insecure_http2_port");
grpc_server_start_import = (grpc_server_start_type) GetProcAddress(library, "grpc_server_start");
grpc_server_shutdown_and_notify_import = (grpc_server_shutdown_and_notify_type) GetProcAddress(library, "grpc_server_shutdown_and_notify");

@ -332,9 +332,6 @@ extern grpc_server_create_type grpc_server_create_import;
typedef void(*grpc_server_register_completion_queue_type)(grpc_server *server, grpc_completion_queue *cq, void *reserved);
extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
#define grpc_server_register_completion_queue grpc_server_register_completion_queue_import
typedef void(*grpc_server_register_non_listening_completion_queue_type)(grpc_server *server, grpc_completion_queue *q, void *reserved);
extern grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
#define grpc_server_register_non_listening_completion_queue grpc_server_register_non_listening_completion_queue_import
typedef int(*grpc_server_add_insecure_http2_port_type)(grpc_server *server, const char *addr);
extern grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
#define grpc_server_add_insecure_http2_port grpc_server_add_insecure_http2_port_import

@ -51,6 +51,7 @@ cc_test(
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//src/proto/grpc/testing/duplicate:echo_duplicate_proto",
"//src/proto/grpc/health/v1:health_proto",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",

@ -38,6 +38,7 @@
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/ext/health_check_service_server_builder_option.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
@ -49,6 +50,7 @@
#include <gtest/gtest.h>
#include "src/core/lib/iomgr/port.h"
#include "src/proto/grpc/health/v1/health.grpc.pb.h"
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
@ -224,13 +226,15 @@ class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption {
class TestScenario {
public:
TestScenario(bool non_block, const grpc::string& creds_type,
TestScenario(bool non_block, const grpc::string& creds_type, bool hcs,
const grpc::string& content)
: disable_blocking(non_block),
health_check_service(hcs),
credentials_type(creds_type),
message_content(content) {}
void Log() const;
bool disable_blocking;
bool health_check_service;
// Although the below grpc::string's are logically const, we can't declare
// them const because of a limitation in the way old compilers (e.g., gcc-4.4)
// manage vector insertion using a copy constructor
@ -243,6 +247,8 @@ static std::ostream& operator<<(std::ostream& out,
return out << "TestScenario{disable_blocking="
<< (scenario.disable_blocking ? "true" : "false")
<< ", credentials='" << scenario.credentials_type
<< ", health_check_service="
<< (scenario.health_check_service ? "true" : "false")
<< "', message_size=" << scenario.message_content.size() << "}";
}
@ -252,6 +258,8 @@ void TestScenario::Log() const {
gpr_log(GPR_DEBUG, "%s", out.str().c_str());
}
class HealthCheck : public health::v1::Health::Service {};
class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
protected:
AsyncEnd2endTest() { GetParam().Log(); }
@ -268,6 +276,9 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
GetParam().credentials_type);
builder.AddListeningPort(server_address_.str(), server_creds);
builder.RegisterService(&service_);
if (GetParam().health_check_service) {
builder.RegisterService(&health_check_);
}
cq_ = builder.AddCompletionQueue();
// TODO(zyc): make a test option to choose wheather sync plugins should be
@ -340,6 +351,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<Server> server_;
grpc::testing::EchoTestService::AsyncService service_;
HealthCheck health_check_;
std::ostringstream server_address_;
int port_;
@ -1754,12 +1766,14 @@ std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking,
messages.push_back(big_msg);
}
for (auto cred = credentials_types.begin(); cred != credentials_types.end();
++cred) {
for (auto msg = messages.begin(); msg != messages.end(); msg++) {
scenarios.emplace_back(false, *cred, *msg);
if (test_disable_blocking) {
scenarios.emplace_back(true, *cred, *msg);
for (auto health_check_service : {false, true}) {
for (auto cred = credentials_types.begin(); cred != credentials_types.end();
++cred) {
for (auto msg = messages.begin(); msg != messages.end(); msg++) {
scenarios.emplace_back(false, *cred, health_check_service, *msg);
if (test_disable_blocking) {
scenarios.emplace_back(true, *cred, health_check_service, *msg);
}
}
}
}

Loading…
Cancel
Save