From 137511f2d03224cd135db8f223a6704adff6beed Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sat, 1 Aug 2015 16:15:45 -0700 Subject: [PATCH 1/2] Formalize max pluckers --- include/grpc/grpc.h | 9 ++++++++- src/core/surface/completion_queue.c | 6 ++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 17906c80f69..d29f71bbe76 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -391,10 +391,17 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, otherwise a grpc_event describing the event that occurred. Callers must not call grpc_completion_queue_next and - grpc_completion_queue_pluck simultaneously on the same completion queue. */ + grpc_completion_queue_pluck simultaneously on the same completion queue. + + Completion queues support a maximum of GRPC_MAX_COMPLETION_QUEUE_PLUCKERS + concurrently executing plucks at any time. */ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline); +/** Maximum number of outstanding grpc_completion_queue_pluck executions per + completion queue */ +#define GRPC_MAX_COMPLETION_QUEUE_PLUCKERS 6 + /** Begin destruction of a completion queue. Once all possible events are drained then grpc_completion_queue_next will start to produce GRPC_QUEUE_SHUTDOWN events only. At that point it's safe to call diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 732813fed0e..6bfccd2a2e2 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -45,8 +45,6 @@ #include #include -#define MAX_PLUCKERS 4 - typedef struct { grpc_pollset_worker *worker; void *tag; @@ -68,7 +66,7 @@ struct grpc_completion_queue { int shutdown_called; int is_server_cq; int num_pluckers; - plucker pluckers[MAX_PLUCKERS]; + plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; }; grpc_completion_queue *grpc_completion_queue_create(void) { @@ -205,7 +203,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, static void add_plucker(grpc_completion_queue *cc, void *tag, grpc_pollset_worker *worker) { - GPR_ASSERT(cc->num_pluckers != MAX_PLUCKERS); + GPR_ASSERT(cc->num_pluckers != GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); cc->pluckers[cc->num_pluckers].tag = tag; cc->pluckers[cc->num_pluckers].worker = worker; cc->num_pluckers++; From 9c6e902c47a8618f292df2e977ce707907706473 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sat, 1 Aug 2015 16:20:17 -0700 Subject: [PATCH 2/2] Dont crash on too many pluckers --- src/core/surface/completion_queue.c | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 6bfccd2a2e2..9d6f78db559 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -201,12 +201,15 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, return ret; } -static void add_plucker(grpc_completion_queue *cc, void *tag, - grpc_pollset_worker *worker) { - GPR_ASSERT(cc->num_pluckers != GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); +static int add_plucker(grpc_completion_queue *cc, void *tag, + grpc_pollset_worker *worker) { + if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { + return 0; + } cc->pluckers[cc->num_pluckers].tag = tag; cc->pluckers[cc->num_pluckers].worker = worker; cc->num_pluckers++; + return 1; } static void del_plucker(grpc_completion_queue *cc, void *tag, @@ -259,7 +262,16 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ret.type = GRPC_QUEUE_SHUTDOWN; break; } - add_plucker(cc, tag, &worker); + if (!add_plucker(cc, tag, &worker)) { + gpr_log(GPR_DEBUG, + "Too many outstanding grpc_completion_queue_pluck calls: maximum is %d". + GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + memset(&ret, 0, sizeof(ret)); + /* TODO(ctiller): should we use a different result here */ + ret.type = GRPC_QUEUE_TIMEOUT; + break; + } if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) { del_plucker(cc, tag, &worker); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));