|
|
@ -40,6 +40,7 @@ |
|
|
|
|
|
|
|
|
|
|
|
#include <grpc/grpc.h> |
|
|
|
#include <grpc/grpc.h> |
|
|
|
#include <grpc/support/time.h> |
|
|
|
#include <grpc/support/time.h> |
|
|
|
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
#include "rb_grpc.h" |
|
|
|
#include "rb_grpc.h" |
|
|
|
|
|
|
|
|
|
|
|
/* Used to allow grpc_completion_queue_next call to release the GIL */ |
|
|
|
/* Used to allow grpc_completion_queue_next call to release the GIL */ |
|
|
@ -51,23 +52,6 @@ typedef struct next_call_stack { |
|
|
|
volatile int interrupted; |
|
|
|
volatile int interrupted; |
|
|
|
} next_call_stack; |
|
|
|
} next_call_stack; |
|
|
|
|
|
|
|
|
|
|
|
/* Calls grpc_completion_queue_next without holding the ruby GIL */ |
|
|
|
|
|
|
|
static void *grpc_rb_completion_queue_next_no_gil(void *param) { |
|
|
|
|
|
|
|
next_call_stack *const next_call = (next_call_stack*)param; |
|
|
|
|
|
|
|
gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN); |
|
|
|
|
|
|
|
gpr_timespec deadline; |
|
|
|
|
|
|
|
do { |
|
|
|
|
|
|
|
deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment); |
|
|
|
|
|
|
|
next_call->event = grpc_completion_queue_next(next_call->cq, |
|
|
|
|
|
|
|
deadline, NULL); |
|
|
|
|
|
|
|
if (next_call->event.type != GRPC_QUEUE_TIMEOUT || |
|
|
|
|
|
|
|
gpr_time_cmp(deadline, next_call->timeout) > 0) { |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} while (!next_call->interrupted); |
|
|
|
|
|
|
|
return NULL; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* Calls grpc_completion_queue_pluck without holding the ruby GIL */ |
|
|
|
/* Calls grpc_completion_queue_pluck without holding the ruby GIL */ |
|
|
|
static void *grpc_rb_completion_queue_pluck_no_gil(void *param) { |
|
|
|
static void *grpc_rb_completion_queue_pluck_no_gil(void *param) { |
|
|
|
next_call_stack *const next_call = (next_call_stack*)param; |
|
|
|
next_call_stack *const next_call = (next_call_stack*)param; |
|
|
@ -86,46 +70,9 @@ static void *grpc_rb_completion_queue_pluck_no_gil(void *param) { |
|
|
|
return NULL; |
|
|
|
return NULL; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* Shuts down and drains the completion queue if necessary.
|
|
|
|
|
|
|
|
* |
|
|
|
|
|
|
|
* This is done when the ruby completion queue object is about to be GCed. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
static void grpc_rb_completion_queue_shutdown_drain(grpc_completion_queue *cq) { |
|
|
|
|
|
|
|
next_call_stack next_call; |
|
|
|
|
|
|
|
grpc_completion_type type; |
|
|
|
|
|
|
|
int drained = 0; |
|
|
|
|
|
|
|
MEMZERO(&next_call, next_call_stack, 1); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
grpc_completion_queue_shutdown(cq); |
|
|
|
|
|
|
|
next_call.cq = cq; |
|
|
|
|
|
|
|
next_call.event.type = GRPC_QUEUE_TIMEOUT; |
|
|
|
|
|
|
|
/* TODO: the timeout should be a module level constant that defaults
|
|
|
|
|
|
|
|
* to gpr_inf_future(GPR_CLOCK_REALTIME). |
|
|
|
|
|
|
|
* |
|
|
|
|
|
|
|
* - at the moment this does not work, it stalls. Using a small timeout like |
|
|
|
|
|
|
|
* this one works, and leads to fast test run times; a longer timeout was |
|
|
|
|
|
|
|
* causing unnecessary delays in the test runs. |
|
|
|
|
|
|
|
* |
|
|
|
|
|
|
|
* - investigate further, this is probably another example of C-level cleanup |
|
|
|
|
|
|
|
* not working consistently in all cases. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
next_call.timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), |
|
|
|
|
|
|
|
gpr_time_from_micros(5e3, GPR_TIMESPAN)); |
|
|
|
|
|
|
|
do { |
|
|
|
|
|
|
|
rb_thread_call_without_gvl(grpc_rb_completion_queue_next_no_gil, |
|
|
|
|
|
|
|
(void *)&next_call, NULL, NULL); |
|
|
|
|
|
|
|
type = next_call.event.type; |
|
|
|
|
|
|
|
if (type == GRPC_QUEUE_TIMEOUT) break; |
|
|
|
|
|
|
|
if (type != GRPC_QUEUE_SHUTDOWN) { |
|
|
|
|
|
|
|
++drained; |
|
|
|
|
|
|
|
rb_warning("completion queue shutdown: %d undrained events", drained); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} while (type != GRPC_QUEUE_SHUTDOWN); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* Helper function to free a completion queue. */ |
|
|
|
/* Helper function to free a completion queue. */ |
|
|
|
void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq) { |
|
|
|
void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq) { |
|
|
|
grpc_rb_completion_queue_shutdown_drain(cq); |
|
|
|
grpc_completion_queue_shutdown(cq); |
|
|
|
grpc_completion_queue_destroy(cq); |
|
|
|
grpc_completion_queue_destroy(cq); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|