Merge remote-tracking branch 'origin/cq_create_api_changes' into cq_create_api_changes

reviewable/pr9972/r1
Sree Kuchibhotla 8 years ago
commit 45c4618c33
  1. 17
      src/node/ext/completion_queue_threadpool.cc
  2. 19
      src/node/ext/completion_queue_uv.cc
  3. 5
      src/node/ext/server_generic.cc
  4. 4
      src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
  5. 3
      src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m
  6. 8
      src/objective-c/tests/CronetUnitTests/CronetUnitTests.m
  7. 2
      src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
  8. 15
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  9. 76
      src/ruby/ext/grpc/rb_channel.c
  10. 53
      src/ruby/ext/grpc/rb_server.c

@ -34,14 +34,14 @@
/* I don't like using #ifndef, but I don't see a better way to do this */ /* I don't like using #ifndef, but I don't see a better way to do this */
#ifndef GRPC_UV #ifndef GRPC_UV
#include <node.h>
#include <nan.h> #include <nan.h>
#include <node.h>
#include "call.h"
#include "completion_queue.h"
#include "grpc/grpc.h" #include "grpc/grpc.h"
#include "grpc/support/log.h" #include "grpc/support/log.h"
#include "grpc/support/time.h" #include "grpc/support/time.h"
#include "completion_queue.h"
#include "call.h"
namespace grpc { namespace grpc {
namespace node { namespace node {
@ -111,8 +111,8 @@ CompletionQueueAsyncWorker::CompletionQueueAsyncWorker()
CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {} CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {}
void CompletionQueueAsyncWorker::Execute() { void CompletionQueueAsyncWorker::Execute() {
result = result = grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME),
grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); NULL);
if (!result.success) { if (!result.success) {
SetErrorMessage("The async function encountered an error"); SetErrorMessage("The async function encountered an error");
} }
@ -141,7 +141,8 @@ void CompletionQueueAsyncWorker::Init(Local<Object> exports) {
Nan::HandleScope scope; Nan::HandleScope scope;
current_threads = 0; current_threads = 0;
waiting_next_calls = 0; waiting_next_calls = 0;
queue = grpc_completion_queue_create(NULL); queue =
grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL);
} }
void CompletionQueueAsyncWorker::HandleOKCallback() { void CompletionQueueAsyncWorker::HandleOKCallback() {
@ -173,9 +174,7 @@ grpc_completion_queue *GetCompletionQueue() {
return CompletionQueueAsyncWorker::GetQueue(); return CompletionQueueAsyncWorker::GetQueue();
} }
void CompletionQueueNext() { void CompletionQueueNext() { CompletionQueueAsyncWorker::Next(); }
CompletionQueueAsyncWorker::Next();
}
void CompletionQueueInit(Local<Object> exports) { void CompletionQueueInit(Local<Object> exports) {
CompletionQueueAsyncWorker::Init(exports); CompletionQueueAsyncWorker::Init(exports);

@ -33,10 +33,10 @@
#ifdef GRPC_UV #ifdef GRPC_UV
#include <uv.h> #include <grpc/grpc.h>
#include <node.h> #include <node.h>
#include <uv.h>
#include <v8.h> #include <v8.h>
#include <grpc/grpc.h>
#include "call.h" #include "call.h"
#include "completion_queue.h" #include "completion_queue.h"
@ -57,8 +57,8 @@ void drain_completion_queue(uv_prepare_t *handle) {
grpc_event event; grpc_event event;
(void)handle; (void)handle;
do { do {
event = grpc_completion_queue_next( event = grpc_completion_queue_next(queue, gpr_inf_past(GPR_CLOCK_MONOTONIC),
queue, gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL); NULL);
if (event.type == GRPC_OP_COMPLETE) { if (event.type == GRPC_OP_COMPLETE) {
Nan::Callback *callback = grpc::node::GetTagCallback(event.tag); Nan::Callback *callback = grpc::node::GetTagCallback(event.tag);
@ -67,8 +67,8 @@ void drain_completion_queue(uv_prepare_t *handle) {
grpc::node::GetTagNodeValue(event.tag)}; grpc::node::GetTagNodeValue(event.tag)};
callback->Call(2, argv); callback->Call(2, argv);
} else { } else {
Local<Value> argv[] = {Nan::Error( Local<Value> argv[] = {
"The async function encountered an error")}; Nan::Error("The async function encountered an error")};
callback->Call(1, argv); callback->Call(1, argv);
} }
grpc::node::CompleteTag(event.tag); grpc::node::CompleteTag(event.tag);
@ -81,9 +81,7 @@ void drain_completion_queue(uv_prepare_t *handle) {
} while (event.type != GRPC_QUEUE_TIMEOUT); } while (event.type != GRPC_QUEUE_TIMEOUT);
} }
grpc_completion_queue *GetCompletionQueue() { grpc_completion_queue *GetCompletionQueue() { return queue; }
return queue;
}
void CompletionQueueNext() { void CompletionQueueNext() {
if (pending_batches == 0) { if (pending_batches == 0) {
@ -94,7 +92,8 @@ void CompletionQueueNext() {
} }
void CompletionQueueInit(Local<Object> exports) { void CompletionQueueInit(Local<Object> exports) {
queue = grpc_completion_queue_create(NULL); queue =
grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL);
uv_prepare_init(uv_default_loop(), &prepare); uv_prepare_init(uv_default_loop(), &prepare);
pending_batches = 0; pending_batches = 0;
} }

@ -35,8 +35,8 @@
#include "server.h" #include "server.h"
#include <node.h>
#include <nan.h> #include <nan.h>
#include <node.h>
#include "grpc/grpc.h" #include "grpc/grpc.h"
#include "grpc/support/time.h" #include "grpc/support/time.h"
@ -44,7 +44,8 @@ namespace grpc {
namespace node { namespace node {
Server::Server(grpc_server *server) : wrapped_server(server) { Server::Server(grpc_server *server) : wrapped_server(server) {
shutdown_queue = grpc_completion_queue_create(NULL); shutdown_queue = grpc_completion_queue_create(GRPC_CQ_PLUCK,
GRPC_CQ_DEFAULT_POLLING, NULL);
grpc_server_register_non_listening_completion_queue(server, shutdown_queue, grpc_server_register_non_listening_completion_queue(server, shutdown_queue,
NULL); NULL);
} }

@ -48,7 +48,9 @@
- (instancetype)init { - (instancetype)init {
if ((self = [super init])) { if ((self = [super init])) {
_unmanagedQueue = grpc_completion_queue_create(NULL); _unmanagedQueue = grpc_completion_queue_create(GRPC_CQ_NEXT,
GRPC_CQ_DEFAULT_POLLING,
NULL);
// This is for the following block to capture the pointer by value (instead // This is for the following block to capture the pointer by value (instead
// of retaining self and doing self->_unmanagedQueue). This is essential // of retaining self and doing self->_unmanagedQueue). This is essential

@ -79,7 +79,8 @@ static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack(
gpr_join_host_port(&ffd->localaddr, "127.0.0.1", port); gpr_join_host_port(&ffd->localaddr, "127.0.0.1", port);
f.fixture_data = ffd; f.fixture_data = ffd;
f.cq = grpc_completion_queue_create(NULL); f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING,
NULL);
return f; return f;
} }

@ -160,7 +160,9 @@ unsigned int parse_h2_length(const char *field) {
int port = grpc_pick_unused_port_or_die(); int port = grpc_pick_unused_port_or_die();
char *addr; char *addr;
gpr_join_host_port(&addr, "127.0.0.1", port); gpr_join_host_port(&addr, "127.0.0.1", port);
grpc_completion_queue *cq = grpc_completion_queue_create(NULL); grpc_completion_queue *cq =
grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING,
NULL);
stream_engine *cronetEngine = [Cronet getGlobalEngine]; stream_engine *cronetEngine = [Cronet getGlobalEngine];
grpc_channel *client = grpc_channel *client =
grpc_cronet_secure_channel_create(cronetEngine, addr, NULL, NULL); grpc_cronet_secure_channel_create(cronetEngine, addr, NULL, NULL);
@ -294,7 +296,9 @@ unsigned int parse_h2_length(const char *field) {
int port = grpc_pick_unused_port_or_die(); int port = grpc_pick_unused_port_or_die();
char *addr; char *addr;
gpr_join_host_port(&addr, "127.0.0.1", port); gpr_join_host_port(&addr, "127.0.0.1", port);
grpc_completion_queue *cq = grpc_completion_queue_create(NULL); grpc_completion_queue *cq =
grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING,
NULL);
stream_engine *cronetEngine = [Cronet getGlobalEngine]; stream_engine *cronetEngine = [Cronet getGlobalEngine];
grpc_channel *client = grpc_channel *client =
grpc_cronet_secure_channel_create(cronetEngine, addr, args, NULL); grpc_cronet_secure_channel_create(cronetEngine, addr, args, NULL);

@ -40,7 +40,7 @@ cdef class CompletionQueue:
def __cinit__(self): def __cinit__(self):
grpc_init() grpc_init()
with nogil: with nogil:
self.c_completion_queue = grpc_completion_queue_create(NULL) self.c_completion_queue = grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL)
self.is_shutting_down = False self.is_shutting_down = False
self.is_shutdown = False self.is_shutdown = False

@ -309,7 +309,20 @@ cdef extern from "grpc/grpc.h":
void grpc_init() nogil void grpc_init() nogil
void grpc_shutdown() nogil void grpc_shutdown() nogil
grpc_completion_queue *grpc_completion_queue_create(void *reserved) nogil ctypedef enum grpc_cq_completion_type:
GRPC_CQ_NEXT = 1
GRPC_CQ_PLUCK = 2
ctypedef enum grpc_cq_polling_type:
GRPC_CQ_DEFAULT_POLLING
GRPC_CQ_NON_LISTENING
GRPC_CQ_NON_POLLING
grpc_completion_queue *grpc_completion_queue_create(
grpc_cq_completion_type completion_type,
grpc_cq_polling_type polling_type,
void *reserved) nogil
grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
gpr_timespec deadline, gpr_timespec deadline,
void *reserved) nogil void *reserved) nogil

@ -33,20 +33,20 @@
#include <ruby/ruby.h> #include <ruby/ruby.h>
#include "rb_grpc_imports.generated.h"
#include "rb_channel.h"
#include "rb_byte_buffer.h" #include "rb_byte_buffer.h"
#include "rb_channel.h"
#include "rb_grpc_imports.generated.h"
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/grpc_security.h> #include <grpc/grpc_security.h>
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
#include "rb_grpc.h"
#include "rb_call.h" #include "rb_call.h"
#include "rb_channel_args.h" #include "rb_channel_args.h"
#include "rb_channel_credentials.h" #include "rb_channel_credentials.h"
#include "rb_completion_queue.h" #include "rb_completion_queue.h"
#include "rb_grpc.h"
#include "rb_server.h" #include "rb_server.h"
/* id_channel is the name of the hidden ivar that preserves a reference to the /* id_channel is the name of the hidden ivar that preserves a reference to the
@ -104,11 +104,13 @@ static void grpc_rb_channel_mark(void *p) {
} }
} }
static rb_data_type_t grpc_channel_data_type = { static rb_data_type_t grpc_channel_data_type = {"grpc_channel",
"grpc_channel", {grpc_rb_channel_mark,
{grpc_rb_channel_mark, grpc_rb_channel_free, GRPC_RB_MEMSIZE_UNAVAILABLE, grpc_rb_channel_free,
GRPC_RB_MEMSIZE_UNAVAILABLE,
{NULL, NULL}}, {NULL, NULL}},
NULL, NULL, NULL,
NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY #ifdef RUBY_TYPED_FREE_IMMEDIATELY
RUBY_TYPED_FREE_IMMEDIATELY RUBY_TYPED_FREE_IMMEDIATELY
#endif #endif
@ -169,7 +171,8 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
} }
rb_ivar_set(self, id_target, target); rb_ivar_set(self, id_target, target);
wrapper->wrapped = ch; wrapper->wrapped = ch;
wrapper->queue = grpc_completion_queue_create(NULL); wrapper->queue = grpc_completion_queue_create(GRPC_CQ_PLUCK,
GRPC_CQ_DEFAULT_POLLING, NULL);
return self; return self;
} }
@ -225,14 +228,11 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
return Qnil; return Qnil;
} }
grpc_channel_watch_connectivity_state( grpc_channel_watch_connectivity_state(
ch, ch, (grpc_connectivity_state)NUM2LONG(last_state),
(grpc_connectivity_state)NUM2LONG(last_state), grpc_rb_time_timeval(deadline, /* absolute time */ 0), cq, tag);
grpc_rb_time_timeval(deadline, /* absolute time */ 0),
cq,
tag);
event = rb_completion_queue_pluck(cq, tag, event = rb_completion_queue_pluck(cq, tag, gpr_inf_future(GPR_CLOCK_REALTIME),
gpr_inf_future(GPR_CLOCK_REALTIME), NULL); NULL);
if (event.success) { if (event.success) {
return Qtrue; return Qtrue;
@ -243,9 +243,9 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
/* Create a call given a grpc_channel, in order to call method. The request /* Create a call given a grpc_channel, in order to call method. The request
is not sent until grpc_call_invoke is called. */ is not sent until grpc_call_invoke is called. */
static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
VALUE mask, VALUE method, VALUE method, VALUE host,
VALUE host, VALUE deadline) { VALUE deadline) {
VALUE res = Qnil; VALUE res = Qnil;
grpc_rb_channel *wrapper = NULL; grpc_rb_channel *wrapper = NULL;
grpc_call *call = NULL; grpc_call *call = NULL;
@ -256,10 +256,11 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
grpc_slice method_slice; grpc_slice method_slice;
grpc_slice host_slice; grpc_slice host_slice;
grpc_slice *host_slice_ptr = NULL; grpc_slice *host_slice_ptr = NULL;
char* tmp_str = NULL; char *tmp_str = NULL;
if (host != Qnil) { if (host != Qnil) {
host_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(host), RSTRING_LEN(host)); host_slice =
grpc_slice_from_copied_buffer(RSTRING_PTR(host), RSTRING_LEN(host));
host_slice_ptr = &host_slice; host_slice_ptr = &host_slice;
} }
if (mask != Qnil) { if (mask != Qnil) {
@ -269,7 +270,8 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
parent_call = grpc_rb_get_wrapped_call(parent); parent_call = grpc_rb_get_wrapped_call(parent);
} }
cq = grpc_completion_queue_create(NULL); cq = grpc_completion_queue_create(GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
NULL);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped; ch = wrapper->wrapped;
if (ch == NULL) { if (ch == NULL) {
@ -277,17 +279,18 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
return Qnil; return Qnil;
} }
method_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method)); method_slice =
grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method));
call = grpc_channel_create_call(ch, parent_call, flags, cq, method_slice, call = grpc_channel_create_call(ch, parent_call, flags, cq, method_slice,
host_slice_ptr, grpc_rb_time_timeval( host_slice_ptr,
deadline, grpc_rb_time_timeval(deadline,
/* absolute time */ 0), NULL); /* absolute time */ 0),
NULL);
if (call == NULL) { if (call == NULL) {
tmp_str = grpc_slice_to_c_string(method_slice); tmp_str = grpc_slice_to_c_string(method_slice);
rb_raise(rb_eRuntimeError, "cannot create call with method %s", rb_raise(rb_eRuntimeError, "cannot create call with method %s", tmp_str);
tmp_str);
return Qnil; return Qnil;
} }
@ -304,7 +307,6 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
return res; return res;
} }
/* Closes the channel, calling it's destroy method */ /* Closes the channel, calling it's destroy method */
static VALUE grpc_rb_channel_destroy(VALUE self) { static VALUE grpc_rb_channel_destroy(VALUE self) {
grpc_rb_channel *wrapper = NULL; grpc_rb_channel *wrapper = NULL;
@ -320,12 +322,11 @@ static VALUE grpc_rb_channel_destroy(VALUE self) {
return Qnil; return Qnil;
} }
/* Called to obtain the target that this channel accesses. */ /* Called to obtain the target that this channel accesses. */
static VALUE grpc_rb_channel_get_target(VALUE self) { static VALUE grpc_rb_channel_get_target(VALUE self) {
grpc_rb_channel *wrapper = NULL; grpc_rb_channel *wrapper = NULL;
VALUE res = Qnil; VALUE res = Qnil;
char* target = NULL; char *target = NULL;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
target = grpc_channel_get_target(wrapper->wrapped); target = grpc_channel_get_target(wrapper->wrapped);
@ -337,8 +338,8 @@ static VALUE grpc_rb_channel_get_target(VALUE self) {
static void Init_grpc_propagate_masks() { static void Init_grpc_propagate_masks() {
/* Constants representing call propagation masks in grpc.h */ /* Constants representing call propagation masks in grpc.h */
VALUE grpc_rb_mPropagateMasks = rb_define_module_under( VALUE grpc_rb_mPropagateMasks =
grpc_rb_mGrpcCore, "PropagateMasks"); rb_define_module_under(grpc_rb_mGrpcCore, "PropagateMasks");
rb_define_const(grpc_rb_mPropagateMasks, "DEADLINE", rb_define_const(grpc_rb_mPropagateMasks, "DEADLINE",
UINT2NUM(GRPC_PROPAGATE_DEADLINE)); UINT2NUM(GRPC_PROPAGATE_DEADLINE));
rb_define_const(grpc_rb_mPropagateMasks, "CENSUS_STATS_CONTEXT", rb_define_const(grpc_rb_mPropagateMasks, "CENSUS_STATS_CONTEXT",
@ -353,8 +354,8 @@ static void Init_grpc_propagate_masks() {
static void Init_grpc_connectivity_states() { static void Init_grpc_connectivity_states() {
/* Constants representing call propagation masks in grpc.h */ /* Constants representing call propagation masks in grpc.h */
VALUE grpc_rb_mConnectivityStates = rb_define_module_under( VALUE grpc_rb_mConnectivityStates =
grpc_rb_mGrpcCore, "ConnectivityStates"); rb_define_module_under(grpc_rb_mGrpcCore, "ConnectivityStates");
rb_define_const(grpc_rb_mConnectivityStates, "IDLE", rb_define_const(grpc_rb_mConnectivityStates, "IDLE",
LONG2NUM(GRPC_CHANNEL_IDLE)); LONG2NUM(GRPC_CHANNEL_IDLE));
rb_define_const(grpc_rb_mConnectivityStates, "CONNECTING", rb_define_const(grpc_rb_mConnectivityStates, "CONNECTING",
@ -382,12 +383,11 @@ void Init_grpc_channel() {
/* Add ruby analogues of the Channel methods. */ /* Add ruby analogues of the Channel methods. */
rb_define_method(grpc_rb_cChannel, "connectivity_state", rb_define_method(grpc_rb_cChannel, "connectivity_state",
grpc_rb_channel_get_connectivity_state, grpc_rb_channel_get_connectivity_state, -1);
-1);
rb_define_method(grpc_rb_cChannel, "watch_connectivity_state", rb_define_method(grpc_rb_cChannel, "watch_connectivity_state",
grpc_rb_channel_watch_connectivity_state, 4); grpc_rb_channel_watch_connectivity_state, 4);
rb_define_method(grpc_rb_cChannel, "create_call", rb_define_method(grpc_rb_cChannel, "create_call", grpc_rb_channel_create_call,
grpc_rb_channel_create_call, 5); 5);
rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0); rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0); rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0);
rb_define_alias(grpc_rb_cChannel, "close", "destroy"); rb_define_alias(grpc_rb_cChannel, "close", "destroy");

@ -37,15 +37,15 @@
#include "rb_server.h" #include "rb_server.h"
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/atm.h>
#include <grpc/grpc_security.h> #include <grpc/grpc_security.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "rb_byte_buffer.h"
#include "rb_call.h" #include "rb_call.h"
#include "rb_channel_args.h" #include "rb_channel_args.h"
#include "rb_completion_queue.h" #include "rb_completion_queue.h"
#include "rb_server_credentials.h"
#include "rb_byte_buffer.h"
#include "rb_grpc.h" #include "rb_grpc.h"
#include "rb_server_credentials.h"
/* grpc_rb_cServer is the ruby class that proxies grpc_server. */ /* grpc_rb_cServer is the ruby class that proxies grpc_server. */
static VALUE grpc_rb_cServer = Qnil; static VALUE grpc_rb_cServer = Qnil;
@ -93,8 +93,7 @@ static void grpc_rb_server_free(void *p) {
}; };
svr = (grpc_rb_server *)p; svr = (grpc_rb_server *)p;
deadline = gpr_time_add( deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(2, GPR_TIMESPAN)); gpr_time_from_seconds(2, GPR_TIMESPAN));
destroy_server(svr, deadline); destroy_server(svr, deadline);
@ -104,13 +103,15 @@ static void grpc_rb_server_free(void *p) {
static const rb_data_type_t grpc_rb_server_data_type = { static const rb_data_type_t grpc_rb_server_data_type = {
"grpc_server", "grpc_server",
{GRPC_RB_GC_NOT_MARKED, grpc_rb_server_free, GRPC_RB_MEMSIZE_UNAVAILABLE, {GRPC_RB_GC_NOT_MARKED,
grpc_rb_server_free,
GRPC_RB_MEMSIZE_UNAVAILABLE,
{NULL, NULL}}, {NULL, NULL}},
NULL, NULL,
NULL, NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY #ifdef RUBY_TYPED_FREE_IMMEDIATELY
/* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free function would block /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free
* and we might want to unlock GVL * function would block and we might want to unlock GVL
* TODO(yugui) Unlock GVL? * TODO(yugui) Unlock GVL?
*/ */
0, 0,
@ -131,7 +132,8 @@ static VALUE grpc_rb_server_alloc(VALUE cls) {
Initializes server instances. */ Initializes server instances. */
static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) { static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) {
grpc_completion_queue *cq = grpc_completion_queue_create(NULL); grpc_completion_queue *cq = grpc_completion_queue_create(
GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, NULL);
grpc_rb_server *wrapper = NULL; grpc_rb_server *wrapper = NULL;
grpc_server *srv = NULL; grpc_server *srv = NULL;
grpc_channel_args args; grpc_channel_args args;
@ -163,7 +165,7 @@ typedef struct request_call_stack {
/* grpc_request_call_stack_init ensures the request_call_stack is properly /* grpc_request_call_stack_init ensures the request_call_stack is properly
* initialized */ * initialized */
static void grpc_request_call_stack_init(request_call_stack* st) { static void grpc_request_call_stack_init(request_call_stack *st) {
MEMZERO(st, request_call_stack, 1); MEMZERO(st, request_call_stack, 1);
grpc_metadata_array_init(&st->md_ary); grpc_metadata_array_init(&st->md_ary);
grpc_call_details_init(&st->details); grpc_call_details_init(&st->details);
@ -171,7 +173,7 @@ static void grpc_request_call_stack_init(request_call_stack* st) {
/* grpc_request_call_stack_cleanup ensures the request_call_stack is properly /* grpc_request_call_stack_cleanup ensures the request_call_stack is properly
* cleaned up */ * cleaned up */
static void grpc_request_call_stack_cleanup(request_call_stack* st) { static void grpc_request_call_stack_cleanup(request_call_stack *st) {
grpc_metadata_array_destroy(&st->md_ary); grpc_metadata_array_destroy(&st->md_ary);
grpc_call_details_destroy(&st->details); grpc_call_details_destroy(&st->details);
} }
@ -187,8 +189,9 @@ static VALUE grpc_rb_server_request_call(VALUE self) {
grpc_call_error err; grpc_call_error err;
request_call_stack st; request_call_stack st;
VALUE result; VALUE result;
void *tag = (void*)&st; void *tag = (void *)&st;
grpc_completion_queue *call_queue = grpc_completion_queue_create(NULL); grpc_completion_queue *call_queue = grpc_completion_queue_create(
GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, NULL);
gpr_timespec deadline; gpr_timespec deadline;
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
@ -199,8 +202,7 @@ static VALUE grpc_rb_server_request_call(VALUE self) {
grpc_request_call_stack_init(&st); grpc_request_call_stack_init(&st);
/* call grpc_server_request_call, then wait for it to complete using /* call grpc_server_request_call, then wait for it to complete using
* pluck_event */ * pluck_event */
err = grpc_server_request_call( err = grpc_server_request_call(s->wrapped, &call, &st.details, &st.md_ary,
s->wrapped, &call, &st.details, &st.md_ary,
call_queue, s->queue, tag); call_queue, s->queue, tag);
if (err != GRPC_CALL_OK) { if (err != GRPC_CALL_OK) {
grpc_request_call_stack_cleanup(&st); grpc_request_call_stack_cleanup(&st);
@ -218,8 +220,6 @@ static VALUE grpc_rb_server_request_call(VALUE self) {
return Qnil; return Qnil;
} }
/* build the NewServerRpc struct result */ /* build the NewServerRpc struct result */
deadline = gpr_convert_clock_type(st.details.deadline, GPR_CLOCK_REALTIME); deadline = gpr_convert_clock_type(st.details.deadline, GPR_CLOCK_REALTIME);
result = rb_struct_new( result = rb_struct_new(
@ -299,8 +299,7 @@ static VALUE grpc_rb_server_add_http2_port(VALUE self, VALUE port,
return Qnil; return Qnil;
} else if (TYPE(rb_creds) == T_SYMBOL) { } else if (TYPE(rb_creds) == T_SYMBOL) {
if (id_insecure_server != SYM2ID(rb_creds)) { if (id_insecure_server != SYM2ID(rb_creds)) {
rb_raise(rb_eTypeError, rb_raise(rb_eTypeError, "bad creds symbol, want :this_port_is_insecure");
"bad creds symbol, want :this_port_is_insecure");
return Qnil; return Qnil;
} }
recvd_port = recvd_port =
@ -312,9 +311,8 @@ static VALUE grpc_rb_server_add_http2_port(VALUE self, VALUE port,
} }
} else { } else {
creds = grpc_rb_get_wrapped_server_credentials(rb_creds); creds = grpc_rb_get_wrapped_server_credentials(rb_creds);
recvd_port = recvd_port = grpc_server_add_secure_http2_port(
grpc_server_add_secure_http2_port(s->wrapped, StringValueCStr(port), s->wrapped, StringValueCStr(port), creds);
creds);
if (recvd_port == 0) { if (recvd_port == 0) {
rb_raise(rb_eRuntimeError, rb_raise(rb_eRuntimeError,
"could not add secure port %s to server, not sure why", "could not add secure port %s to server, not sure why",
@ -333,18 +331,17 @@ void Init_grpc_server() {
/* Provides a ruby constructor and support for dup/clone. */ /* Provides a ruby constructor and support for dup/clone. */
rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 1); rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 1);
rb_define_method(grpc_rb_cServer, "initialize_copy", rb_define_method(grpc_rb_cServer, "initialize_copy", grpc_rb_cannot_init_copy,
grpc_rb_cannot_init_copy, 1); 1);
/* Add the server methods. */ /* Add the server methods. */
rb_define_method(grpc_rb_cServer, "request_call", rb_define_method(grpc_rb_cServer, "request_call", grpc_rb_server_request_call,
grpc_rb_server_request_call, 0); 0);
rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0); rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1); rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1);
rb_define_alias(grpc_rb_cServer, "close", "destroy"); rb_define_alias(grpc_rb_cServer, "close", "destroy");
rb_define_method(grpc_rb_cServer, "add_http2_port", rb_define_method(grpc_rb_cServer, "add_http2_port",
grpc_rb_server_add_http2_port, grpc_rb_server_add_http2_port, 2);
2);
id_at = rb_intern("at"); id_at = rb_intern("at");
id_insecure_server = rb_intern("this_port_is_insecure"); id_insecure_server = rb_intern("this_port_is_insecure");
} }

Loading…
Cancel
Save