C Core API cleanup.

Simplify grpc_event into something that can be non-heap allocated.
Deprecate grpc_event_finish.
Remove grpc_op_error - use an int as this is more idiomatic C style.
pull/1472/head
Craig Tiller 10 years ago
parent c112d146a2
commit 64be9f7a30
  1. 28
      include/grpc/grpc.h
  2. 81
      src/core/surface/call.c
  3. 3
      src/core/surface/call.h
  4. 89
      src/core/surface/completion_queue.c
  5. 14
      src/core/surface/completion_queue.h
  6. 26
      src/core/surface/event_string.c
  7. 28
      src/core/surface/server.c
  8. 12
      src/cpp/client/client_context.cc
  9. 42
      src/cpp/common/completion_queue.cc
  10. 22
      src/csharp/ext/grpc_csharp_ext.c
  11. 16
      src/node/ext/completion_queue_async_worker.cc
  12. 2
      src/node/ext/completion_queue_async_worker.h
  13. 15
      src/php/ext/grpc/call.c
  14. 15
      src/php/ext/grpc/server.c
  15. 30
      src/python/src/grpc/_adapter/_completion_queue.c
  16. 48
      test/core/end2end/cq_verifier.c
  17. 3
      test/core/end2end/cq_verifier.h
  18. 17
      test/core/end2end/dualstack_socket_test.c
  19. 12
      test/core/end2end/no_server_test.c
  20. 10
      test/core/end2end/tests/bad_hostname.c
  21. 14
      test/core/end2end/tests/cancel_after_accept.c
  22. 14
      test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
  23. 10
      test/core/end2end/tests/cancel_after_invoke.c
  24. 10
      test/core/end2end/tests/cancel_before_invoke.c
  25. 8
      test/core/end2end/tests/cancel_in_a_vacuum.c
  26. 14
      test/core/end2end/tests/census_simple_request.c
  27. 14
      test/core/end2end/tests/disappearing_server.c
  28. 14
      test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
  29. 10
      test/core/end2end/tests/early_server_shutdown_finishes_tags.c
  30. 10
      test/core/end2end/tests/empty_batch.c
  31. 16
      test/core/end2end/tests/graceful_server_shutdown.c
  32. 14
      test/core/end2end/tests/invoke_large_request.c
  33. 40
      test/core/end2end/tests/max_concurrent_streams.c
  34. 8
      test/core/end2end/tests/no_op.c
  35. 24
      test/core/end2end/tests/ping_pong_streaming.c
  36. 14
      test/core/end2end/tests/registered_call.c
  37. 14
      test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
  38. 14
      test/core/end2end/tests/request_response_with_metadata_and_payload.c
  39. 14
      test/core/end2end/tests/request_response_with_payload.c
  40. 6
      test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
  41. 14
      test/core/end2end/tests/request_with_large_metadata.c
  42. 14
      test/core/end2end/tests/request_with_payload.c
  43. 14
      test/core/end2end/tests/simple_delayed_request.c
  44. 14
      test/core/end2end/tests/simple_request.c
  45. 16
      test/core/fling/client.c
  46. 12
      test/core/fling/server.c
  47. 72
      test/core/surface/completion_queue_test.c
  48. 2
      test/core/surface/lame_client_test.c

@ -140,14 +140,6 @@ typedef enum grpc_call_error {
GRPC_CALL_ERROR_INVALID_FLAGS
} grpc_call_error;
/* Result of a grpc operation */
typedef enum grpc_op_error {
/* everything went ok */
GRPC_OP_OK = 0,
/* something failed, we don't know what */
GRPC_OP_ERROR
} grpc_op_error;
/* Write Flags: */
/* Hint that the write may be buffered and need not go out on the wire
immediately. GRPC is free to buffer the message until the next non-buffered
@ -197,21 +189,14 @@ typedef struct grpc_metadata {
typedef enum grpc_completion_type {
GRPC_QUEUE_SHUTDOWN, /* Shutting down */
GRPC_OP_COMPLETE, /* operation completion */
GRPC_SERVER_SHUTDOWN, /* The server has finished shutting down */
GRPC_COMPLETION_DO_NOT_USE /* must be last, forces users to include
a default: case */
GRPC_QUEUE_TIMEOUT, /* No event before timeout */
GRPC_OP_COMPLETE /* operation completion */
} grpc_completion_type;
typedef struct grpc_event {
grpc_completion_type type;
int success;
void *tag;
grpc_call *call;
/* Data associated with the completion type. Field names match the type of
completion as listed in grpc_completion_type. */
union {
grpc_op_error op_complete;
} data;
} grpc_event;
typedef struct {
@ -352,7 +337,7 @@ grpc_completion_queue *grpc_completion_queue_create(void);
Callers must not call grpc_completion_queue_next and
grpc_completion_queue_pluck simultaneously on the same completion queue. */
grpc_event *grpc_completion_queue_next(grpc_completion_queue *cq,
grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
gpr_timespec deadline);
/* Blocks until an event with tag 'tag' is available, the completion queue is
@ -362,12 +347,9 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cq,
Callers must not call grpc_completion_queue_next and
grpc_completion_queue_pluck simultaneously on the same completion queue. */
grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline);
/* Clean up any data owned by the event */
void grpc_event_finish(grpc_event *event);
/* Begin destruction of a completion queue. Once all possible events are
drained then grpc_completion_queue_next will start to produce
GRPC_QUEUE_SHUTDOWN events only. At that point it's safe to call

@ -62,7 +62,7 @@ typedef enum {
typedef struct {
grpc_ioreq_completion_func on_complete;
void *user_data;
grpc_op_error status;
int success;
} completed_request;
/* See request_set in grpc_call below for a description */
@ -74,7 +74,7 @@ typedef struct {
typedef struct {
/* Overall status of the operation: starts OK, may degrade to
non-OK */
grpc_op_error status;
int success;
/* Completion function to call at the end of the operation */
grpc_ioreq_completion_func on_complete;
void *user_data;
@ -239,7 +239,6 @@ struct grpc_call {
y = temp; \
} while (0)
static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
static void call_on_done_recv(void *call, int success);
static void call_on_done_send(void *call, int success);
@ -460,7 +459,7 @@ static void unlock(grpc_call *call) {
if (completing_requests > 0) {
for (i = 0; i < completing_requests; i++) {
completed_requests[i].on_complete(call, completed_requests[i].status,
completed_requests[i].on_complete(call, completed_requests[i].success,
completed_requests[i].user_data);
}
lock(call);
@ -520,7 +519,7 @@ no_details:
}
static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
grpc_op_error status) {
int success) {
completed_request *cr;
gpr_uint8 master_set = call->request_set[op];
reqinfo_master *master;
@ -528,8 +527,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
/* ioreq is live: we need to do something */
master = &call->masters[master_set];
master->complete_mask |= 1u << op;
if (status != GRPC_OP_OK) {
master->status = status;
if (!success) {
master->success = 0;
}
if (master->complete_mask == master->need_mask) {
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
@ -540,7 +539,7 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
switch ((grpc_ioreq_op)i) {
case GRPC_IOREQ_RECV_MESSAGE:
case GRPC_IOREQ_SEND_MESSAGE:
if (master->status == GRPC_OP_OK) {
if (master->success) {
call->request_set[i] = REQSET_EMPTY;
} else {
call->write_state = WRITE_STATE_WRITE_CLOSED;
@ -575,33 +574,31 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
}
}
cr = &call->completed_requests[call->num_completed_requests++];
cr->status = master->status;
cr->success = master->success;
cr->on_complete = master->on_complete;
cr->user_data = master->user_data;
}
}
static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
grpc_op_error status) {
static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) {
if (is_op_live(call, op)) {
finish_live_ioreq_op(call, op, status);
finish_live_ioreq_op(call, op, success);
}
}
static void call_on_done_send(void *pc, int success) {
grpc_call *call = pc;
grpc_op_error error = success ? GRPC_OP_OK : GRPC_OP_ERROR;
lock(call);
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success);
}
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, error);
finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success);
}
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, error);
finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, error);
finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success);
finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success);
finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
}
call->last_send_contains = 0;
call->sending = 0;
@ -720,12 +717,12 @@ static void call_on_done_recv(void *pc, int success) {
}
finish_read_ops(call);
} else {
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_ERROR);
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_ERROR);
finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_ERROR);
finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_ERROR);
finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_ERROR);
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_ERROR);
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 0);
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 0);
finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 0);
finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 0);
finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 0);
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 0);
}
call->recv_ops.nops = 0;
unlock(call);
@ -877,7 +874,7 @@ static void finish_read_ops(grpc_call *call) {
(NULL == (*call->request_data[GRPC_IOREQ_RECV_MESSAGE].recv_message =
grpc_bbq_pop(&call->incoming_queue)));
if (!empty) {
finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1);
empty = grpc_bbq_empty(&call->incoming_queue);
}
} else {
@ -887,19 +884,19 @@ static void finish_read_ops(grpc_call *call) {
switch (call->read_state) {
case READ_STATE_STREAM_CLOSED:
if (empty) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 1);
}
/* fallthrough */
case READ_STATE_READ_CLOSED:
if (empty) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1);
}
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 1);
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 1);
finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 1);
/* fallthrough */
case READ_STATE_GOT_INITIAL_METADATA:
finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 1);
/* fallthrough */
case READ_STATE_INITIAL:
/* do nothing */
@ -910,13 +907,13 @@ static void finish_read_ops(grpc_call *call) {
static void early_out_write_ops(grpc_call *call) {
switch (call->write_state) {
case WRITE_STATE_WRITE_CLOSED:
finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, GRPC_OP_ERROR);
finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_ERROR);
finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_ERROR);
finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
/* fallthrough */
case WRITE_STATE_STARTED:
finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_ERROR);
finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
/* fallthrough */
case WRITE_STATE_INITIAL:
/* do nothing */
@ -957,7 +954,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
}
master = &call->masters[set];
master->status = GRPC_OP_OK;
master->success = 1;
master->need_mask = have_ops;
master->complete_mask = 0;
master->on_complete = completion;
@ -1144,8 +1141,8 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
*(grpc_status_code *)dest = (status != GRPC_STATUS_OK);
}
static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) {
grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
static void finish_batch(grpc_call *call, int success, void *tag) {
grpc_cq_end_op(call->cq, tag, call, 1);
}
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
@ -1159,8 +1156,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
if (nops == 0) {
grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE);
grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
grpc_cq_begin_op(call->cq, call);
grpc_cq_end_op(call->cq, tag, call, 1);
return GRPC_CALL_OK;
}
@ -1251,7 +1248,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
}
}
grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE);
grpc_cq_begin_op(call->cq, call);
return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch,
tag);

@ -80,8 +80,7 @@ typedef struct {
grpc_ioreq_data data;
} grpc_ioreq;
typedef void (*grpc_ioreq_completion_func)(grpc_call *call,
grpc_op_error status,
typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
void *user_data);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,

@ -51,8 +51,6 @@
function (on_finish) that is hidden from outside this module */
typedef struct event {
grpc_event base;
grpc_event_finish_func on_finish;
void *on_finish_user_data;
struct event *queue_next;
struct event *queue_prev;
struct event *bucket_next;
@ -78,16 +76,8 @@ struct grpc_completion_queue {
event *queue;
/* Fixed size chained hash table of events for pluck() */
event *buckets[NUM_TAG_BUCKETS];
#ifndef NDEBUG
/* Debug support: track which operations are in flight at any given time */
gpr_atm pending_op_count[GRPC_COMPLETION_DO_NOT_USE];
#endif
};
/* Default do-nothing on_finish function */
static void null_on_finish(void *user_data, grpc_op_error error) {}
grpc_completion_queue *grpc_completion_queue_create(void) {
grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
memset(cc, 0, sizeof(*cc));
@ -124,15 +114,11 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
members can be filled in.
Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data) {
void *tag, grpc_call *call) {
event *ev = gpr_malloc(sizeof(event));
gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
ev->base.type = type;
ev->base.tag = tag;
ev->base.call = call;
ev->on_finish = on_finish ? on_finish : null_on_finish;
ev->on_finish_user_data = user_data;
if (cc->queue == NULL) {
cc->queue = ev->queue_next = ev->queue_prev = ev;
} else {
@ -152,22 +138,15 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
return ev;
}
void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
grpc_completion_type type) {
void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) {
gpr_ref(&cc->refs);
if (call) GRPC_CALL_INTERNAL_REF(call, "cq");
#ifndef NDEBUG
gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1);
#endif
}
/* Signal the end of an operation - if this is the last waiting-to-be-queued
event, then enter shutdown mode */
static void end_op_locked(grpc_completion_queue *cc,
grpc_completion_type type) {
#ifndef NDEBUG
GPR_ASSERT(gpr_atm_full_fetch_add(&cc->pending_op_count[type], -1) > 0);
#endif
if (gpr_unref(&cc->refs)) {
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
@ -176,20 +155,12 @@ static void end_op_locked(grpc_completion_queue *cc,
}
}
void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
add_locked(cc, GRPC_SERVER_SHUTDOWN, tag, NULL, NULL, NULL);
end_op_locked(cc, GRPC_SERVER_SHUTDOWN);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data,
grpc_op_error error) {
int success) {
event *ev;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data);
ev->base.data.op_complete = error;
ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call);
ev->base.success = success;
end_op_locked(cc, GRPC_OP_COMPLETE);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
@ -198,15 +169,14 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
static event *create_shutdown_event(void) {
event *ev = gpr_malloc(sizeof(event));
ev->base.type = GRPC_QUEUE_SHUTDOWN;
ev->base.call = NULL;
ev->base.tag = NULL;
ev->on_finish = null_on_finish;
return ev;
}
grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
event *ev = NULL;
grpc_event ret;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
@ -240,12 +210,16 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
GRPC_POLLSET_MU(&cc->pollset), deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
return NULL;
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
return ret;
}
}
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
return &ev->base;
ret = ev->base;
gpr_free(ev);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
return ret;
}
static event *pluck_event(grpc_completion_queue *cc, void *tag) {
@ -277,9 +251,10 @@ static event *pluck_event(grpc_completion_queue *cc, void *tag) {
return NULL;
}
grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
event *ev = NULL;
grpc_event ret;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
@ -296,12 +271,16 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
GRPC_POLLSET_MU(&cc->pollset), deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
return NULL;
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
return ret;
}
}
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
ret = ev->base;
gpr_free(ev);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
return &ev->base;
return ret;
}
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
@ -324,30 +303,6 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
grpc_cq_internal_unref(cc);
}
void grpc_event_finish(grpc_event *base) {
event *ev = (event *)base;
ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK);
if (ev->base.call) {
GRPC_CALL_INTERNAL_UNREF(ev->base.call, "cq", 1);
}
gpr_free(ev);
}
void grpc_cq_dump_pending_ops(grpc_completion_queue *cc) {
#ifndef NDEBUG
char tmp[GRPC_COMPLETION_DO_NOT_USE * (1 + GPR_LTOA_MIN_BUFSIZE)];
char *p = tmp;
int i;
for (i = 0; i < GRPC_COMPLETION_DO_NOT_USE; i++) {
*p++ = ' ';
p += gpr_ltoa(cc->pending_op_count[i], p);
}
gpr_log(GPR_INFO, "pending ops:%s", tmp);
#endif
}
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
return &cc->pollset;
}

@ -39,17 +39,12 @@
#include "src/core/iomgr/pollset.h"
#include <grpc/grpc.h>
/* A finish func is executed whenever the event consumer calls
grpc_event_finish */
typedef void (*grpc_event_finish_func)(void *user_data, grpc_op_error error);
void grpc_cq_internal_ref(grpc_completion_queue *cc);
void grpc_cq_internal_unref(grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made */
void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
grpc_completion_type type);
void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call);
/* grpc_cq_end_* functions pair with a grpc_cq_begin_op
@ -64,16 +59,11 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
/* Queue a GRPC_OP_COMPLETED operation */
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data,
grpc_op_error error);
void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag);
int success);
/* disable polling for some tests */
void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc);
void grpc_cq_dump_pending_ops(grpc_completion_queue *cc);
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc);

@ -40,23 +40,15 @@
static void addhdr(gpr_strvec *buf, grpc_event *ev) {
char *tmp;
gpr_asprintf(&tmp, "tag:%p call:%p", ev->tag, (void *)ev->call);
gpr_asprintf(&tmp, "tag:%p", ev->tag);
gpr_strvec_add(buf, tmp);
}
static const char *errstr(grpc_op_error err) {
switch (err) {
case GRPC_OP_OK:
return "OK";
case GRPC_OP_ERROR:
return "ERROR";
}
return "UNKNOWN_UNKNOWN";
}
static const char *errstr(int success) { return success ? "OK" : "ERROR"; }
static void adderr(gpr_strvec *buf, grpc_op_error err) {
static void adderr(gpr_strvec *buf, int success) {
char *tmp;
gpr_asprintf(&tmp, " err=%s", errstr(err));
gpr_asprintf(&tmp, " %s", errstr(success));
gpr_strvec_add(buf, tmp);
}
@ -69,8 +61,8 @@ char *grpc_event_string(grpc_event *ev) {
gpr_strvec_init(&buf);
switch (ev->type) {
case GRPC_SERVER_SHUTDOWN:
gpr_strvec_add(&buf, gpr_strdup("SERVER_SHUTDOWN"));
case GRPC_QUEUE_TIMEOUT:
gpr_strvec_add(&buf, gpr_strdup("QUEUE_TIMEOUT"));
break;
case GRPC_QUEUE_SHUTDOWN:
gpr_strvec_add(&buf, gpr_strdup("QUEUE_SHUTDOWN"));
@ -78,11 +70,7 @@ char *grpc_event_string(grpc_event *ev) {
case GRPC_OP_COMPLETE:
gpr_strvec_add(&buf, gpr_strdup("OP_COMPLETE: "));
addhdr(&buf, ev);
adderr(&buf, ev->data.op_complete);
break;
case GRPC_COMPLETION_DO_NOT_USE:
gpr_strvec_add(&buf, gpr_strdup("DO_NOT_USE (this is a bug)"));
addhdr(&buf, ev);
adderr(&buf, ev->success);
break;
}

@ -188,8 +188,6 @@ struct call_data {
#define SERVER_FROM_CALL_ELEM(elem) \
(((channel_data *)(elem)->channel_data)->server)
static void do_nothing(void *unused, grpc_op_error ignored) {}
static void begin_call(grpc_server *server, call_data *calld,
requested_call *rc);
static void fail_call(grpc_server *server, requested_call *rc);
@ -538,8 +536,8 @@ static void destroy_call_elem(grpc_call_element *elem) {
if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) {
for (i = 0; i < chand->server->num_shutdown_tags; i++) {
for (j = 0; j < chand->server->cq_count; j++) {
grpc_cq_end_server_shutdown(chand->server->cqs[j],
chand->server->shutdown_tags[i]);
grpc_cq_end_op(chand->server->cqs[j], chand->server->shutdown_tags[i],
NULL, 1);
}
}
}
@ -817,7 +815,7 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
gpr_mu_lock(&server->mu);
if (have_shutdown_tag) {
for (i = 0; i < server->cq_count; i++) {
grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
grpc_cq_begin_op(server->cqs[i], NULL);
}
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
@ -867,7 +865,7 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
if (server->lists[ALL_CALLS] == NULL) {
for (i = 0; i < server->num_shutdown_tags; i++) {
for (j = 0; j < server->cq_count; j++) {
grpc_cq_end_server_shutdown(server->cqs[j], server->shutdown_tags[i]);
grpc_cq_end_op(server->cqs[j], server->shutdown_tags[i], NULL, 1);
}
}
}
@ -1018,7 +1016,7 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_completion_queue *cq_bind,
void *tag) {
requested_call rc;
grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE);
grpc_cq_begin_op(server->unregistered_cq, NULL);
rc.type = BATCH_CALL;
rc.tag = tag;
rc.data.batch.cq_bind = cq_bind;
@ -1034,7 +1032,7 @@ grpc_call_error grpc_server_request_registered_call(
grpc_completion_queue *cq_bind, void *tag) {
requested_call rc;
registered_method *registered_method = rm;
grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
grpc_cq_begin_op(registered_method->cq, NULL);
rc.type = REGISTERED_CALL;
rc.tag = tag;
rc.data.registered.cq_bind = cq_bind;
@ -1046,10 +1044,9 @@ grpc_call_error grpc_server_request_registered_call(
return queue_call_request(server, &rc);
}
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
static void publish_registered_or_batch(grpc_call *call, int success,
void *tag);
static void publish_was_not_set(grpc_call *call, grpc_op_error status,
void *tag) {
static void publish_was_not_set(grpc_call *call, int success, void *tag) {
abort();
}
@ -1118,24 +1115,23 @@ static void fail_call(grpc_server *server, requested_call *rc) {
case BATCH_CALL:
*rc->data.batch.call = NULL;
rc->data.batch.initial_metadata->count = 0;
grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL,
GRPC_OP_ERROR);
grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, 0);
break;
case REGISTERED_CALL:
*rc->data.registered.call = NULL;
rc->data.registered.initial_metadata->count = 0;
grpc_cq_end_op(rc->data.registered.registered_method->cq, rc->tag, NULL,
do_nothing, NULL, GRPC_OP_ERROR);
0);
break;
}
}
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
static void publish_registered_or_batch(grpc_call *call, int success,
void *tag) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
call_data *calld = elem->call_data;
grpc_cq_end_op(calld->cq_new, tag, call, do_nothing, NULL, status);
grpc_cq_end_op(calld->cq_new, tag, call, success);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {

@ -49,15 +49,11 @@ ClientContext::~ClientContext() {
grpc_call_destroy(call_);
}
if (cq_) {
grpc_completion_queue_shutdown(cq_);
// Drain cq_.
grpc_event* ev;
grpc_completion_type t;
do {
ev = grpc_completion_queue_next(cq_, gpr_inf_future);
t = ev->type;
grpc_event_finish(ev);
} while (t != GRPC_QUEUE_SHUTDOWN);
grpc_completion_queue_shutdown(cq_);
while (grpc_completion_queue_next(cq_, gpr_inf_future).type !=
GRPC_QUEUE_SHUTDOWN)
;
grpc_completion_queue_destroy(cq_);
}
}

@ -48,53 +48,41 @@ CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); }
void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); }
// Helper class so we can declare a unique_ptr with grpc_event
class EventDeleter {
public:
void operator()(grpc_event* ev) {
if (ev) grpc_event_finish(ev);
}
};
CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
void** tag, bool* ok, gpr_timespec deadline) {
std::unique_ptr<grpc_event, EventDeleter> ev;
for (;;) {
ev.reset(grpc_completion_queue_next(cq_, deadline));
if (!ev) { /* got a NULL back because deadline passed */
auto ev = grpc_completion_queue_next(cq_, deadline);
switch (ev.type) {
case GRPC_QUEUE_TIMEOUT:
return TIMEOUT;
}
if (ev->type == GRPC_QUEUE_SHUTDOWN) {
case GRPC_QUEUE_SHUTDOWN:
return SHUTDOWN;
}
auto cq_tag = static_cast<CompletionQueueTag*>(ev->tag);
*ok = ev->data.op_complete == GRPC_OP_OK;
case GRPC_OP_COMPLETE:
auto cq_tag = static_cast<CompletionQueueTag*>(ev.tag);
*ok = ev.success != 0;
*tag = cq_tag;
if (cq_tag->FinalizeResult(tag, ok)) {
return GOT_EVENT;
}
break;
}
}
}
bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
std::unique_ptr<grpc_event, EventDeleter> ev;
ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_inf_future));
bool ok = ev->data.op_complete == GRPC_OP_OK;
auto ev = grpc_completion_queue_pluck(cq_, tag, gpr_inf_future);
bool ok = ev.success != 0;
void* ignored = tag;
GPR_ASSERT(tag->FinalizeResult(&ignored, &ok));
GPR_ASSERT(ignored == tag);
// Ignore mutations by FinalizeResult: Pluck returns the C API status
return ev->data.op_complete == GRPC_OP_OK;
return ev.success != 0;
}
void CompletionQueue::TryPluck(CompletionQueueTag* tag) {
std::unique_ptr<grpc_event, EventDeleter> ev;
ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_time_0));
if (!ev) return;
bool ok = ev->data.op_complete == GRPC_OP_OK;
auto ev = grpc_completion_queue_pluck(cq_, tag, gpr_time_0);
if (ev.type == GRPC_QUEUE_TIMEOUT) return;
bool ok = ev.success != 0;
void* ignored = tag;
// the tag must be swallowed if using TryPluck
GPR_ASSERT(!tag->FinalizeResult(&ignored, &ok));

@ -63,8 +63,7 @@ grpc_byte_buffer *string_to_byte_buffer(const char *buffer, size_t len) {
return bb;
}
typedef void(GPR_CALLTYPE *callback_funcptr)(grpc_op_error op_error,
void *batch_context);
typedef void(GPR_CALLTYPE *callback_funcptr)(int success, void *batch_context);
/*
* Helper to maintain lifetime of batch op inputs and store batch op outputs.
@ -302,27 +301,26 @@ grpcsharp_completion_queue_destroy(grpc_completion_queue *cq) {
GPR_EXPORT grpc_completion_type GPR_CALLTYPE
grpcsharp_completion_queue_next_with_callback(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_event ev;
grpcsharp_batch_context *batch_context;
grpc_completion_type t;
void(GPR_CALLTYPE * callback)(grpc_event *);
ev = grpc_completion_queue_next(cq, gpr_inf_future);
t = ev->type;
if (t == GRPC_OP_COMPLETE && ev->tag) {
t = ev.type;
if (t == GRPC_OP_COMPLETE && ev.tag) {
/* NEW API handler */
batch_context = (grpcsharp_batch_context *)ev->tag;
batch_context->callback(ev->data.op_complete, batch_context);
batch_context = (grpcsharp_batch_context *)ev.tag;
batch_context->callback(ev.success, batch_context);
grpcsharp_batch_context_destroy(batch_context);
} else if (ev->tag) {
} else if (ev.tag) {
/* call the callback in ev->tag */
/* C forbids to cast object pointers to function pointers, so
* we cast to intptr first.
*/
callback = (void(GPR_CALLTYPE *)(grpc_event *))(gpr_intptr)ev->tag;
(*callback)(ev);
callback = (void(GPR_CALLTYPE *)(grpc_event *))(gpr_intptr)ev.tag;
(*callback)(&ev);
}
grpc_event_finish(ev);
/* return completion type to allow some handling for events that have no
* tag - such as GRPC_QUEUE_SHUTDOWN
@ -789,7 +787,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_redirect_log(grpcsharp_log_func func) {
/* For testing */
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_test_callback(callback_funcptr callback) {
callback(GRPC_OP_OK, NULL);
callback(1, NULL);
}
/* For testing */

@ -63,7 +63,7 @@ CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {}
void CompletionQueueAsyncWorker::Execute() {
result = grpc_completion_queue_next(queue, gpr_inf_future);
if (result->data.op_complete != GRPC_OP_OK) {
if (!result.success) {
SetErrorMessage("The batch encountered an error");
}
}
@ -96,25 +96,21 @@ void CompletionQueueAsyncWorker::HandleOKCallback() {
} else {
current_threads -= 1;
}
NanCallback *callback = GetTagCallback(result->tag);
Handle<Value> argv[] = {NanNull(), GetTagNodeValue(result->tag)};
NanCallback *callback = GetTagCallback(result.tag);
Handle<Value> argv[] = {NanNull(), GetTagNodeValue(result.tag)};
callback->Call(2, argv);
DestroyTag(result->tag);
grpc_event_finish(result);
result = NULL;
DestroyTag(result.tag);
}
void CompletionQueueAsyncWorker::HandleErrorCallback() {
NanScope();
NanCallback *callback = GetTagCallback(result->tag);
NanCallback *callback = GetTagCallback(result.tag);
Handle<Value> argv[] = {NanError(ErrorMessage())};
callback->Call(1, argv);
DestroyTag(result->tag);
grpc_event_finish(result);
result = NULL;
DestroyTag(result.tag);
}
} // namespace node

@ -70,7 +70,7 @@ class CompletionQueueAsyncWorker : public NanAsyncWorker {
void HandleErrorCallback();
private:
grpc_event *result;
grpc_event result;
static grpc_completion_queue *queue;

@ -61,17 +61,12 @@ zend_class_entry *grpc_ce_call;
/* Frees and destroys an instance of wrapped_grpc_call */
void free_wrapped_grpc_call(void *object TSRMLS_DC) {
wrapped_grpc_call *call = (wrapped_grpc_call *)object;
grpc_event *event;
if (call->owned && call->wrapped != NULL) {
if (call->queue != NULL) {
grpc_completion_queue_shutdown(call->queue);
event = grpc_completion_queue_next(call->queue, gpr_inf_future);
while (event != NULL) {
if (event->type == GRPC_QUEUE_SHUTDOWN) {
break;
}
event = grpc_completion_queue_next(call->queue, gpr_inf_future);
}
while (grpc_completion_queue_next(call->queue, gpr_inf_future).type !=
GRPC_QUEUE_SHUTDOWN)
;
grpc_completion_queue_destroy(call->queue);
}
grpc_call_destroy(call->wrapped);
@ -287,7 +282,7 @@ PHP_METHOD(Call, startBatch) {
grpc_byte_buffer *message;
int cancelled;
grpc_call_error error;
grpc_event *event;
grpc_event event;
zval *result;
char *message_str;
size_t message_len;
@ -422,7 +417,7 @@ PHP_METHOD(Call, startBatch) {
}
event = grpc_completion_queue_pluck(call->queue, call->wrapped,
gpr_inf_future);
if (event->data.op_complete != GRPC_OP_OK) {
if (!event.success) {
zend_throw_exception(spl_ce_LogicException,
"The batch failed for some reason",
1 TSRMLS_CC);

@ -61,16 +61,11 @@ zend_class_entry *grpc_ce_server;
/* Frees and destroys an instance of wrapped_grpc_server */
void free_wrapped_grpc_server(void *object TSRMLS_DC) {
wrapped_grpc_server *server = (wrapped_grpc_server *)object;
grpc_event *event;
if (server->queue != NULL) {
grpc_completion_queue_shutdown(server->queue);
event = grpc_completion_queue_next(server->queue, gpr_inf_future);
while (event != NULL) {
if (event->type == GRPC_QUEUE_SHUTDOWN) {
break;
}
event = grpc_completion_queue_next(server->queue, gpr_inf_future);
}
while (grpc_completion_queue_next(server->queue, gpr_inf_future).type !=
GRPC_QUEUE_SHUTDOWN)
;
grpc_completion_queue_destroy(server->queue);
}
if (server->wrapped != NULL) {
@ -141,7 +136,7 @@ PHP_METHOD(Server, requestCall) {
grpc_call_details details;
grpc_metadata_array metadata;
zval *result;
grpc_event *event;
grpc_event event;
MAKE_STD_ZVAL(result);
object_init(result);
grpc_call_details_init(&details);
@ -154,7 +149,7 @@ PHP_METHOD(Server, requestCall) {
goto cleanup;
}
event = grpc_completion_queue_pluck(server->queue, NULL, gpr_inf_future);
if (event->data.op_complete != GRPC_OP_OK) {
if (!event.success) {
zend_throw_exception(spl_ce_LogicException,
"Failed to request a call for some reason",
1 TSRMLS_CC);

@ -349,7 +349,7 @@ static PyObject *pygrpc_completion_queue_get(CompletionQueue *self,
PyObject *deadline;
double double_deadline;
gpr_timespec deadline_timespec;
grpc_event *c_event;
grpc_event c_event;
PyObject *event_args;
PyObject *event;
@ -378,15 +378,14 @@ static PyObject *pygrpc_completion_queue_get(CompletionQueue *self,
grpc_completion_queue_next(self->c_completion_queue, deadline_timespec);
Py_END_ALLOW_THREADS;
if (c_event == NULL) {
Py_RETURN_NONE;
}
tag = (pygrpc_tag *)c_event->tag;
tag = (pygrpc_tag *)c_event.tag;
switch (c_event->type) {
switch (c_event.type) {
case GRPC_QUEUE_TIMEOUT:
Py_RETURN_NONE;
break;
case GRPC_QUEUE_SHUTDOWN:
event_args = pygrpc_stop_event_args(c_event);
event_args = pygrpc_stop_event_args(&c_event);
break;
case GRPC_OP_COMPLETE: {
if (!tag) {
@ -398,28 +397,27 @@ static PyObject *pygrpc_completion_queue_get(CompletionQueue *self,
if (tag) {
pygrpc_tag_destroy(tag);
}
grpc_event_finish(c_event);
return pygrpc_completion_queue_get(self, args);
case PYGRPC_WRITE_ACCEPTED:
event_args = pygrpc_write_event_args(c_event);
event_args = pygrpc_write_event_args(&c_event);
break;
case PYGRPC_FINISH_ACCEPTED:
event_args = pygrpc_complete_event_args(c_event);
event_args = pygrpc_complete_event_args(&c_event);
break;
case PYGRPC_SERVER_RPC_NEW:
event_args = pygrpc_service_event_args(c_event);
event_args = pygrpc_service_event_args(&c_event);
break;
case PYGRPC_READ:
event_args = pygrpc_read_event_args(c_event);
event_args = pygrpc_read_event_args(&c_event);
break;
case PYGRPC_CLIENT_METADATA_READ:
event_args = pygrpc_metadata_event_args(c_event);
event_args = pygrpc_metadata_event_args(&c_event);
break;
case PYGRPC_FINISHED_CLIENT:
event_args = pygrpc_finished_client_event_args(c_event);
event_args = pygrpc_finished_client_event_args(&c_event);
break;
case PYGRPC_FINISHED_SERVER:
event_args = pygrpc_finished_server_event_args(c_event);
event_args = pygrpc_finished_server_event_args(&c_event);
break;
default:
PyErr_SetString(PyExc_Exception, "Unrecognized op event type!");

@ -45,6 +45,8 @@
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#define ROOT_EXPECTATION 1000
/* a set of metadata we expect to find on an event */
typedef struct metadata {
size_t count;
@ -60,9 +62,7 @@ typedef struct expectation {
struct expectation *prev;
grpc_completion_type type;
void *tag;
union {
grpc_op_error op_complete;
} data;
int success;
} expectation;
/* the verifier itself */
@ -75,7 +75,7 @@ struct cq_verifier {
cq_verifier *cq_verifier_create(grpc_completion_queue *cq) {
cq_verifier *v = gpr_malloc(sizeof(cq_verifier));
v->expect.type = GRPC_COMPLETION_DO_NOT_USE;
v->expect.type = ROOT_EXPECTATION;
v->expect.tag = NULL;
v->expect.next = &v->expect;
v->expect.prev = &v->expect;
@ -149,11 +149,9 @@ static void verify_matches(expectation *e, grpc_event *ev) {
abort();
break;
case GRPC_OP_COMPLETE:
GPR_ASSERT(e->data.op_complete == ev->data.op_complete);
break;
case GRPC_SERVER_SHUTDOWN:
GPR_ASSERT(e->success == ev->success);
break;
case GRPC_COMPLETION_DO_NOT_USE:
case GRPC_QUEUE_TIMEOUT:
gpr_log(GPR_ERROR, "not implemented");
abort();
break;
@ -165,13 +163,10 @@ static void expectation_to_strvec(gpr_strvec *buf, expectation *e) {
switch (e->type) {
case GRPC_OP_COMPLETE:
gpr_asprintf(&tmp, "GRPC_OP_COMPLETE result=%d", e->data.op_complete);
gpr_asprintf(&tmp, "GRPC_OP_COMPLETE result=%d", e->success);
gpr_strvec_add(buf, tmp);
break;
case GRPC_SERVER_SHUTDOWN:
gpr_strvec_add(buf, gpr_strdup("GRPC_SERVER_SHUTDOWN"));
break;
case GRPC_COMPLETION_DO_NOT_USE:
case GRPC_QUEUE_TIMEOUT:
case GRPC_QUEUE_SHUTDOWN:
gpr_log(GPR_ERROR, "not implemented");
abort();
@ -203,7 +198,7 @@ static void fail_no_event_received(cq_verifier *v) {
void cq_verify(cq_verifier *v) {
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
grpc_event *ev;
grpc_event ev;
expectation *e;
char *s;
gpr_strvec have_tags;
@ -212,15 +207,16 @@ void cq_verify(cq_verifier *v) {
while (v->expect.next != &v->expect) {
ev = grpc_completion_queue_next(v->cq, deadline);
if (!ev) {
if (ev.type == GRPC_QUEUE_TIMEOUT) {
fail_no_event_received(v);
break;
}
for (e = v->expect.next; e != &v->expect; e = e->next) {
gpr_asprintf(&s, " %p", e->tag);
gpr_strvec_add(&have_tags, s);
if (e->tag == ev->tag) {
verify_matches(e, ev);
if (e->tag == ev.tag) {
verify_matches(e, &ev);
e->next->prev = e->prev;
e->prev->next = e->next;
gpr_free(e);
@ -228,7 +224,7 @@ void cq_verify(cq_verifier *v) {
}
}
if (e == &v->expect) {
s = grpc_event_string(ev);
s = grpc_event_string(&ev);
gpr_log(GPR_ERROR, "event not found: %s", s);
gpr_free(s);
s = gpr_strvec_flatten(&have_tags, NULL);
@ -237,8 +233,6 @@ void cq_verify(cq_verifier *v) {
gpr_strvec_destroy(&have_tags);
abort();
}
grpc_event_finish(ev);
}
gpr_strvec_destroy(&have_tags);
@ -246,13 +240,13 @@ void cq_verify(cq_verifier *v) {
void cq_verify_empty(cq_verifier *v) {
gpr_timespec deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(1));
grpc_event *ev;
grpc_event ev;
GPR_ASSERT(v->expect.next == &v->expect && "expectation queue must be empty");
ev = grpc_completion_queue_next(v->cq, deadline);
if (ev != NULL) {
char *s = grpc_event_string(ev);
if (ev.type != GRPC_QUEUE_TIMEOUT) {
char *s = grpc_event_string(&ev);
gpr_log(GPR_ERROR, "unexpected event (expected nothing): %s", s);
gpr_free(s);
abort();
@ -269,10 +263,6 @@ static expectation *add(cq_verifier *v, grpc_completion_type type, void *tag) {
return e;
}
void cq_expect_completion(cq_verifier *v, void *tag, grpc_op_error result) {
add(v, GRPC_OP_COMPLETE, tag)->data.op_complete = result;
}
void cq_expect_server_shutdown(cq_verifier *v, void *tag) {
add(v, GRPC_SERVER_SHUTDOWN, tag);
void cq_expect_completion(cq_verifier *v, void *tag, int success) {
add(v, GRPC_OP_COMPLETE, tag)->success = success;
}

@ -57,8 +57,7 @@ void cq_verify_empty(cq_verifier *v);
Any functions taking ... expect a NULL terminated list of key/value pairs
(each pair using two parameter slots) of metadata that MUST be present in
the event. */
void cq_expect_completion(cq_verifier *v, void *tag, grpc_op_error result);
void cq_expect_server_shutdown(cq_verifier *v, void *tag);
void cq_expect_completion(cq_verifier *v, void *tag, int success);
int byte_buffer_eq_string(grpc_byte_buffer *byte_buffer, const char *string);
int contains_metadata(grpc_metadata_array *array, const char *key, const char *value);

@ -50,15 +50,10 @@ static gpr_timespec ms_from_now(int ms) {
}
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, ms_from_now(5000));
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
gpr_log(GPR_INFO, "Drained event type %d", type);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
void test_connect(const char *server_host, const char *client_host, int port,
@ -159,7 +154,7 @@ void test_connect(const char *server_host, const char *client_host, int port,
&call_details,
&request_metadata_recv,
server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
@ -177,10 +172,10 @@ void test_connect(const char *server_host, const char *client_host, int port,
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
@ -192,7 +187,7 @@ void test_connect(const char *server_host, const char *client_host, int port,
grpc_call_destroy(s);
} else {
/* Check for a failed connection. */
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_DEADLINE_EXCEEDED);

@ -45,8 +45,6 @@ int main(int argc, char **argv) {
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2);
grpc_completion_queue *cq;
cq_verifier *cqv;
grpc_event *ev;
int done;
grpc_op ops[6];
grpc_op *op;
grpc_metadata_array trailing_metadata_recv;
@ -79,17 +77,15 @@ int main(int argc, char **argv) {
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(call, ops, op - ops, tag(1)));
/* verify that all tags get completed */
cq_expect_completion(cqv, tag(1), GRPC_OP_OK);
cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_DEADLINE_EXCEEDED);
grpc_completion_queue_shutdown(cq);
for (done = 0; !done;) {
ev = grpc_completion_queue_next(cq, gpr_inf_future);
done = ev->type == GRPC_QUEUE_SHUTDOWN;
grpc_event_finish(ev);
}
while (grpc_completion_queue_next(cq, gpr_inf_future).type !=
GRPC_QUEUE_SHUTDOWN)
;
grpc_completion_queue_destroy(cq);
grpc_call_destroy(call);
grpc_channel_destroy(chan);

@ -69,14 +69,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -144,7 +140,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNAUTHENTICATED);

@ -68,14 +68,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -164,7 +160,7 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
f.server, &s, &call_details,
&request_metadata_recv, f.server_cq, tag(2)));
cq_expect_completion(v_server, tag(2), GRPC_OP_OK);
cq_expect_completion(v_server, tag(2), 1);
cq_verify(v_server);
op = ops;
@ -184,10 +180,10 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c));
cq_expect_completion(v_server, tag(3), GRPC_OP_OK);
cq_expect_completion(v_server, tag(3), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == mode.expect_status);

@ -68,14 +68,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -166,7 +162,7 @@ static void test_cancel_after_accept_and_writes_closed(
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
f.server, &s, &call_details,
&request_metadata_recv, f.server_cq, tag(2)));
cq_expect_completion(v_server, tag(2), GRPC_OP_OK);
cq_expect_completion(v_server, tag(2), 1);
cq_verify(v_server);
op = ops;
@ -186,10 +182,10 @@ static void test_cancel_after_accept_and_writes_closed(
GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c));
cq_expect_completion(v_server, tag(3), GRPC_OP_OK);
cq_expect_completion(v_server, tag(3), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == mode.expect_status);

@ -69,14 +69,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -160,7 +156,7 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config,
GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c));
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == mode.expect_status);

@ -67,14 +67,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -157,7 +153,7 @@ static void test_cancel_before_invoke(grpc_end2end_test_config config,
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, test_ops, tag(1)));
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_CANCELLED);

@ -66,14 +66,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {

@ -75,14 +75,10 @@ static void shutdown_client(grpc_end2end_test_fixture *f) {
}
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, n_seconds_time(5));
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void end_test(grpc_end2end_test_fixture *f) {
@ -146,7 +142,7 @@ static void test_body(grpc_end2end_test_fixture f) {
&call_details,
&request_metadata_recv,
f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
@ -163,10 +159,10 @@ static void test_body(grpc_end2end_test_fixture f) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);

@ -55,14 +55,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -137,7 +133,7 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f,
&call_details,
&request_metadata_recv,
f->server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
/* should be able to shut down the server early
@ -158,10 +154,10 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f,
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);

@ -67,14 +67,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -152,7 +148,7 @@ static void test_early_server_shutdown_finishes_inflight_calls(
&call_details,
&request_metadata_recv,
f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
@ -164,10 +160,10 @@ static void test_early_server_shutdown_finishes_inflight_calls(
/* shutdown and destroy the server */
shutdown_server(&f);
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);

@ -67,14 +67,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -120,7 +116,7 @@ static void test_early_server_shutdown_finishes_tags(
&request_metadata_recv,
f.server_cq, tag(101)));
grpc_server_shutdown(f.server);
cq_expect_completion(v_server, tag(101), GRPC_OP_ERROR);
cq_expect_completion(v_server, tag(101), 0);
cq_verify(v_server);
GPR_ASSERT(s == NULL);

@ -69,14 +69,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -115,7 +111,7 @@ static void empty_batch_body(grpc_end2end_test_fixture f) {
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, op, 0, tag(1)));
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
grpc_call_destroy(c);

@ -67,14 +67,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -151,7 +147,7 @@ static void test_early_server_shutdown_finishes_inflight_calls(
&call_details,
&request_metadata_recv,
f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
/* shutdown and destroy the server */
@ -172,14 +168,14 @@ static void test_early_server_shutdown_finishes_inflight_calls(
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
grpc_call_destroy(s);
cq_expect_server_shutdown(v_server, tag(0xdead));
cq_expect_completion(v_server, tag(0xdead), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);

@ -65,14 +65,10 @@ static gpr_timespec n_seconds_time(int n) {
}
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, n_seconds_time(5));
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -169,7 +165,7 @@ static void test_invoke_large_request(grpc_end2end_test_config config) {
&call_details,
&request_metadata_recv,
f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
@ -192,10 +188,10 @@ static void test_invoke_large_request(grpc_end2end_test_config config) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);

@ -67,14 +67,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -149,7 +145,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
&call_details,
&request_metadata_recv,
f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
@ -166,10 +162,10 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
@ -203,7 +199,7 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
gpr_timespec deadline;
cq_verifier *v_client;
cq_verifier *v_server;
grpc_event *ev;
grpc_event ev;
grpc_call_details call_details;
grpc_metadata_array request_metadata_recv;
grpc_metadata_array initial_metadata_recv1;
@ -303,20 +299,18 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(c2, ops, op - ops, tag(402)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
ev = grpc_completion_queue_next(f.client_cq,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3));
GPR_ASSERT(ev);
GPR_ASSERT(ev->type == GRPC_OP_COMPLETE);
GPR_ASSERT(ev->data.op_complete == GRPC_OP_OK);
GPR_ASSERT(ev->tag == tag(301) || ev->tag == tag(401));
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
GPR_ASSERT(ev.success);
GPR_ASSERT(ev.tag == tag(301) || ev.tag == tag(401));
/* The /alpha or /beta calls started above could be invoked (but NOT both);
* check this here */
/* We'll get tag 303 or 403, we want 300, 400 */
live_call = ((int)(gpr_intptr)ev->tag) - 1;
grpc_event_finish(ev);
live_call = ((int)(gpr_intptr)ev.tag) - 1;
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
@ -333,20 +327,20 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(s1, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(live_call + 2), GRPC_OP_OK);
cq_expect_completion(v_client, tag(live_call + 2), 1);
/* first request is finished, we should be able to start the second */
live_call = (live_call == 300) ? 400 : 300;
cq_expect_completion(v_client, tag(live_call + 1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(live_call + 1), 1);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s2,
&call_details,
&request_metadata_recv,
f.server_cq, tag(201)));
cq_expect_completion(v_server, tag(201), GRPC_OP_OK);
cq_expect_completion(v_server, tag(201), 1);
cq_verify(v_server);
op = ops;
@ -364,10 +358,10 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(s2, ops, op - ops, tag(202)));
cq_expect_completion(v_client, tag(live_call + 2), GRPC_OP_OK);
cq_expect_completion(v_client, tag(live_call + 2), 1);
cq_verify(v_client);
cq_expect_completion(v_server, tag(202), GRPC_OP_OK);
cq_expect_completion(v_server, tag(202), 1);
cq_verify(v_server);
cq_verifier_destroy(v_client);

@ -65,14 +65,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {

@ -67,14 +67,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -157,7 +153,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config,
&call_details,
&request_metadata_recv,
f.server_cq, tag(100)));
cq_expect_completion(v_server, tag(100), GRPC_OP_OK);
cq_expect_completion(v_server, tag(100), 1);
cq_verify(v_server);
op = ops;
@ -188,7 +184,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config,
op++;
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
op = ops;
@ -197,10 +193,10 @@ static void test_pingpong_streaming(grpc_end2end_test_config config,
op++;
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(s, ops, op - ops, tag(103)));
cq_expect_completion(v_server, tag(103), GRPC_OP_OK);
cq_expect_completion(v_server, tag(103), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(2), GRPC_OP_OK);
cq_expect_completion(v_client, tag(2), 1);
cq_verify(v_client);
grpc_byte_buffer_destroy(request_payload);
@ -225,12 +221,12 @@ static void test_pingpong_streaming(grpc_end2end_test_config config,
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(104)));
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(3), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_expect_completion(v_client, tag(3), 1);
cq_verify(v_client);
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(104), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_expect_completion(v_server, tag(104), 1);
cq_verify(v_server);
grpc_call_destroy(c);

@ -69,14 +69,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -150,7 +146,7 @@ static void simple_request_body(grpc_end2end_test_fixture f, void *rc) {
&call_details,
&request_metadata_recv,
f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
@ -167,10 +163,10 @@ static void simple_request_body(grpc_end2end_test_fixture f, void *rc) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);

@ -67,14 +67,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -185,7 +181,7 @@ static void test_request_response_with_metadata_and_payload(
&call_details,
&request_metadata_recv,
f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
@ -209,10 +205,10 @@ static void test_request_response_with_metadata_and_payload(
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);

@ -67,14 +67,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -171,7 +167,7 @@ static void test_request_response_with_metadata_and_payload(
&call_details,
&request_metadata_recv,
f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
@ -195,10 +191,10 @@ static void test_request_response_with_metadata_and_payload(
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);

@ -67,14 +67,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -163,7 +159,7 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) {
&call_details,
&request_metadata_recv,
f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
@ -186,10 +182,10 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);

@ -170,7 +170,7 @@ static void test_request_response_with_metadata_and_payload(
&call_details,
&request_metadata_recv,
f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
@ -195,10 +195,10 @@ static void test_request_response_with_metadata_and_payload(
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);

@ -67,14 +67,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -167,7 +163,7 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) {
&call_details,
&request_metadata_recv,
f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
@ -187,10 +183,10 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);

@ -67,14 +67,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -158,7 +154,7 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) {
&call_details,
&request_metadata_recv,
f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
@ -178,10 +174,10 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);

@ -55,14 +55,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -145,7 +141,7 @@ static void simple_delayed_request_body(grpc_end2end_test_config config,
&call_details,
&request_metadata_recv,
f->server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
@ -162,10 +158,10 @@ static void simple_delayed_request_body(grpc_end2end_test_config config,
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);

@ -69,14 +69,10 @@ static gpr_timespec n_seconds_time(int n) {
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
@ -151,7 +147,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
&call_details,
&request_metadata_recv,
f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
@ -168,10 +164,10 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), GRPC_OP_OK);
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), GRPC_OP_OK);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);

@ -93,7 +93,7 @@ static void step_ping_pong_request(void) {
"localhost", gpr_inf_future);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(call, ops, op - ops, (void *)1));
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
grpc_completion_queue_next(cq, gpr_inf_future);
grpc_call_destroy(call);
grpc_byte_buffer_destroy(response_payload_recv);
call = NULL;
@ -106,7 +106,7 @@ static void init_ping_pong_stream(void) {
stream_init_op.data.send_initial_metadata.count = 0;
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(call, &stream_init_op, 1, (void *)1));
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
grpc_completion_queue_next(cq, gpr_inf_future);
grpc_metadata_array_init(&initial_metadata_recv);
@ -119,7 +119,7 @@ static void init_ping_pong_stream(void) {
static void step_ping_pong_stream(void) {
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(call, stream_step_ops, 2, (void *)1));
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
grpc_completion_queue_next(cq, gpr_inf_future);
grpc_byte_buffer_destroy(response_payload_recv);
}
@ -147,7 +147,6 @@ int main(int argc, char **argv) {
char *fake_argv[1];
int payload_size = 1;
int done;
int secure = 0;
char *target = "localhost:443";
gpr_cmdline *cl;
@ -209,12 +208,9 @@ int main(int argc, char **argv) {
grpc_channel_destroy(channel);
grpc_completion_queue_shutdown(cq);
done = 0;
while (!done) {
grpc_event *ev = grpc_completion_queue_next(cq, gpr_inf_future);
done = (ev->type == GRPC_QUEUE_SHUTDOWN);
grpc_event_finish(ev);
}
while (grpc_completion_queue_next(cq, gpr_inf_future).type !=
GRPC_QUEUE_SHUTDOWN)
;
grpc_completion_queue_destroy(cq);
grpc_byte_buffer_destroy(the_buffer);
gpr_slice_unref(slice);

@ -169,7 +169,7 @@ static void start_send_status(void) {
static void sigint_handler(int x) { _exit(0); }
int main(int argc, char **argv) {
grpc_event *ev;
grpc_event ev;
call_state *s;
char *addr_buf = NULL;
gpr_cmdline *cl;
@ -233,9 +233,8 @@ int main(int argc, char **argv) {
}
ev = grpc_completion_queue_next(
cq, gpr_time_add(gpr_now(), gpr_time_from_micros(1000000)));
if (!ev) continue;
s = ev->tag;
switch (ev->type) {
s = ev.tag;
switch (ev.type) {
case GRPC_OP_COMPLETE:
switch ((gpr_intptr)s) {
case FLING_SERVER_NEW_REQUEST:
@ -297,10 +296,9 @@ int main(int argc, char **argv) {
GPR_ASSERT(shutdown_started);
shutdown_finished = 1;
break;
default:
GPR_ASSERT(0);
case GRPC_QUEUE_TIMEOUT:
break;
}
grpc_event_finish(ev);
}
grpc_profiler_stop();
grpc_call_details_destroy(&call_details);

@ -43,10 +43,6 @@
#define LOG_TEST() gpr_log(GPR_INFO, "%s", __FUNCTION__)
static void increment_int_on_finish(void *user_data, grpc_op_error error) {
++*(int *)user_data;
}
static void *create_test_tag(void) {
static gpr_intptr i = 0;
return (void *)(++i);
@ -54,12 +50,10 @@ static void *create_test_tag(void) {
/* helper for tests to shutdown correctly and tersely */
static void shutdown_and_destroy(grpc_completion_queue *cc) {
grpc_event *ev;
grpc_event ev;
grpc_completion_queue_shutdown(cc);
ev = grpc_completion_queue_next(cc, gpr_inf_past);
GPR_ASSERT(ev != NULL);
GPR_ASSERT(ev->type == GRPC_QUEUE_SHUTDOWN);
grpc_event_finish(ev);
GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
grpc_completion_queue_destroy(cc);
}
@ -75,42 +69,36 @@ static void test_wait_empty(void) {
LOG_TEST();
cc = grpc_completion_queue_create();
GPR_ASSERT(grpc_completion_queue_next(cc, gpr_now()) == NULL);
GPR_ASSERT(grpc_completion_queue_next(cc, gpr_now()).type ==
GRPC_QUEUE_TIMEOUT);
shutdown_and_destroy(cc);
}
static void test_cq_end_op(void) {
grpc_event *ev;
grpc_event ev;
grpc_completion_queue *cc;
int on_finish_called = 0;
void *tag = create_test_tag();
LOG_TEST();
cc = grpc_completion_queue_create();
grpc_cq_begin_op(cc, NULL, GRPC_OP_COMPLETE);
grpc_cq_end_op(cc, tag, NULL, increment_int_on_finish, &on_finish_called,
GRPC_OP_OK);
grpc_cq_begin_op(cc, NULL);
grpc_cq_end_op(cc, tag, NULL, 1);
ev = grpc_completion_queue_next(cc, gpr_inf_past);
GPR_ASSERT(ev != NULL);
GPR_ASSERT(ev->type == GRPC_OP_COMPLETE);
GPR_ASSERT(ev->tag == tag);
GPR_ASSERT(ev->data.op_complete == GRPC_OP_OK);
GPR_ASSERT(on_finish_called == 0);
grpc_event_finish(ev);
GPR_ASSERT(on_finish_called == 1);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
GPR_ASSERT(ev.tag == tag);
GPR_ASSERT(ev.success);
shutdown_and_destroy(cc);
}
static void test_pluck(void) {
grpc_event *ev;
grpc_event ev;
grpc_completion_queue *cc;
void *tags[128];
unsigned i, j;
int on_finish_called = 0;
LOG_TEST();
@ -124,34 +112,26 @@ static void test_pluck(void) {
cc = grpc_completion_queue_create();
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
grpc_cq_begin_op(cc, NULL, GRPC_OP_COMPLETE);
grpc_cq_end_op(cc, tags[i], NULL, increment_int_on_finish,
&on_finish_called, GRPC_OP_OK);
grpc_cq_begin_op(cc, NULL);
grpc_cq_end_op(cc, tags[i], NULL, 1);
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
ev = grpc_completion_queue_pluck(cc, tags[i], gpr_inf_past);
GPR_ASSERT(ev->tag == tags[i]);
grpc_event_finish(ev);
GPR_ASSERT(ev.tag == tags[i]);
}
GPR_ASSERT(on_finish_called == GPR_ARRAY_SIZE(tags));
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
grpc_cq_begin_op(cc, NULL, GRPC_OP_COMPLETE);
grpc_cq_end_op(cc, tags[i], NULL, increment_int_on_finish,
&on_finish_called, GRPC_OP_OK);
grpc_cq_begin_op(cc, NULL);
grpc_cq_end_op(cc, tags[i], NULL, 1);
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1],
gpr_inf_past);
GPR_ASSERT(ev->tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]);
grpc_event_finish(ev);
GPR_ASSERT(ev.tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]);
}
GPR_ASSERT(on_finish_called == 2 * GPR_ARRAY_SIZE(tags));
shutdown_and_destroy(cc);
}
@ -182,7 +162,7 @@ static void producer_thread(void *arg) {
gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
for (i = 0; i < TEST_THREAD_EVENTS; i++) {
grpc_cq_begin_op(opt->cc, NULL, GRPC_OP_COMPLETE);
grpc_cq_begin_op(opt->cc, NULL);
}
gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
@ -191,8 +171,7 @@ static void producer_thread(void *arg) {
gpr_log(GPR_INFO, "producer %d phase 2", opt->id);
for (i = 0; i < TEST_THREAD_EVENTS; i++) {
grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, NULL, NULL, NULL,
GRPC_OP_OK);
grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, NULL, 1);
opt->events_triggered++;
}
@ -202,7 +181,7 @@ static void producer_thread(void *arg) {
static void consumer_thread(void *arg) {
test_thread_options *opt = arg;
grpc_event *ev;
grpc_event ev;
gpr_log(GPR_INFO, "consumer %d started", opt->id);
gpr_event_set(&opt->on_started, (void *)(gpr_intptr) 1);
@ -217,20 +196,17 @@ static void consumer_thread(void *arg) {
gpr_log(GPR_INFO, "consumer %d phase 2", opt->id);
for (;;) {
ev = grpc_completion_queue_next(opt->cc, ten_seconds_time());
GPR_ASSERT(ev);
switch (ev->type) {
switch (ev.type) {
case GRPC_OP_COMPLETE:
GPR_ASSERT(ev->data.op_complete == GRPC_OP_OK);
GPR_ASSERT(ev.success);
opt->events_triggered++;
grpc_event_finish(ev);
break;
case GRPC_QUEUE_SHUTDOWN:
gpr_log(GPR_INFO, "consumer %d phase 2 done", opt->id);
gpr_event_set(&opt->on_finished, (void *)(gpr_intptr) 1);
grpc_event_finish(ev);
return;
default:
gpr_log(GPR_ERROR, "Invalid event received: %d", ev->type);
case GRPC_QUEUE_TIMEOUT:
gpr_log(GPR_ERROR, "Invalid timeout received");
abort();
}
}

@ -79,7 +79,7 @@ int main(int argc, char **argv) {
grpc_call_start_batch(call, ops, op - ops, tag(1)));
/* the call should immediately fail */
cq_expect_completion(cqv, tag(1), GRPC_OP_OK);
cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv);
grpc_call_destroy(call);

Loading…
Cancel
Save