Changes to use the new API

reviewable/pr9972/r1
Sree Kuchibhotla 8 years ago
parent 5ea3bbd418
commit 2abbf8a319
  1. 5
      include/grpc/grpc.h
  2. 26
      src/core/lib/surface/completion_queue.c
  3. 3
      src/core/lib/surface/completion_queue.h
  4. 33
      src/core/lib/surface/completion_queue_factory.c

@ -128,7 +128,6 @@ typedef enum {
GRPC_CQ_NON_POLLING
} grpc_cq_polling_type;
#define GRPC_CQ_CURRENT_VERSION 1
typedef struct grpc_completion_queue_attributes {
/* The version number of this structure. More fields might be added to this
@ -161,8 +160,8 @@ GRPCAPI grpc_completion_queue *grpc_completion_queue_create_for_pluck(
/** Create a completion queue */
GRPCAPI grpc_completion_queue *grpc_completion_queue_create(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
void *reserved);
const grpc_completion_queue_factory *factory,
const grpc_completion_queue_attributes *attributes, void *reserved);
/** Blocks until an event is available, the completion queue is being shut down,
or deadline is reached.

@ -115,15 +115,17 @@ int grpc_cq_event_timeout_trace;
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
grpc_error *error);
grpc_completion_queue *grpc_completion_queue_create(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
void *reserved) {
grpc_completion_queue *grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type,
grpc_cq_polling_type polling_type) {
grpc_completion_queue *cc;
GPR_ASSERT(!reserved);
GPR_TIMER_BEGIN("grpc_completion_queue_create", 0);
GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0);
GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved));
GRPC_API_TRACE(
"grpc_completion_queue_create_internal(completion_type=%d, "
"polling_type=%d)",
2, (completion_type, polling_type));
cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
@ -153,7 +155,7 @@ grpc_completion_queue *grpc_completion_queue_create(
grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc,
grpc_schedule_on_exec_ctx);
GPR_TIMER_END("grpc_completion_queue_create", 0);
GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
return cc;
}
@ -381,8 +383,9 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
"deadline=gpr_timespec { tv_sec: %" PRId64
", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
5, (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
reserved));
5,
(cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
reserved));
GPR_ASSERT(!reserved);
dump_pending_tags(cc);
@ -556,8 +559,9 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
"deadline=gpr_timespec { tv_sec: %" PRId64
", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
6, (cc, tag, deadline.tv_sec, deadline.tv_nsec,
(int)deadline.clock_type, reserved));
6,
(cc, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
reserved));
}
GPR_ASSERT(!reserved);

@ -102,4 +102,7 @@ int grpc_cq_is_server_cq(grpc_completion_queue *cc);
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc);
grpc_completion_queue *grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type);
#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */

@ -36,12 +36,15 @@
#include <grpc/support/log.h>
/* TODO (sreek) - Currently this does not use the attributes arg. This will be
added in a future PR */
/*
* == Default completion queue factory implementation ==
*/
static grpc_completion_queue* default_create(
const grpc_completion_queue_factory* factory,
const grpc_completion_queue_attributes* attributes) {
return grpc_completion_queue_create(NULL);
const grpc_completion_queue_attributes* attr) {
return grpc_completion_queue_create_internal(attr->cq_type,
attr->cq_polling_type);
}
static grpc_completion_queue_factory_vtable default_vtable = {default_create};
@ -49,19 +52,24 @@ static grpc_completion_queue_factory_vtable default_vtable = {default_create};
static const grpc_completion_queue_factory g_default_cq_factory = {
"Default Factory", NULL, &default_vtable};
/*
* == Completion queue factory APIs
*/
const grpc_completion_queue_factory* grpc_completion_queue_factory_lookup(
const grpc_completion_queue_attributes* attributes) {
/* As we add more fields to grpc_completion_queue_attributes, we may have to
change this assert to:
GPR_ASSERT (attributes->version >= 1 &&
attributes->version <= GRPC_CQ_CURRENT_VERSION) */
GPR_ASSERT(attributes->version == 1);
GPR_ASSERT(attributes->version >= 1 &&
attributes->version <= GRPC_CQ_CURRENT_VERSION);
/* The default factory can handle version 1 of the attributes structure. We
may have to change this as more fields are added to the structure */
return &g_default_cq_factory;
}
/*
* == Completion queue creation APIs ==
*/
grpc_completion_queue* grpc_completion_queue_create_for_next(void* reserved) {
GPR_ASSERT(!reserved);
grpc_completion_queue_attributes attr = {1, GRPC_CQ_NEXT,
@ -75,3 +83,10 @@ grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) {
GRPC_CQ_DEFAULT_POLLING};
return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr);
}
grpc_completion_queue* grpc_completion_queue_create(
const grpc_completion_queue_factory* factory,
const grpc_completion_queue_attributes* attr, void* reserved) {
GPR_ASSERT(!reserved);
return factory->vtable->create(factory, attr);
}

Loading…
Cancel
Save