add in background connectivity state poller

pull/9986/head
Alexander Polcyn 8 years ago
parent e57cd90c11
commit 9f4986603c
  1. 219
      src/ruby/ext/grpc/rb_channel.c
  2. 93
      src/ruby/spec/channel_connection_spec.rb

@ -32,21 +32,21 @@
*/
#include <ruby/ruby.h>
#include <ruby/thread.h>
#include "rb_grpc_imports.generated.h"
#include "rb_channel.h"
#include "rb_byte_buffer.h"
#include "rb_channel.h"
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "rb_grpc.h"
#include "rb_call.h"
#include "rb_channel_args.h"
#include "rb_channel_credentials.h"
#include "rb_completion_queue.h"
#include "rb_grpc.h"
#include "rb_server.h"
/* id_channel is the name of the hidden ivar that preserves a reference to the
@ -74,8 +74,22 @@ typedef struct grpc_rb_channel {
/* The actual channel */
grpc_channel *wrapped;
grpc_completion_queue *queue;
int request_safe_destroy;
int safe_to_destroy;
gpr_mu safe_destroy_mu;
gpr_cv safe_destroy_cv;
} grpc_rb_channel;
/* Forward declarations of functions involved in temporary fix to
* https://github.com/grpc/grpc/issues/9941 */
static void grpc_rb_channel_try_register_connection_polling(
grpc_rb_channel *wrapper);
static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper);
static grpc_completion_queue *channel_polling_cq;
static gpr_mu channel_polling_mu;
static int abort_channel_polling = 0;
/* Destroys Channel instances. */
static void grpc_rb_channel_free(void *p) {
grpc_rb_channel *ch = NULL;
@ -85,8 +99,9 @@ static void grpc_rb_channel_free(void *p) {
ch = (grpc_rb_channel *)p;
if (ch->wrapped != NULL) {
grpc_channel_destroy(ch->wrapped);
grpc_rb_channel_safe_destroy(ch);
grpc_rb_completion_queue_destroy(ch->queue);
ch->wrapped = NULL;
}
xfree(p);
@ -104,13 +119,15 @@ static void grpc_rb_channel_mark(void *p) {
}
}
static rb_data_type_t grpc_channel_data_type = {
"grpc_channel",
{grpc_rb_channel_mark, grpc_rb_channel_free, GRPC_RB_MEMSIZE_UNAVAILABLE,
{NULL, NULL}},
NULL, NULL,
static rb_data_type_t grpc_channel_data_type = {"grpc_channel",
{grpc_rb_channel_mark,
grpc_rb_channel_free,
GRPC_RB_MEMSIZE_UNAVAILABLE,
{NULL, NULL}},
NULL,
NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY
RUBY_TYPED_FREE_IMMEDIATELY
RUBY_TYPED_FREE_IMMEDIATELY
#endif
};
@ -159,6 +176,18 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
creds = grpc_rb_get_wrapped_channel_credentials(credentials);
ch = grpc_secure_channel_create(creds, target_chars, &args, NULL);
}
GPR_ASSERT(ch);
wrapper->wrapped = ch;
gpr_mu_init(&wrapper->safe_destroy_mu);
gpr_cv_init(&wrapper->safe_destroy_cv);
gpr_mu_lock(&wrapper->safe_destroy_mu);
wrapper->safe_to_destroy = 0;
wrapper->request_safe_destroy = 0;
gpr_mu_unlock(&wrapper->safe_destroy_mu);
grpc_rb_channel_try_register_connection_polling(wrapper);
if (args.args != NULL) {
xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */
}
@ -191,7 +220,7 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
/* "01" == 0 mandatory args, 1 (try_to_connect) is optional */
rb_scan_args(argc, argv, "01", &try_to_connect_param);
grpc_try_to_connect = try_to_connect_param == Qtrue? 1 : 0;
grpc_try_to_connect = try_to_connect_param == Qtrue ? 1 : 0;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
@ -229,14 +258,11 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
return Qnil;
}
grpc_channel_watch_connectivity_state(
ch,
(grpc_connectivity_state)NUM2LONG(last_state),
grpc_rb_time_timeval(deadline, /* absolute time */ 0),
cq,
tag);
ch, (grpc_connectivity_state)NUM2LONG(last_state),
grpc_rb_time_timeval(deadline, /* absolute time */ 0), cq, tag);
event = rb_completion_queue_pluck(cq, tag,
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
event = rb_completion_queue_pluck(cq, tag, gpr_inf_future(GPR_CLOCK_REALTIME),
NULL);
if (event.success) {
return Qtrue;
@ -247,9 +273,9 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
/* Create a call given a grpc_channel, in order to call method. The request
is not sent until grpc_call_invoke is called. */
static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
VALUE mask, VALUE method,
VALUE host, VALUE deadline) {
static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
VALUE method, VALUE host,
VALUE deadline) {
VALUE res = Qnil;
grpc_rb_channel *wrapper = NULL;
grpc_call *call = NULL;
@ -260,10 +286,11 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
grpc_slice method_slice;
grpc_slice host_slice;
grpc_slice *host_slice_ptr = NULL;
char* tmp_str = NULL;
char *tmp_str = NULL;
if (host != Qnil) {
host_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(host), RSTRING_LEN(host));
host_slice =
grpc_slice_from_copied_buffer(RSTRING_PTR(host), RSTRING_LEN(host));
host_slice_ptr = &host_slice;
}
if (mask != Qnil) {
@ -281,17 +308,18 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
return Qnil;
}
method_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method));
method_slice =
grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method));
call = grpc_channel_create_call(ch, parent_call, flags, cq, method_slice,
host_slice_ptr, grpc_rb_time_timeval(
deadline,
/* absolute time */ 0), NULL);
host_slice_ptr,
grpc_rb_time_timeval(deadline,
/* absolute time */ 0),
NULL);
if (call == NULL) {
tmp_str = grpc_slice_to_c_string(method_slice);
rb_raise(rb_eRuntimeError, "cannot create call with method %s",
tmp_str);
rb_raise(rb_eRuntimeError, "cannot create call with method %s", tmp_str);
return Qnil;
}
@ -308,7 +336,6 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
return res;
}
/* Closes the channel, calling it's destroy method */
static VALUE grpc_rb_channel_destroy(VALUE self) {
grpc_rb_channel *wrapper = NULL;
@ -317,19 +344,20 @@ static VALUE grpc_rb_channel_destroy(VALUE self) {
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
if (ch != NULL) {
grpc_channel_destroy(ch);
grpc_rb_channel_safe_destroy(wrapper);
GPR_ASSERT(wrapper->queue != NULL);
grpc_rb_completion_queue_destroy(wrapper->queue);
wrapper->wrapped = NULL;
}
return Qnil;
}
/* Called to obtain the target that this channel accesses. */
static VALUE grpc_rb_channel_get_target(VALUE self) {
grpc_rb_channel *wrapper = NULL;
VALUE res = Qnil;
char* target = NULL;
char *target = NULL;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
target = grpc_channel_get_target(wrapper->wrapped);
@ -339,10 +367,119 @@ static VALUE grpc_rb_channel_get_target(VALUE self) {
return res;
}
// Either start polling channel connection state or signal that it's free to
// destroy.
// Not safe to call while a channel's connection state is polled.
static void grpc_rb_channel_try_register_connection_polling(
grpc_rb_channel *wrapper) {
grpc_connectivity_state conn_state;
gpr_timespec sleep_time = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN));
GPR_ASSERT(wrapper);
GPR_ASSERT(wrapper->wrapped);
gpr_mu_lock(&wrapper->safe_destroy_mu);
if (wrapper->request_safe_destroy) {
wrapper->safe_to_destroy = 1;
gpr_cv_signal(&wrapper->safe_destroy_cv);
gpr_mu_unlock(&wrapper->safe_destroy_mu);
return;
}
gpr_mu_lock(&channel_polling_mu);
conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
// avoid posting work to the channel polling cq if it's been shutdown
if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) {
grpc_channel_watch_connectivity_state(
wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper);
} else {
wrapper->safe_to_destroy = 1;
gpr_cv_signal(&wrapper->safe_destroy_cv);
}
gpr_mu_unlock(&channel_polling_mu);
gpr_mu_unlock(&wrapper->safe_destroy_mu);
}
// Note requires wrapper->wrapped, wrapper->safe_destroy_mu/cv initialized
static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
gpr_mu_lock(&wrapper->safe_destroy_mu);
if (!wrapper->safe_to_destroy) {
wrapper->request_safe_destroy = 1;
gpr_cv_wait(&wrapper->safe_destroy_cv, &wrapper->safe_destroy_mu,
gpr_inf_future(GPR_CLOCK_REALTIME));
}
GPR_ASSERT(wrapper->safe_to_destroy);
gpr_mu_unlock(&wrapper->safe_destroy_mu);
gpr_mu_destroy(&wrapper->safe_destroy_mu);
gpr_cv_destroy(&wrapper->safe_destroy_cv);
grpc_channel_destroy(wrapper->wrapped);
}
// Note this loop breaks out when a single call of
// "grpc_rb_event_unblocking_func".
// TODO (apolcyn) does a ruby call to the unblocking func
// necesarily mean process shutdown?
// In the worst case, this stops polling channel connectivity
// early and falls back to current behavior.
static void *run_poll_channels_loop_no_gil(void *arg) {
grpc_event event;
grpc_rb_channel *wrapper;
(void)arg;
for (;;) {
event = grpc_completion_queue_next(
channel_polling_cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
if (event.type == GRPC_QUEUE_SHUTDOWN) {
// TODO (apolcyn) is it guaranteed that this cq is empty by now?
break;
}
if (event.type == GRPC_OP_COMPLETE) {
wrapper = (grpc_rb_channel *)event.tag;
grpc_rb_channel_try_register_connection_polling(wrapper);
}
}
grpc_completion_queue_destroy(channel_polling_cq);
return NULL;
}
// Notify the channel polling loop to cleanup and shutdown.
static void grpc_rb_event_unblocking_func(void *arg) {
(void)arg;
gpr_mu_lock(&channel_polling_mu);
abort_channel_polling = 1;
grpc_completion_queue_shutdown(channel_polling_cq);
gpr_mu_unlock(&channel_polling_mu);
}
// Poll channel connectivity states in background thread without the GIL.
static VALUE run_poll_channels_loop(VALUE arg) {
(void)arg;
rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL,
grpc_rb_event_unblocking_func, NULL);
return Qnil;
}
/* Temporary fix for
* https://github.com/GoogleCloudPlatform/google-cloud-ruby/issues/899.
* Transports in idle channels can get destroyed. Normally c-core re-connects,
* but in grpc-ruby core never gets a thread until an RPC is made, because ruby
* only calls c-core's "completion_queu_pluck" API.
* This uses a global background thread that calls
* "completion_queue_next" on registered "watch_channel_connectivity_state"
* calls - so that c-core can reconnect if needed, when there aren't any RPC's.
* TODO(apolcyn) remove this when core handles new RPCs on dead connections.
*/
static void start_poll_channels_loop() {
channel_polling_cq = grpc_completion_queue_create(NULL);
gpr_mu_init(&channel_polling_mu);
abort_channel_polling = 0;
rb_thread_create(run_poll_channels_loop, NULL);
}
static void Init_grpc_propagate_masks() {
/* Constants representing call propagation masks in grpc.h */
VALUE grpc_rb_mPropagateMasks = rb_define_module_under(
grpc_rb_mGrpcCore, "PropagateMasks");
VALUE grpc_rb_mPropagateMasks =
rb_define_module_under(grpc_rb_mGrpcCore, "PropagateMasks");
rb_define_const(grpc_rb_mPropagateMasks, "DEADLINE",
UINT2NUM(GRPC_PROPAGATE_DEADLINE));
rb_define_const(grpc_rb_mPropagateMasks, "CENSUS_STATS_CONTEXT",
@ -357,8 +494,8 @@ static void Init_grpc_propagate_masks() {
static void Init_grpc_connectivity_states() {
/* Constants representing call propagation masks in grpc.h */
VALUE grpc_rb_mConnectivityStates = rb_define_module_under(
grpc_rb_mGrpcCore, "ConnectivityStates");
VALUE grpc_rb_mConnectivityStates =
rb_define_module_under(grpc_rb_mGrpcCore, "ConnectivityStates");
rb_define_const(grpc_rb_mConnectivityStates, "IDLE",
LONG2NUM(GRPC_CHANNEL_IDLE));
rb_define_const(grpc_rb_mConnectivityStates, "CONNECTING",
@ -386,12 +523,11 @@ void Init_grpc_channel() {
/* Add ruby analogues of the Channel methods. */
rb_define_method(grpc_rb_cChannel, "connectivity_state",
grpc_rb_channel_get_connectivity_state,
-1);
grpc_rb_channel_get_connectivity_state, -1);
rb_define_method(grpc_rb_cChannel, "watch_connectivity_state",
grpc_rb_channel_watch_connectivity_state, 4);
rb_define_method(grpc_rb_cChannel, "create_call",
grpc_rb_channel_create_call, 5);
rb_define_method(grpc_rb_cChannel, "create_call", grpc_rb_channel_create_call,
5);
rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0);
rb_define_alias(grpc_rb_cChannel, "close", "destroy");
@ -409,6 +545,7 @@ void Init_grpc_channel() {
id_insecure_channel = rb_intern("this_channel_is_insecure");
Init_grpc_propagate_masks();
Init_grpc_connectivity_states();
start_poll_channels_loop();
}
/* Gets the wrapped channel from the ruby wrapper */

@ -0,0 +1,93 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc'
# A test message
class EchoMsg
def self.marshal(_o)
''
end
def self.unmarshal(_o)
EchoMsg.new
end
end
# A test service with an echo implementation.
class EchoService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
attr_reader :received_md
def initialize(**kw)
@trailing_metadata = kw
@received_md = []
end
def an_rpc(req, call)
GRPC.logger.info('echo service received a request')
call.output_metadata.update(@trailing_metadata)
@received_md << call.metadata unless call.metadata.nil?
req
end
end
EchoStub = EchoService.rpc_stub_class
def start_server(port = 0)
@srv = GRPC::RpcServer.new
server_port = @srv.add_http2_port("0.0.0.0:#{port}", :this_port_is_insecure)
@srv.handle(EchoService)
@server_thd = Thread.new { @srv.run }
@srv.wait_till_running
server_port
end
def stop_server
expect(@srv.stopped?).to be(false)
@srv.stop
@server_thd.join
expect(@srv.stopped?).to be(true)
end
describe 'channel connection behavior' do
it 'the client channel handles temporary loss of a transport' do
port = start_server
stub = EchoStub.new("localhost:#{port}", :this_channel_is_insecure)
req = EchoMsg.new
expect(stub.an_rpc(req)).to be_a(EchoMsg)
stop_server
expect { stub.an_rpc(req) }.to raise_error(GRPC::Unavailable)
# TODO(apolcyn) grabbing the same port might fail, is this stable enough?
start_server(port)
expect(stub.an_rpc(req)).to be_a(EchoMsg)
stop_server
end
end
Loading…
Cancel
Save