pull/36903/head
Alexander Polcyn 9 months ago
parent 0263574532
commit ecd3a79e8d
  1. 10
      src/ruby/ext/grpc/rb_completion_queue.c
  2. 8
      src/ruby/ext/grpc/rb_completion_queue.h
  3. 17
      src/ruby/ext/grpc/rb_server.c

@ -35,15 +35,15 @@ typedef struct next_call_stack {
grpc_event event; grpc_event event;
gpr_timespec timeout; gpr_timespec timeout;
void* tag; void* tag;
void(*unblock_func)(void*); void (*unblock_func)(void*);
void* unblock_func_arg; void* unblock_func_arg;
} next_call_stack; } next_call_stack;
/* Calls grpc_completion_queue_pluck without holding the ruby GIL */ /* Calls grpc_completion_queue_pluck without holding the ruby GIL */
static void* grpc_rb_completion_queue_pluck_no_gil(void* param) { static void* grpc_rb_completion_queue_pluck_no_gil(void* param) {
next_call_stack* const next_call = (next_call_stack*)param; next_call_stack* const next_call = (next_call_stack*)param;
next_call->event = grpc_completion_queue_pluck( next_call->event = grpc_completion_queue_pluck(next_call->cq, next_call->tag,
next_call->cq, next_call->tag, next_call->timeout, NULL); next_call->timeout, NULL);
return NULL; return NULL;
} }
@ -66,7 +66,9 @@ static void outer_unblock_func(void* param) {
/* Does the same thing as grpc_completion_queue_pluck, while properly releasing /* Does the same thing as grpc_completion_queue_pluck, while properly releasing
the GVL and handling interrupts */ the GVL and handling interrupts */
grpc_event rb_completion_queue_pluck(grpc_completion_queue* queue, void* tag, grpc_event rb_completion_queue_pluck(grpc_completion_queue* queue, void* tag,
gpr_timespec deadline, void(*unblock_func)(void* param), void* unblock_func_arg) { gpr_timespec deadline,
void (*unblock_func)(void* param),
void* unblock_func_arg) {
next_call_stack next_call; next_call_stack next_call;
MEMZERO(&next_call, next_call_stack, 1); MEMZERO(&next_call, next_call_stack, 1);
next_call.cq = queue; next_call.cq = queue;

@ -31,10 +31,12 @@ void grpc_rb_completion_queue_destroy(grpc_completion_queue* cq);
* This avoids having code that holds the GIL repeated at multiple sites. * This avoids having code that holds the GIL repeated at multiple sites.
* *
* unblock_func is invoked with the provided argument to unblock the CQ * unblock_func is invoked with the provided argument to unblock the CQ
* operation in the event of process termination (e.g. a signal), but unblock_func * operation in the event of process termination (e.g. a signal), but
* may be NULL in which case it's unused. * unblock_func may be NULL in which case it's unused.
*/ */
grpc_event rb_completion_queue_pluck(grpc_completion_queue* queue, void* tag, grpc_event rb_completion_queue_pluck(grpc_completion_queue* queue, void* tag,
gpr_timespec deadline, void(*unblock_func)(void* param), void* unblock_func_arg); gpr_timespec deadline,
void (*unblock_func)(void* param),
void* unblock_func_arg);
#endif /* GRPC_RB_COMPLETION_QUEUE_H_ */ #endif /* GRPC_RB_COMPLETION_QUEUE_H_ */

@ -205,9 +205,10 @@ struct server_request_call_args {
static void shutdown_server_unblock_func(void* arg) { static void shutdown_server_unblock_func(void* arg) {
grpc_rb_server* server = (grpc_rb_server*)arg; grpc_rb_server* server = (grpc_rb_server*)arg;
gpr_mu_lock(&server->shutdown_and_notify_done_mu); gpr_mu_lock(&server->shutdown_and_notify_done_mu);
gpr_log(GPR_INFO, gpr_log(
"GRPC_RUBY: shutdown_server_unblock_func shutdown_and_notify_done: %d", GPR_INFO,
server->shutdown_and_notify_done); "GRPC_RUBY: shutdown_server_unblock_func shutdown_and_notify_done: %d",
server->shutdown_and_notify_done);
if (server->shutdown_and_notify_done) { if (server->shutdown_and_notify_done) {
gpr_mu_unlock(&server->shutdown_and_notify_done_mu); gpr_mu_unlock(&server->shutdown_and_notify_done_mu);
return; return;
@ -221,10 +222,11 @@ static void shutdown_server_unblock_func(void* arg) {
grpc_server_shutdown_and_notify(server->wrapped, server->queue, tag); grpc_server_shutdown_and_notify(server->wrapped, server->queue, tag);
// Following call is blocking, but should finish quickly since we've // Following call is blocking, but should finish quickly since we've
// cancelled all calls. // cancelled all calls.
event = grpc_completion_queue_pluck( event = grpc_completion_queue_pluck(server->queue, tag,
server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"GRPC_RUBY: shutdown_server_unblock_func pluck event.type: %d event.success: %d", "GRPC_RUBY: shutdown_server_unblock_func pluck event.type: %d "
"event.success: %d",
event.type, event.success); event.type, event.success);
} }
@ -251,7 +253,8 @@ static VALUE grpc_rb_server_request_call_try(VALUE value_args) {
} }
grpc_event ev = rb_completion_queue_pluck( grpc_event ev = rb_completion_queue_pluck(
args->server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME), shutdown_server_unblock_func, args->server); args->server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME),
shutdown_server_unblock_func, args->server);
if (!ev.success) { if (!ev.success) {
rb_raise(grpc_rb_eCallError, "request_call completion failed"); rb_raise(grpc_rb_eCallError, "request_call completion failed");
} }

Loading…
Cancel
Save