Rewrite completion queue internals to use pre-allocation of events

pull/2347/head
Craig Tiller 10 years ago
parent c6964b1d98
commit 97fc6a3f3f
  1. 3
      src/core/iomgr/pollset_multipoller_with_epoll.c
  2. 42
      src/core/surface/call.c
  3. 216
      src/core/surface/completion_queue.c
  4. 22
      src/core/surface/completion_queue.h
  5. 165
      src/core/surface/server.c
  6. 24
      test/core/surface/completion_queue_test.c

@ -105,10 +105,11 @@ static void multipoll_with_epoll_pollset_maybe_work(
* here.
*/
timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
pollset->counter += 1;
gpr_mu_unlock(&pollset->mu);
timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
do {
ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms);
if (ep_rv < 0) {

@ -162,6 +162,8 @@ struct grpc_call {
gpr_uint8 error_status_set;
/** should the alarm be cancelled */
gpr_uint8 cancel_alarm;
/** bitmask of allocated completion events in completions */
gpr_uint8 allocated_completions;
/* flags with bits corresponding to write states allowing us to determine
what was sent */
@ -250,6 +252,9 @@ struct grpc_call {
grpc_iomgr_closure on_done_recv;
grpc_iomgr_closure on_done_send;
grpc_iomgr_closure on_done_bind;
/** completion events - for completion queue use */
grpc_cq_completion completions[6];
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@ -349,6 +354,27 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
return call->cq;
}
grpc_cq_completion *allocate_completion(grpc_call *call) {
gpr_uint8 i;
for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) {
if (call->allocated_completions & (1u << i)) {
continue;
}
call->allocated_completions |= 1u << i;
return &call->completions[i];
}
gpr_log(GPR_ERROR, "should never reach here");
abort();
}
void done_completion(void *call, grpc_cq_completion *completion) {
grpc_call *c = call;
gpr_mu_lock(&c->mu);
c->allocated_completions &= ~(1u << (completion - c->completions));
gpr_mu_unlock(&c->mu);
GRPC_CALL_INTERNAL_UNREF(c, "completion", 1);
}
#ifdef GRPC_CALL_REF_COUNT_DEBUG
void grpc_call_internal_ref(grpc_call *c, const char *reason) {
gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c,
@ -1316,11 +1342,15 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
}
static void finish_batch(grpc_call *call, int success, void *tag) {
grpc_cq_end_op(call->cq, tag, call, success);
grpc_cq_end_op(call->cq,
tag, success, done_completion, call,
allocate_completion(call));
}
static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
grpc_cq_end_op(call->cq, tag, call, 1);
grpc_cq_end_op(call->cq,
tag, 1, done_completion, call,
allocate_completion(call));
}
static int are_write_flags_valid(gpr_uint32 flags) {
@ -1343,8 +1373,9 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
if (nops == 0) {
grpc_cq_begin_op(call->cq, call);
grpc_cq_end_op(call->cq, tag, call, 1);
grpc_cq_begin_op(call->cq);
GRPC_CALL_INTERNAL_REF(call, "completion");
grpc_cq_end_op(call->cq, tag, 1, done_completion, call, allocate_completion(call));
return GRPC_CALL_OK;
}
@ -1466,7 +1497,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
}
}
grpc_cq_begin_op(call->cq, call);
GRPC_CALL_INTERNAL_REF(call, "completion");
grpc_cq_begin_op(call->cq);
return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag);
}

@ -45,34 +45,20 @@
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#define NUM_TAG_BUCKETS 31
/* A single event: extends grpc_event to form a linked list with a destruction
function (on_finish) that is hidden from outside this module */
typedef struct event {
grpc_event base;
struct event *queue_next;
struct event *queue_prev;
struct event *bucket_next;
struct event *bucket_prev;
} event;
/* Completion queue structure */
struct grpc_completion_queue {
/* When refs drops to zero, we are in shutdown mode, and will be destroyable
once all queued events are drained */
gpr_refcount refs;
/* Once owning_refs drops to zero, we will destroy the cq */
/** completed events */
grpc_cq_completion completed_head;
grpc_cq_completion *completed_tail;
/** Number of pending events (+1 if we're not shutdown) */
gpr_refcount pending_events;
/** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs;
/* the set of low level i/o things that concern this cq */
/** the set of low level i/o things that concern this cq */
grpc_pollset pollset;
/* 0 initially, 1 once we've begun shutting down */
/** 0 initially, 1 once we've begun shutting down */
int shutdown;
int shutdown_called;
/* Head of a linked list of queued events (prev points to the last element) */
event *queue;
/* Fixed size chained hash table of events for pluck() */
event *buckets[NUM_TAG_BUCKETS];
int is_server_cq;
};
@ -80,10 +66,12 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->refs, 1);
gpr_ref_init(&cc->pending_events, 1);
/* One for destroy(), one for pollset_shutdown */
gpr_ref_init(&cc->owning_refs, 2);
grpc_pollset_init(&cc->pollset);
cc->completed_tail = &cc->completed_head;
cc->completed_head.next = (gpr_uintptr) cc->completed_tail;
return cc;
}
@ -114,179 +102,127 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif
if (gpr_unref(&cc->owning_refs)) {
GPR_ASSERT(cc->queue == NULL);
GPR_ASSERT(cc->completed_head.next == (gpr_uintptr) &cc->completed_head);
grpc_pollset_destroy(&cc->pollset);
gpr_free(cc);
}
}
/* Create and append an event to the queue. Returns the event so that its data
members can be filled in.
Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
void *tag, grpc_call *call) {
event *ev = gpr_malloc(sizeof(event));
gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
ev->base.type = type;
ev->base.tag = tag;
if (cc->queue == NULL) {
cc->queue = ev->queue_next = ev->queue_prev = ev;
} else {
ev->queue_next = cc->queue;
ev->queue_prev = cc->queue->queue_prev;
ev->queue_next->queue_prev = ev->queue_prev->queue_next = ev;
}
if (cc->buckets[bucket] == NULL) {
cc->buckets[bucket] = ev->bucket_next = ev->bucket_prev = ev;
} else {
ev->bucket_next = cc->buckets[bucket];
ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
}
grpc_pollset_kick(&cc->pollset);
return ev;
}
void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) {
gpr_ref(&cc->refs);
if (call) GRPC_CALL_INTERNAL_REF(call, "cq");
void grpc_cq_begin_op(grpc_completion_queue *cc) {
gpr_ref(&cc->pending_events);
}
/* Signal the end of an operation - if this is the last waiting-to-be-queued
event, then enter shutdown mode */
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
int success) {
event *ev;
int shutdown = 0;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call);
ev->base.success = success;
if (gpr_unref(&cc->refs)) {
/* Queue a GRPC_OP_COMPLETED operation */
void grpc_cq_end_op(
grpc_completion_queue *cc,
void *tag,
int success,
void (*done)(void *done_arg, grpc_cq_completion *storage),
void *done_arg,
grpc_cq_completion *storage) {
int shutdown = gpr_unref(&cc->pending_events);
storage->tag = tag;
storage->done = done;
storage->done_arg = done_arg;
storage->next = ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0));
if (!shutdown) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
cc->completed_tail = storage;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
} else {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
cc->completed_tail = storage;
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
shutdown = 1;
}
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
if (call) GRPC_CALL_INTERNAL_UNREF(call, "cq", 0);
if (shutdown) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
}
}
/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
static event *create_shutdown_event(void) {
event *ev = gpr_malloc(sizeof(event));
ev->base.type = GRPC_QUEUE_SHUTDOWN;
ev->base.tag = NULL;
return ev;
}
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
event *ev = NULL;
grpc_event ret;
GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
if (cc->queue != NULL) {
gpr_uintptr bucket;
ev = cc->queue;
bucket = ((gpr_uintptr)ev->base.tag) % NUM_TAG_BUCKETS;
cc->queue = ev->queue_next;
ev->queue_next->queue_prev = ev->queue_prev;
ev->queue_prev->queue_next = ev->queue_next;
ev->bucket_next->bucket_prev = ev->bucket_prev;
ev->bucket_prev->bucket_next = ev->bucket_next;
if (ev == cc->buckets[bucket]) {
cc->buckets[bucket] = ev->bucket_next;
if (ev == cc->buckets[bucket]) {
cc->buckets[bucket] = NULL;
}
}
if (cc->queue == ev) {
cc->queue = NULL;
if (cc->completed_tail != &cc->completed_head) {
grpc_cq_completion *c = (grpc_cq_completion *) cc->completed_head.next;
cc->completed_head.next = c->next & ~(gpr_uintptr)1;
if (c == cc->completed_tail) {
cc->completed_tail = &cc->completed_head;
}
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
c->done(c->done_arg, c);
break;
}
if (cc->shutdown) {
ev = create_shutdown_event();
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
if (!grpc_pollset_work(&cc->pollset, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
return ret;
break;
}
}
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
ret = ev->base;
gpr_free(ev);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
return ret;
}
static event *pluck_event(grpc_completion_queue *cc, void *tag) {
gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
event *ev = cc->buckets[bucket];
if (ev == NULL) return NULL;
do {
if (ev->base.tag == tag) {
ev->queue_next->queue_prev = ev->queue_prev;
ev->queue_prev->queue_next = ev->queue_next;
ev->bucket_next->bucket_prev = ev->bucket_prev;
ev->bucket_prev->bucket_next = ev->bucket_next;
if (ev == cc->buckets[bucket]) {
cc->buckets[bucket] = ev->bucket_next;
if (ev == cc->buckets[bucket]) {
cc->buckets[bucket] = NULL;
}
}
if (cc->queue == ev) {
cc->queue = ev->queue_next;
if (cc->queue == ev) {
cc->queue = NULL;
}
}
return ev;
}
ev = ev->bucket_next;
} while (ev != cc->buckets[bucket]);
return NULL;
}
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
event *ev = NULL;
grpc_event ret;
grpc_cq_completion *c;
grpc_cq_completion *prev;
GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
if ((ev = pluck_event(cc, tag))) {
break;
prev = &cc->completed_head;
while ((c = (grpc_cq_completion*)(prev->next & ~(gpr_uintptr)1)) != &cc->completed_head) {
if (c->tag == tag) {
prev->next = (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1);
if (c == cc->completed_tail) {
cc->completed_tail = prev;
}
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
c->done(c->done_arg, c);
goto done;
}
prev = c;
}
if (cc->shutdown) {
ev = create_shutdown_event();
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
if (!grpc_pollset_work(&cc->pollset, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
return ret;
break;
}
}
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
ret = ev->base;
gpr_free(ev);
done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
return ret;
@ -303,7 +239,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
cc->shutdown_called = 1;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
if (gpr_unref(&cc->refs)) {
if (gpr_unref(&cc->pending_events)) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;

@ -39,6 +39,17 @@
#include "src/core/iomgr/pollset.h"
#include <grpc/grpc.h>
typedef struct grpc_cq_completion {
/** user supplied tag */
void *tag;
/** done callback - called when this queue element is no longer
needed by the completion queue */
void (*done)(void *done_arg, struct grpc_cq_completion *c);
void *done_arg;
/** next pointer; low bit is used to indicate success or not */
gpr_uintptr next;
} grpc_cq_completion;
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line);
@ -57,11 +68,16 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made */
void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call);
void grpc_cq_begin_op(grpc_completion_queue *cc);
/* Queue a GRPC_OP_COMPLETED operation */
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
int success);
void grpc_cq_end_op(
grpc_completion_queue *cc,
void *tag,
int success,
void (*done)(void *done_arg, grpc_cq_completion *storage),
void *done_arg,
grpc_cq_completion *storage);
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);

@ -72,12 +72,14 @@ typedef struct {
typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct {
typedef struct requested_call {
requested_call_type type;
struct requested_call *next;
void *tag;
grpc_completion_queue *cq_bound_to_call;
grpc_completion_queue *cq_for_notification;
grpc_call **call;
grpc_cq_completion completion;
union {
struct {
grpc_call_details *details;
@ -92,17 +94,11 @@ typedef struct {
} data;
} requested_call;
typedef struct {
requested_call *calls;
size_t count;
size_t capacity;
} requested_call_array;
struct registered_method {
char *method;
char *host;
call_data *pending;
requested_call_array requested;
requested_call *requests;
registered_method *next;
};
@ -131,6 +127,7 @@ struct channel_data {
typedef struct shutdown_tag {
void *tag;
grpc_completion_queue *cq;
grpc_cq_completion completion;
} shutdown_tag;
struct grpc_server {
@ -153,7 +150,7 @@ struct grpc_server {
gpr_mu mu_call; /* mutex for call-specific state */
registered_method *registered_methods;
requested_call_array requested_calls;
requested_call *requests;
gpr_uint8 shutdown;
gpr_uint8 shutdown_published;
@ -325,22 +322,6 @@ static int call_list_remove(call_data *call, call_list list) {
return 1;
}
static void requested_call_array_destroy(requested_call_array *array) {
gpr_free(array->calls);
}
static requested_call *requested_call_array_add(requested_call_array *array) {
requested_call *rc;
if (array->count == array->capacity) {
array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
array->calls =
gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
}
rc = &array->calls[array->count++];
memset(rc, 0, sizeof(*rc));
return rc;
}
static void server_ref(grpc_server *server) {
gpr_ref(&server->internal_refcount);
}
@ -352,12 +333,10 @@ static void server_delete(grpc_server *server) {
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
gpr_free(server->channel_filters);
requested_call_array_destroy(&server->requested_calls);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
gpr_free(rm->method);
gpr_free(rm->host);
requested_call_array_destroy(&rm->requested);
gpr_free(rm);
}
for (i = 0; i < server->cq_count; i++) {
@ -406,18 +385,18 @@ static void destroy_channel(channel_data *chand) {
static void finish_start_new_rpc_and_unlock(grpc_server *server,
grpc_call_element *elem,
call_data **pending_root,
requested_call_array *array) {
requested_call rc;
requested_call **requests) {
requested_call *rc = *requests;
call_data *calld = elem->call_data;
if (array->count == 0) {
if (rc == NULL) {
calld->state = PENDING;
call_list_join(pending_root, calld, PENDING_START);
gpr_mu_unlock(&server->mu_call);
} else {
rc = array->calls[--array->count];
*requests = rc->next;
calld->state = ACTIVATED;
gpr_mu_unlock(&server->mu_call);
begin_call(server, calld, &rc);
begin_call(server, calld, rc);
}
}
@ -442,7 +421,7 @@ static void start_new_rpc(grpc_call_element *elem) {
if (rm->method != calld->path) continue;
finish_start_new_rpc_and_unlock(server, elem,
&rm->server_registered_method->pending,
&rm->server_registered_method->requested);
&rm->server_registered_method->requests);
return;
}
/* check for a wildcard method definition (no host set) */
@ -455,12 +434,12 @@ static void start_new_rpc(grpc_call_element *elem) {
if (rm->method != calld->path) continue;
finish_start_new_rpc_and_unlock(server, elem,
&rm->server_registered_method->pending,
&rm->server_registered_method->requested);
&rm->server_registered_method->requests);
return;
}
}
finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
&server->requested_calls);
&server->requests);
}
static void kill_zombie(void *elem, int success) {
@ -476,6 +455,10 @@ static int num_listeners(grpc_server *server) {
return n;
}
static void done_shutdown_event(void *server, grpc_cq_completion *completion) {
server_unref(server);
}
static void maybe_finish_shutdown(grpc_server *server) {
size_t i;
if (!server->shutdown || server->shutdown_published) {
@ -494,8 +477,14 @@ static void maybe_finish_shutdown(grpc_server *server) {
}
server->shutdown_published = 1;
for (i = 0; i < server->num_shutdown_tags; i++) {
grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
NULL, 1);
server_ref(server);
grpc_cq_end_op(server->shutdown_tags[i].cq,
server->shutdown_tags[i].tag,
1,
done_shutdown_event,
server,
&server->shutdown_tags[i].completion
);
}
}
@ -910,15 +899,14 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
requested_call_array requested_calls;
size_t i;
requested_call *requests = NULL;
registered_method *rm;
shutdown_tag *sdt;
channel_broadcaster broadcaster;
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global);
grpc_cq_begin_op(cq, NULL);
grpc_cq_begin_op(cq);
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
@ -934,23 +922,15 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
requested_calls = server->requested_calls;
memset(&server->requested_calls, 0, sizeof(server->requested_calls));
requests = server->requests;
server->requests = NULL;
for (rm = server->registered_methods; rm; rm = rm->next) {
if (requested_calls.count + rm->requested.count >
requested_calls.capacity) {
requested_calls.capacity =
GPR_MAX(requested_calls.count + rm->requested.count,
2 * requested_calls.capacity);
requested_calls.calls =
gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
requested_calls.capacity);
while (rm->requests != NULL) {
requested_call *c = rm->requests;
rm->requests = c->next;
c->next = requests;
requests = c;
}
memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
sizeof(*requested_calls.calls) * rm->requested.count);
requested_calls.count += rm->requested.count;
gpr_free(rm->requested.calls);
memset(&rm->requested, 0, sizeof(rm->requested));
}
gpr_mu_unlock(&server->mu_call);
@ -959,10 +939,11 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
gpr_mu_unlock(&server->mu_global);
/* terminate all the requested calls */
for (i = 0; i < requested_calls.count; i++) {
fail_call(server, &requested_calls.calls[i]);
while (requests != NULL) {
requested_call *next = requests->next;
fail_call(server, requests);
requests = next;
}
gpr_free(requested_calls.calls);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
@ -1024,7 +1005,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
static grpc_call_error queue_call_request(grpc_server *server,
requested_call *rc) {
call_data *calld = NULL;
requested_call_array *requested_calls = NULL;
requested_call **requests;
gpr_mu_lock(&server->mu_call);
if (server->shutdown) {
gpr_mu_unlock(&server->mu_call);
@ -1035,12 +1016,12 @@ static grpc_call_error queue_call_request(grpc_server *server,
case BATCH_CALL:
calld =
call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
requested_calls = &server->requested_calls;
requests = &server->requests;
break;
case REGISTERED_CALL:
calld = call_list_remove_head(
&rc->data.registered.registered_method->pending, PENDING_START);
requested_calls = &rc->data.registered.registered_method->requested;
requests = &rc->data.registered.registered_method->requests;
break;
}
if (calld) {
@ -1050,7 +1031,8 @@ static grpc_call_error queue_call_request(grpc_server *server,
begin_call(server, calld, rc);
return GRPC_CALL_OK;
} else {
*requested_call_array_add(requested_calls) = *rc;
rc->next = *requests;
*requests = rc;
gpr_mu_unlock(&server->mu_call);
return GRPC_CALL_OK;
}
@ -1061,22 +1043,23 @@ grpc_call_error grpc_server_request_call(
grpc_metadata_array *initial_metadata,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
requested_call rc;
requested_call *rc = gpr_malloc(sizeof(*rc));
GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
initial_metadata, cq_bound_to_call,
cq_for_notification, tag);
if (!grpc_cq_is_server_cq(cq_for_notification)) {
gpr_free(rc);
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
grpc_cq_begin_op(cq_for_notification, NULL);
rc.type = BATCH_CALL;
rc.tag = tag;
rc.cq_bound_to_call = cq_bound_to_call;
rc.cq_for_notification = cq_for_notification;
rc.call = call;
rc.data.batch.details = details;
rc.data.batch.initial_metadata = initial_metadata;
return queue_call_request(server, &rc);
grpc_cq_begin_op(cq_for_notification);
rc->type = BATCH_CALL;
rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call;
rc->cq_for_notification = cq_for_notification;
rc->call = call;
rc->data.batch.details = details;
rc->data.batch.initial_metadata = initial_metadata;
return queue_call_request(server, rc);
}
grpc_call_error grpc_server_request_registered_call(
@ -1084,22 +1067,23 @@ grpc_call_error grpc_server_request_registered_call(
grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
requested_call rc;
requested_call *rc = gpr_malloc(sizeof(*rc));
registered_method *registered_method = rm;
if (!grpc_cq_is_server_cq(cq_for_notification)) {
gpr_free(rc);
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
grpc_cq_begin_op(cq_for_notification, NULL);
rc.type = REGISTERED_CALL;
rc.tag = tag;
rc.cq_bound_to_call = cq_bound_to_call;
rc.cq_for_notification = cq_for_notification;
rc.call = call;
rc.data.registered.registered_method = registered_method;
rc.data.registered.deadline = deadline;
rc.data.registered.initial_metadata = initial_metadata;
rc.data.registered.optional_payload = optional_payload;
return queue_call_request(server, &rc);
grpc_cq_begin_op(cq_for_notification);
rc->type = REGISTERED_CALL;
rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call;
rc->cq_for_notification = cq_for_notification;
rc->call = call;
rc->data.registered.registered_method = registered_method;
rc->data.registered.deadline = deadline;
rc->data.registered.initial_metadata = initial_metadata;
rc->data.registered.optional_payload = optional_payload;
return queue_call_request(server, rc);
}
static void publish_registered_or_batch(grpc_call *call, int success,
@ -1167,7 +1151,11 @@ static void begin_call(grpc_server *server, call_data *calld,
GRPC_CALL_INTERNAL_REF(calld->call, "server");
grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
rc->tag);
rc);
}
static void done_request_event(void *req, grpc_cq_completion *c) {
gpr_free(req);
}
static void fail_call(grpc_server *server, requested_call *rc) {
@ -1180,15 +1168,16 @@ static void fail_call(grpc_server *server, requested_call *rc) {
rc->data.registered.initial_metadata->count = 0;
break;
}
grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, 0);
grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc, &rc->completion);
}
static void publish_registered_or_batch(grpc_call *call, int success,
void *tag) {
void *prc) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
requested_call *rc = prc;
call_data *calld = elem->call_data;
grpc_cq_end_op(calld->cq_new, tag, call, success);
grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc, &rc->completion);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {

@ -74,17 +74,20 @@ static void test_wait_empty(void) {
shutdown_and_destroy(cc);
}
static void do_nothing_end_completion(void *arg, grpc_cq_completion *c) {}
static void test_cq_end_op(void) {
grpc_event ev;
grpc_completion_queue *cc;
grpc_cq_completion completion;
void *tag = create_test_tag();
LOG_TEST("test_cq_end_op");
cc = grpc_completion_queue_create();
grpc_cq_begin_op(cc, NULL);
grpc_cq_end_op(cc, tag, NULL, 1);
grpc_cq_begin_op(cc);
grpc_cq_end_op(cc, tag, 1, do_nothing_end_completion, NULL, &completion);
ev = grpc_completion_queue_next(cc, gpr_inf_past);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
@ -120,6 +123,7 @@ static void test_pluck(void) {
grpc_event ev;
grpc_completion_queue *cc;
void *tags[128];
grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
unsigned i, j;
LOG_TEST("test_pluck");
@ -134,8 +138,8 @@ static void test_pluck(void) {
cc = grpc_completion_queue_create();
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
grpc_cq_begin_op(cc, NULL);
grpc_cq_end_op(cc, tags[i], NULL, 1);
grpc_cq_begin_op(cc);
grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, &completions[i]);
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
@ -144,8 +148,8 @@ static void test_pluck(void) {
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
grpc_cq_begin_op(cc, NULL);
grpc_cq_end_op(cc, tags[i], NULL, 1);
grpc_cq_begin_op(cc);
grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, &completions[i]);
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
@ -174,6 +178,10 @@ gpr_timespec ten_seconds_time(void) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1);
}
static void free_completion(void *arg, grpc_cq_completion *completion) {
gpr_free(completion);
}
static void producer_thread(void *arg) {
test_thread_options *opt = arg;
int i;
@ -184,7 +192,7 @@ static void producer_thread(void *arg) {
gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
for (i = 0; i < TEST_THREAD_EVENTS; i++) {
grpc_cq_begin_op(opt->cc, NULL);
grpc_cq_begin_op(opt->cc);
}
gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
@ -193,7 +201,7 @@ static void producer_thread(void *arg) {
gpr_log(GPR_INFO, "producer %d phase 2", opt->id);
for (i = 0; i < TEST_THREAD_EVENTS; i++) {
grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, NULL, 1);
grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, 1, free_completion, NULL, gpr_malloc(sizeof(grpc_cq_completion)));
opt->events_triggered++;
}

Loading…
Cancel
Save