Added grpc_cq_alarm

pull/3618/head
David Garcia Quintas 9 years ago
parent f8460df564
commit 0dfbdf6e54
  1. 35
      src/core/surface/completion_queue.c
  2. 27
      src/core/surface/completion_queue.h
  3. 42
      test/core/surface/completion_queue_test.c

@ -44,6 +44,7 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/atm.h> #include <grpc/support/atm.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/time.h>
typedef struct { typedef struct {
grpc_pollset_worker *worker; grpc_pollset_worker *worker;
@ -354,3 +355,37 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; } int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg,
grpc_cq_completion *c) {}
static void cq_alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, int success) {
grpc_cq_alarm *cq_alarm = arg;
grpc_cq_end_op(exec_ctx, cq_alarm->cq, cq_alarm->tag, success,
do_nothing_end_completion, NULL, &cq_alarm->completion);
}
grpc_cq_alarm *grpc_cq_alarm_create(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq,
gpr_timespec deadline, void *tag) {
grpc_cq_alarm *cq_alarm = gpr_malloc(sizeof(grpc_cq_alarm));
GRPC_CQ_INTERNAL_REF(cq, "cq_alarm");
cq_alarm->cq = cq;
cq_alarm->tag = tag;
grpc_alarm_init(exec_ctx, &cq_alarm->alarm, deadline, cq_alarm_cb, cq_alarm,
gpr_now(GPR_CLOCK_MONOTONIC));
grpc_cq_begin_op(cq);
return cq_alarm;
}
void grpc_cq_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm) {
grpc_alarm_cancel(exec_ctx, &cq_alarm->alarm);
}
void grpc_cq_alarm_destroy(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm) {
grpc_cq_alarm_cancel(exec_ctx, cq_alarm);
GRPC_CQ_INTERNAL_UNREF(cq_alarm->cq, "cq_alarm");
gpr_free(cq_alarm);
}

@ -36,6 +36,7 @@
/* Internal API for completion queues */ /* Internal API for completion queues */
#include "src/core/iomgr/alarm.h"
#include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset.h"
#include <grpc/grpc.h> #include <grpc/grpc.h>
@ -51,6 +52,16 @@ typedef struct grpc_cq_completion {
gpr_uintptr next; gpr_uintptr next;
} grpc_cq_completion; } grpc_cq_completion;
/** An alarm associated with a completion queue. */
typedef struct grpc_cq_alarm {
grpc_alarm alarm;
grpc_cq_completion completion;
/** completion queue where events about this alarm will be posted */
grpc_completion_queue *cq;
/** user supplied tag */
void *tag;
} grpc_cq_alarm;
#ifdef GRPC_CQ_REF_COUNT_DEBUG #ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line); const char *file, int line);
@ -83,4 +94,20 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc); void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc); int grpc_cq_is_server_cq(grpc_completion_queue *cc);
/** Create a completion queue alarm instance associated to \a cq.
*
* Once the alarm expires (at \a deadline) or it's cancelled (see ...), an event
* with tag \a tag will be added to \a cq. If the alarm expired, the event's
* success bit will be true, false otherwise (ie, upon cancellation). */
grpc_cq_alarm *grpc_cq_alarm_create(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq,
gpr_timespec deadline, void *tag);
/** Cancel a completion queue alarm. Calling this function ove an alarm that has
* already run has no effect. */
void grpc_cq_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm);
/** Destroy the given completion queue alarm, cancelling it in the process. */
void grpc_cq_alarm_destroy(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm);
#endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */ #endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */

@ -102,6 +102,47 @@ static void test_cq_end_op(void) {
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
static void test_cq_alarm(void) {
grpc_completion_queue *cc;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
LOG_TEST("test_cq_alarm");
cc = grpc_completion_queue_create(NULL);
{
/* regular expiry */
grpc_event ev;
void *tag = create_test_tag();
grpc_cq_alarm *cq_alarm = grpc_cq_alarm_create(
&exec_ctx, cc, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), tag);
ev = grpc_completion_queue_next(cc, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2),
NULL);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
GPR_ASSERT(ev.tag == tag);
GPR_ASSERT(ev.success);
grpc_cq_alarm_destroy(&exec_ctx, cq_alarm);
}
{
/* cancellation */
grpc_event ev;
void *tag = create_test_tag();
grpc_cq_alarm *cq_alarm = grpc_cq_alarm_create(
&exec_ctx, cc, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2), tag);
grpc_cq_alarm_cancel(&exec_ctx, cq_alarm);
GPR_ASSERT(grpc_exec_ctx_flush(&exec_ctx) == 1);
ev = grpc_completion_queue_next(cc, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1),
NULL);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
GPR_ASSERT(ev.tag == tag);
GPR_ASSERT(ev.success == 0);
grpc_cq_alarm_destroy(&exec_ctx, cq_alarm);
}
shutdown_and_destroy(cc);
grpc_exec_ctx_finish(&exec_ctx);
}
static void test_shutdown_then_next_polling(void) { static void test_shutdown_then_next_polling(void) {
grpc_completion_queue *cc; grpc_completion_queue *cc;
grpc_event event; grpc_event event;
@ -343,6 +384,7 @@ int main(int argc, char **argv) {
test_shutdown_then_next_with_timeout(); test_shutdown_then_next_with_timeout();
test_cq_end_op(); test_cq_end_op();
test_pluck(); test_pluck();
test_cq_alarm();
test_threading(1, 1); test_threading(1, 1);
test_threading(1, 10); test_threading(1, 10);
test_threading(10, 1); test_threading(10, 1);

Loading…
Cancel
Save