From e57cd90c11285c8643bbfd2fb778093129806ac9 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Thu, 2 Mar 2017 16:13:46 -0800 Subject: [PATCH 01/25] fix channel connectivity state function --- src/ruby/ext/grpc/rb_channel.c | 20 ++++++++++++-------- src/ruby/spec/channel_spec.rb | 29 +++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 84e43d3f7bf..a284852500a 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -175,19 +175,23 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { /* call-seq: - insecure_channel = Channel:new("myhost:8080", {'arg1': 'value1'}) - creds = ... - secure_channel = Channel:new("myhost:443", {'arg1': 'value1'}, creds) + ch.connectivity_state -> state + ch.connectivity_state(true) -> state - Creates channel instances. */ + Indicates the current state of the channel, whose value is one of the + constants defined in GRPC::Core::ConnectivityStates. + + It also tries to connect if the chennel is idle in the second form. */ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, VALUE self) { - VALUE try_to_connect = Qfalse; + VALUE try_to_connect_param = Qfalse; + int grpc_try_to_connect = 0; grpc_rb_channel *wrapper = NULL; grpc_channel *ch = NULL; /* "01" == 0 mandatory args, 1 (try_to_connect) is optional */ - rb_scan_args(argc, argv, "01", try_to_connect); + rb_scan_args(argc, argv, "01", &try_to_connect_param); + grpc_try_to_connect = try_to_connect_param == Qtrue? 1 : 0; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); ch = wrapper->wrapped; @@ -195,8 +199,8 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, rb_raise(rb_eRuntimeError, "closed!"); return Qnil; } - return NUM2LONG( - grpc_channel_check_connectivity_state(ch, (int)try_to_connect)); + return LONG2NUM( + grpc_channel_check_connectivity_state(ch, grpc_try_to_connect)); } /* Watch for a change in connectivity state. diff --git a/src/ruby/spec/channel_spec.rb b/src/ruby/spec/channel_spec.rb index 740eac631a3..a289a00f043 100644 --- a/src/ruby/spec/channel_spec.rb +++ b/src/ruby/spec/channel_spec.rb @@ -153,6 +153,35 @@ describe GRPC::Core::Channel do end end + describe '#connectivity_state' do + it 'returns an enum' do + ch = GRPC::Core::Channel.new(fake_host, nil, :this_channel_is_insecure) + valid_states = [ + GRPC::Core::ConnectivityStates::IDLE, + GRPC::Core::ConnectivityStates::CONNECTING, + GRPC::Core::ConnectivityStates::READY, + GRPC::Core::ConnectivityStates::TRANSIENT_FAILURE, + GRPC::Core::ConnectivityStates::FATAL_FAILURE + ] + + expect(valid_states).to include(ch.connectivity_state) + end + + it 'returns an enum when trying to connect' do + ch = GRPC::Core::Channel.new(fake_host, nil, :this_channel_is_insecure) + ch.connectivity_state(true) + valid_states = [ + GRPC::Core::ConnectivityStates::IDLE, + GRPC::Core::ConnectivityStates::CONNECTING, + GRPC::Core::ConnectivityStates::READY, + GRPC::Core::ConnectivityStates::TRANSIENT_FAILURE, + GRPC::Core::ConnectivityStates::FATAL_FAILURE + ] + + expect(valid_states).to include(ch.connectivity_state) + end + end + describe '::SSL_TARGET' do it 'is a symbol' do expect(GRPC::Core::Channel::SSL_TARGET).to be_a(Symbol) From 9f4986603caec7ff569158ed07d9c201dbc00ccb Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Thu, 2 Mar 2017 19:20:07 -0800 Subject: [PATCH 02/25] add in background connectivity state poller --- src/ruby/ext/grpc/rb_channel.c | 219 ++++++++++++++++++----- src/ruby/spec/channel_connection_spec.rb | 93 ++++++++++ 2 files changed, 271 insertions(+), 41 deletions(-) create mode 100644 src/ruby/spec/channel_connection_spec.rb diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index a284852500a..ccdca15c988 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -32,21 +32,21 @@ */ #include +#include -#include "rb_grpc_imports.generated.h" -#include "rb_channel.h" #include "rb_byte_buffer.h" +#include "rb_channel.h" #include #include #include #include #include -#include "rb_grpc.h" #include "rb_call.h" #include "rb_channel_args.h" #include "rb_channel_credentials.h" #include "rb_completion_queue.h" +#include "rb_grpc.h" #include "rb_server.h" /* id_channel is the name of the hidden ivar that preserves a reference to the @@ -74,8 +74,22 @@ typedef struct grpc_rb_channel { /* The actual channel */ grpc_channel *wrapped; grpc_completion_queue *queue; + int request_safe_destroy; + int safe_to_destroy; + gpr_mu safe_destroy_mu; + gpr_cv safe_destroy_cv; } grpc_rb_channel; +/* Forward declarations of functions involved in temporary fix to + * https://github.com/grpc/grpc/issues/9941 */ +static void grpc_rb_channel_try_register_connection_polling( + grpc_rb_channel *wrapper); +static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper); + +static grpc_completion_queue *channel_polling_cq; +static gpr_mu channel_polling_mu; +static int abort_channel_polling = 0; + /* Destroys Channel instances. */ static void grpc_rb_channel_free(void *p) { grpc_rb_channel *ch = NULL; @@ -85,8 +99,9 @@ static void grpc_rb_channel_free(void *p) { ch = (grpc_rb_channel *)p; if (ch->wrapped != NULL) { - grpc_channel_destroy(ch->wrapped); + grpc_rb_channel_safe_destroy(ch); grpc_rb_completion_queue_destroy(ch->queue); + ch->wrapped = NULL; } xfree(p); @@ -104,13 +119,15 @@ static void grpc_rb_channel_mark(void *p) { } } -static rb_data_type_t grpc_channel_data_type = { - "grpc_channel", - {grpc_rb_channel_mark, grpc_rb_channel_free, GRPC_RB_MEMSIZE_UNAVAILABLE, - {NULL, NULL}}, - NULL, NULL, +static rb_data_type_t grpc_channel_data_type = {"grpc_channel", + {grpc_rb_channel_mark, + grpc_rb_channel_free, + GRPC_RB_MEMSIZE_UNAVAILABLE, + {NULL, NULL}}, + NULL, + NULL, #ifdef RUBY_TYPED_FREE_IMMEDIATELY - RUBY_TYPED_FREE_IMMEDIATELY + RUBY_TYPED_FREE_IMMEDIATELY #endif }; @@ -159,6 +176,18 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { creds = grpc_rb_get_wrapped_channel_credentials(credentials); ch = grpc_secure_channel_create(creds, target_chars, &args, NULL); } + + GPR_ASSERT(ch); + + wrapper->wrapped = ch; + gpr_mu_init(&wrapper->safe_destroy_mu); + gpr_cv_init(&wrapper->safe_destroy_cv); + gpr_mu_lock(&wrapper->safe_destroy_mu); + wrapper->safe_to_destroy = 0; + wrapper->request_safe_destroy = 0; + gpr_mu_unlock(&wrapper->safe_destroy_mu); + grpc_rb_channel_try_register_connection_polling(wrapper); + if (args.args != NULL) { xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */ } @@ -191,7 +220,7 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, /* "01" == 0 mandatory args, 1 (try_to_connect) is optional */ rb_scan_args(argc, argv, "01", &try_to_connect_param); - grpc_try_to_connect = try_to_connect_param == Qtrue? 1 : 0; + grpc_try_to_connect = try_to_connect_param == Qtrue ? 1 : 0; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); ch = wrapper->wrapped; @@ -229,14 +258,11 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, return Qnil; } grpc_channel_watch_connectivity_state( - ch, - (grpc_connectivity_state)NUM2LONG(last_state), - grpc_rb_time_timeval(deadline, /* absolute time */ 0), - cq, - tag); + ch, (grpc_connectivity_state)NUM2LONG(last_state), + grpc_rb_time_timeval(deadline, /* absolute time */ 0), cq, tag); - event = rb_completion_queue_pluck(cq, tag, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + event = rb_completion_queue_pluck(cq, tag, gpr_inf_future(GPR_CLOCK_REALTIME), + NULL); if (event.success) { return Qtrue; @@ -247,9 +273,9 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, /* Create a call given a grpc_channel, in order to call method. The request is not sent until grpc_call_invoke is called. */ -static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, - VALUE mask, VALUE method, - VALUE host, VALUE deadline) { +static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, + VALUE method, VALUE host, + VALUE deadline) { VALUE res = Qnil; grpc_rb_channel *wrapper = NULL; grpc_call *call = NULL; @@ -260,10 +286,11 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, grpc_slice method_slice; grpc_slice host_slice; grpc_slice *host_slice_ptr = NULL; - char* tmp_str = NULL; + char *tmp_str = NULL; if (host != Qnil) { - host_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(host), RSTRING_LEN(host)); + host_slice = + grpc_slice_from_copied_buffer(RSTRING_PTR(host), RSTRING_LEN(host)); host_slice_ptr = &host_slice; } if (mask != Qnil) { @@ -281,17 +308,18 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, return Qnil; } - method_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method)); + method_slice = + grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method)); call = grpc_channel_create_call(ch, parent_call, flags, cq, method_slice, - host_slice_ptr, grpc_rb_time_timeval( - deadline, - /* absolute time */ 0), NULL); + host_slice_ptr, + grpc_rb_time_timeval(deadline, + /* absolute time */ 0), + NULL); if (call == NULL) { tmp_str = grpc_slice_to_c_string(method_slice); - rb_raise(rb_eRuntimeError, "cannot create call with method %s", - tmp_str); + rb_raise(rb_eRuntimeError, "cannot create call with method %s", tmp_str); return Qnil; } @@ -308,7 +336,6 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, return res; } - /* Closes the channel, calling it's destroy method */ static VALUE grpc_rb_channel_destroy(VALUE self) { grpc_rb_channel *wrapper = NULL; @@ -317,19 +344,20 @@ static VALUE grpc_rb_channel_destroy(VALUE self) { TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); ch = wrapper->wrapped; if (ch != NULL) { - grpc_channel_destroy(ch); + grpc_rb_channel_safe_destroy(wrapper); + GPR_ASSERT(wrapper->queue != NULL); + grpc_rb_completion_queue_destroy(wrapper->queue); wrapper->wrapped = NULL; } return Qnil; } - /* Called to obtain the target that this channel accesses. */ static VALUE grpc_rb_channel_get_target(VALUE self) { grpc_rb_channel *wrapper = NULL; VALUE res = Qnil; - char* target = NULL; + char *target = NULL; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); target = grpc_channel_get_target(wrapper->wrapped); @@ -339,10 +367,119 @@ static VALUE grpc_rb_channel_get_target(VALUE self) { return res; } +// Either start polling channel connection state or signal that it's free to +// destroy. +// Not safe to call while a channel's connection state is polled. +static void grpc_rb_channel_try_register_connection_polling( + grpc_rb_channel *wrapper) { + grpc_connectivity_state conn_state; + gpr_timespec sleep_time = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN)); + + GPR_ASSERT(wrapper); + GPR_ASSERT(wrapper->wrapped); + gpr_mu_lock(&wrapper->safe_destroy_mu); + if (wrapper->request_safe_destroy) { + wrapper->safe_to_destroy = 1; + gpr_cv_signal(&wrapper->safe_destroy_cv); + gpr_mu_unlock(&wrapper->safe_destroy_mu); + return; + } + gpr_mu_lock(&channel_polling_mu); + conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); + // avoid posting work to the channel polling cq if it's been shutdown + if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) { + grpc_channel_watch_connectivity_state( + wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper); + } else { + wrapper->safe_to_destroy = 1; + gpr_cv_signal(&wrapper->safe_destroy_cv); + } + gpr_mu_unlock(&channel_polling_mu); + gpr_mu_unlock(&wrapper->safe_destroy_mu); +} + +// Note requires wrapper->wrapped, wrapper->safe_destroy_mu/cv initialized +static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { + gpr_mu_lock(&wrapper->safe_destroy_mu); + if (!wrapper->safe_to_destroy) { + wrapper->request_safe_destroy = 1; + gpr_cv_wait(&wrapper->safe_destroy_cv, &wrapper->safe_destroy_mu, + gpr_inf_future(GPR_CLOCK_REALTIME)); + } + GPR_ASSERT(wrapper->safe_to_destroy); + gpr_mu_unlock(&wrapper->safe_destroy_mu); + + gpr_mu_destroy(&wrapper->safe_destroy_mu); + gpr_cv_destroy(&wrapper->safe_destroy_cv); + + grpc_channel_destroy(wrapper->wrapped); +} + +// Note this loop breaks out when a single call of +// "grpc_rb_event_unblocking_func". +// TODO (apolcyn) does a ruby call to the unblocking func +// necesarily mean process shutdown? +// In the worst case, this stops polling channel connectivity +// early and falls back to current behavior. +static void *run_poll_channels_loop_no_gil(void *arg) { + grpc_event event; + grpc_rb_channel *wrapper; + (void)arg; + for (;;) { + event = grpc_completion_queue_next( + channel_polling_cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + if (event.type == GRPC_QUEUE_SHUTDOWN) { + // TODO (apolcyn) is it guaranteed that this cq is empty by now? + break; + } + if (event.type == GRPC_OP_COMPLETE) { + wrapper = (grpc_rb_channel *)event.tag; + grpc_rb_channel_try_register_connection_polling(wrapper); + } + } + grpc_completion_queue_destroy(channel_polling_cq); + return NULL; +} + +// Notify the channel polling loop to cleanup and shutdown. +static void grpc_rb_event_unblocking_func(void *arg) { + (void)arg; + gpr_mu_lock(&channel_polling_mu); + abort_channel_polling = 1; + grpc_completion_queue_shutdown(channel_polling_cq); + gpr_mu_unlock(&channel_polling_mu); +} + +// Poll channel connectivity states in background thread without the GIL. +static VALUE run_poll_channels_loop(VALUE arg) { + (void)arg; + rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL, + grpc_rb_event_unblocking_func, NULL); + return Qnil; +} + +/* Temporary fix for + * https://github.com/GoogleCloudPlatform/google-cloud-ruby/issues/899. + * Transports in idle channels can get destroyed. Normally c-core re-connects, + * but in grpc-ruby core never gets a thread until an RPC is made, because ruby + * only calls c-core's "completion_queu_pluck" API. + * This uses a global background thread that calls + * "completion_queue_next" on registered "watch_channel_connectivity_state" + * calls - so that c-core can reconnect if needed, when there aren't any RPC's. + * TODO(apolcyn) remove this when core handles new RPCs on dead connections. + */ +static void start_poll_channels_loop() { + channel_polling_cq = grpc_completion_queue_create(NULL); + gpr_mu_init(&channel_polling_mu); + abort_channel_polling = 0; + rb_thread_create(run_poll_channels_loop, NULL); +} + static void Init_grpc_propagate_masks() { /* Constants representing call propagation masks in grpc.h */ - VALUE grpc_rb_mPropagateMasks = rb_define_module_under( - grpc_rb_mGrpcCore, "PropagateMasks"); + VALUE grpc_rb_mPropagateMasks = + rb_define_module_under(grpc_rb_mGrpcCore, "PropagateMasks"); rb_define_const(grpc_rb_mPropagateMasks, "DEADLINE", UINT2NUM(GRPC_PROPAGATE_DEADLINE)); rb_define_const(grpc_rb_mPropagateMasks, "CENSUS_STATS_CONTEXT", @@ -357,8 +494,8 @@ static void Init_grpc_propagate_masks() { static void Init_grpc_connectivity_states() { /* Constants representing call propagation masks in grpc.h */ - VALUE grpc_rb_mConnectivityStates = rb_define_module_under( - grpc_rb_mGrpcCore, "ConnectivityStates"); + VALUE grpc_rb_mConnectivityStates = + rb_define_module_under(grpc_rb_mGrpcCore, "ConnectivityStates"); rb_define_const(grpc_rb_mConnectivityStates, "IDLE", LONG2NUM(GRPC_CHANNEL_IDLE)); rb_define_const(grpc_rb_mConnectivityStates, "CONNECTING", @@ -386,12 +523,11 @@ void Init_grpc_channel() { /* Add ruby analogues of the Channel methods. */ rb_define_method(grpc_rb_cChannel, "connectivity_state", - grpc_rb_channel_get_connectivity_state, - -1); + grpc_rb_channel_get_connectivity_state, -1); rb_define_method(grpc_rb_cChannel, "watch_connectivity_state", grpc_rb_channel_watch_connectivity_state, 4); - rb_define_method(grpc_rb_cChannel, "create_call", - grpc_rb_channel_create_call, 5); + rb_define_method(grpc_rb_cChannel, "create_call", grpc_rb_channel_create_call, + 5); rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0); rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0); rb_define_alias(grpc_rb_cChannel, "close", "destroy"); @@ -409,6 +545,7 @@ void Init_grpc_channel() { id_insecure_channel = rb_intern("this_channel_is_insecure"); Init_grpc_propagate_masks(); Init_grpc_connectivity_states(); + start_poll_channels_loop(); } /* Gets the wrapped channel from the ruby wrapper */ diff --git a/src/ruby/spec/channel_connection_spec.rb b/src/ruby/spec/channel_connection_spec.rb new file mode 100644 index 00000000000..58ab37d7bc7 --- /dev/null +++ b/src/ruby/spec/channel_connection_spec.rb @@ -0,0 +1,93 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require 'grpc' + +# A test message +class EchoMsg + def self.marshal(_o) + '' + end + + def self.unmarshal(_o) + EchoMsg.new + end +end + +# A test service with an echo implementation. +class EchoService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + attr_reader :received_md + + def initialize(**kw) + @trailing_metadata = kw + @received_md = [] + end + + def an_rpc(req, call) + GRPC.logger.info('echo service received a request') + call.output_metadata.update(@trailing_metadata) + @received_md << call.metadata unless call.metadata.nil? + req + end +end + +EchoStub = EchoService.rpc_stub_class + +def start_server(port = 0) + @srv = GRPC::RpcServer.new + server_port = @srv.add_http2_port("0.0.0.0:#{port}", :this_port_is_insecure) + @srv.handle(EchoService) + @server_thd = Thread.new { @srv.run } + @srv.wait_till_running + server_port +end + +def stop_server + expect(@srv.stopped?).to be(false) + @srv.stop + @server_thd.join + expect(@srv.stopped?).to be(true) +end + +describe 'channel connection behavior' do + it 'the client channel handles temporary loss of a transport' do + port = start_server + stub = EchoStub.new("localhost:#{port}", :this_channel_is_insecure) + req = EchoMsg.new + expect(stub.an_rpc(req)).to be_a(EchoMsg) + stop_server + expect { stub.an_rpc(req) }.to raise_error(GRPC::Unavailable) + # TODO(apolcyn) grabbing the same port might fail, is this stable enough? + start_server(port) + expect(stub.an_rpc(req)).to be_a(EchoMsg) + stop_server + end +end From 89d62d30612433770f85ba4fafb348e0cb990d8a Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Thu, 9 Mar 2017 13:28:03 -0800 Subject: [PATCH 03/25] debug print in docker file --- third_party/rake-compiler-dock/Dockerfile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/third_party/rake-compiler-dock/Dockerfile b/third_party/rake-compiler-dock/Dockerfile index 475c6a8d3db..ad902dfe660 100644 --- a/third_party/rake-compiler-dock/Dockerfile +++ b/third_party/rake-compiler-dock/Dockerfile @@ -169,6 +169,8 @@ RUN bash -c " \ USER root +RUN echo "HERE IS VERSION OF LINUX KERNEL USED: $(uname -r)" + # Fix paths in rake-compiler/config.yml and add rvm and mingw-tools to the global bashrc RUN sed -i -- "s:/root/.rake-compiler:/usr/local/rake-compiler:g" /usr/local/rake-compiler/config.yml && \ echo "source /etc/profile.d/rvm.sh" >> /etc/bash.bashrc && \ From b7e1f535f3af352dfb708fd3261314f3333a70f1 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Fri, 10 Mar 2017 10:37:06 -0800 Subject: [PATCH 04/25] revert changes to rake-compiler-dock dockerfile --- third_party/rake-compiler-dock/Dockerfile | 2 -- 1 file changed, 2 deletions(-) diff --git a/third_party/rake-compiler-dock/Dockerfile b/third_party/rake-compiler-dock/Dockerfile index ad902dfe660..475c6a8d3db 100644 --- a/third_party/rake-compiler-dock/Dockerfile +++ b/third_party/rake-compiler-dock/Dockerfile @@ -169,8 +169,6 @@ RUN bash -c " \ USER root -RUN echo "HERE IS VERSION OF LINUX KERNEL USED: $(uname -r)" - # Fix paths in rake-compiler/config.yml and add rvm and mingw-tools to the global bashrc RUN sed -i -- "s:/root/.rake-compiler:/usr/local/rake-compiler:g" /usr/local/rake-compiler/config.yml && \ echo "source /etc/profile.d/rvm.sh" >> /etc/bash.bashrc && \ From 69719089da708a4630a792131e8cf1ee579e15d5 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Mon, 13 Mar 2017 12:00:42 -0700 Subject: [PATCH 05/25] remove a TODO --- src/ruby/ext/grpc/rb_channel.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index ccdca15c988..9a3f82c2b7a 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -416,10 +416,10 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { grpc_channel_destroy(wrapper->wrapped); } -// Note this loop breaks out when a single call of +// Note this loop breaks out with a single call of // "grpc_rb_event_unblocking_func". -// TODO (apolcyn) does a ruby call to the unblocking func -// necesarily mean process shutdown? +// This assumes that a ruby call the unblocking func +// indicates process shutdown. // In the worst case, this stops polling channel connectivity // early and falls back to current behavior. static void *run_poll_channels_loop_no_gil(void *arg) { @@ -430,7 +430,6 @@ static void *run_poll_channels_loop_no_gil(void *arg) { event = grpc_completion_queue_next( channel_polling_cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); if (event.type == GRPC_QUEUE_SHUTDOWN) { - // TODO (apolcyn) is it guaranteed that this cq is empty by now? break; } if (event.type == GRPC_OP_COMPLETE) { From 427ec5e433e006b7095e68029f5e99a6b11ce962 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Mon, 13 Mar 2017 17:57:47 -0700 Subject: [PATCH 06/25] change if to while to avoid unsafe wakeup --- src/ruby/ext/grpc/rb_channel.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 9a3f82c2b7a..2489ec2fefc 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -402,7 +402,7 @@ static void grpc_rb_channel_try_register_connection_polling( // Note requires wrapper->wrapped, wrapper->safe_destroy_mu/cv initialized static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { gpr_mu_lock(&wrapper->safe_destroy_mu); - if (!wrapper->safe_to_destroy) { + while (!wrapper->safe_to_destroy) { wrapper->request_safe_destroy = 1; gpr_cv_wait(&wrapper->safe_destroy_cv, &wrapper->safe_destroy_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); From fcad5799b4c785d428ed30340d79581a5d97026c Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Tue, 14 Mar 2017 12:26:17 -0700 Subject: [PATCH 07/25] in the middle of fixing watch and get connectivity state to work with new changes --- src/ruby/ext/grpc/rb_channel.c | 113 +++++++++++++++-------- src/ruby/spec/channel_connection_spec.rb | 49 ++++++++++ 2 files changed, 121 insertions(+), 41 deletions(-) diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 2489ec2fefc..8cd489345dd 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -76,8 +76,10 @@ typedef struct grpc_rb_channel { grpc_completion_queue *queue; int request_safe_destroy; int safe_to_destroy; - gpr_mu safe_destroy_mu; - gpr_cv safe_destroy_cv; + grpc_connectivity_state current_connectivity_state; + + gpr_mu channel_mu; + gpr_cv channel_cv; } grpc_rb_channel; /* Forward declarations of functions involved in temporary fix to @@ -180,12 +182,19 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { GPR_ASSERT(ch); wrapper->wrapped = ch; - gpr_mu_init(&wrapper->safe_destroy_mu); - gpr_cv_init(&wrapper->safe_destroy_cv); - gpr_mu_lock(&wrapper->safe_destroy_mu); + + gpr_mu_init(&wrapper->channel_mu); + gpr_cv_init(&wrapper->channel_cv); + + gpr_mu_lock(&wrapper->channel_mu); + wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); + gpr_cv_signal(&wrapper->channel_cv); + gpr_mu_unlock(&wrapper->channel_mu); + + gpr_mu_lock(&wrapper->channel_mu); wrapper->safe_to_destroy = 0; wrapper->request_safe_destroy = 0; - gpr_mu_unlock(&wrapper->safe_destroy_mu); + gpr_mu_unlock(&wrapper->channel_mu); grpc_rb_channel_try_register_connection_polling(wrapper); if (args.args != NULL) { @@ -232,43 +241,57 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, grpc_channel_check_connectivity_state(ch, grpc_try_to_connect)); } -/* Watch for a change in connectivity state. - - Once the channel connectivity state is different from the last observed - state, tag will be enqueued on cq with success=1 - - If deadline expires BEFORE the state is changed, tag will be enqueued on - the completion queue with success=0 */ +/* Wait until the channel's connectivity state becomes different from + * "last_state", or "deadline" expires. + * Returns true if the the channel's connectivity state becomes + * different from "last_state" within "deadline". + * Returns false if "deadline" expires before the channel's connectivity + * state changes from "last_state". + * */ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE last_state, VALUE deadline) { grpc_rb_channel *wrapper = NULL; - grpc_channel *ch = NULL; - grpc_completion_queue *cq = NULL; - - void *tag = wrapper; - - grpc_event event; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); - ch = wrapper->wrapped; - cq = wrapper->queue; - if (ch == NULL) { + + if (wrapper->wrapped == NULL) { rb_raise(rb_eRuntimeError, "closed!"); return Qnil; } - grpc_channel_watch_connectivity_state( - ch, (grpc_connectivity_state)NUM2LONG(last_state), - grpc_rb_time_timeval(deadline, /* absolute time */ 0), cq, tag); - event = rb_completion_queue_pluck(cq, tag, gpr_inf_future(GPR_CLOCK_REALTIME), - NULL); + if (!FIXNUM_P(last_state)) { + rb_raise(rb_eTypeError, "bad type for last_state. want a GRPC::Core::ChannelState constant"); + return Qnil; + } - if (event.success) { + gpr_mu_lock(&wrapper->channel_mu); + if (wrapper->current_connectivity_state != NUM2LONG(last_state)) { + gpr_mu_unlock(&wrapper->channel_mu); return Qtrue; - } else { + } + if (wrapper->request_safe_destroy) { + gpr_mu_unlock(&wrapper->channel_mu); + rb_raise(rb_eRuntimeError, "watch_connectivity_state called on closed channel"); + return Qfalse; + } + if (wrapper->safe_to_destroy) { + gpr_mu_unlock(&wrapper->channel_mu); + gpr_log(GPR_DEBUG, "GRPC_RUBY_RB_CHANNEL: attempt to watch_connectivity_state on a non-state-polled channel"); + return Qfalse; + } + gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, grpc_rb_time_timeval(deadline, /* absolute time */ 0)); + if (wrapper->request_safe_destroy) { + gpr_mu_unlock(&wrapper->channel_mu); + rb_raise(rb_eRuntimeError, "channel closed during call to watch_connectivity_state"); return Qfalse; } + if (wrapper->current_connectivity_state != NUM2LONG(last_state)) { + gpr_mu_unlock(&wrapper->channel_mu); + return Qtrue; + } + gpr_mu_unlock(&wrapper->channel_mu); + return Qfalse; } /* Create a call given a grpc_channel, in order to call method. The request @@ -378,40 +401,47 @@ static void grpc_rb_channel_try_register_connection_polling( GPR_ASSERT(wrapper); GPR_ASSERT(wrapper->wrapped); - gpr_mu_lock(&wrapper->safe_destroy_mu); + gpr_mu_lock(&wrapper->channel_mu); if (wrapper->request_safe_destroy) { wrapper->safe_to_destroy = 1; - gpr_cv_signal(&wrapper->safe_destroy_cv); - gpr_mu_unlock(&wrapper->safe_destroy_mu); + gpr_cv_signal(&wrapper->channel_cv); + gpr_mu_unlock(&wrapper->channel_mu); return; } gpr_mu_lock(&channel_polling_mu); + + gpr_mu_lock(&wrapper->channel_mu); conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); + if (conn_state != wrapper->current_connectivity_state) { + wrapper->current_connectivity_state = conn_state; + gpr_cv_signal(&wrapper->channel_cv); + } // avoid posting work to the channel polling cq if it's been shutdown if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) { grpc_channel_watch_connectivity_state( wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper); } else { wrapper->safe_to_destroy = 1; - gpr_cv_signal(&wrapper->safe_destroy_cv); + gpr_cv_signal(&wrapper->channel_cv); } + gpr_mu_unlock(&wrapper->channel_mu); gpr_mu_unlock(&channel_polling_mu); - gpr_mu_unlock(&wrapper->safe_destroy_mu); + gpr_mu_unlock(&wrapper->channel_mu); } -// Note requires wrapper->wrapped, wrapper->safe_destroy_mu/cv initialized +// Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { - gpr_mu_lock(&wrapper->safe_destroy_mu); + gpr_mu_lock(&wrapper->channel_mu); while (!wrapper->safe_to_destroy) { wrapper->request_safe_destroy = 1; - gpr_cv_wait(&wrapper->safe_destroy_cv, &wrapper->safe_destroy_mu, + gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } GPR_ASSERT(wrapper->safe_to_destroy); - gpr_mu_unlock(&wrapper->safe_destroy_mu); + gpr_mu_unlock(&wrapper->channel_mu); - gpr_mu_destroy(&wrapper->safe_destroy_mu); - gpr_cv_destroy(&wrapper->safe_destroy_cv); + gpr_mu_destroy(&wrapper->channel_mu); + gpr_cv_destroy(&wrapper->channel_cv); grpc_channel_destroy(wrapper->wrapped); } @@ -434,6 +464,7 @@ static void *run_poll_channels_loop_no_gil(void *arg) { } if (event.type == GRPC_OP_COMPLETE) { wrapper = (grpc_rb_channel *)event.tag; + grpc_rb_channel_try_register_connection_polling(wrapper); } } @@ -524,7 +555,7 @@ void Init_grpc_channel() { rb_define_method(grpc_rb_cChannel, "connectivity_state", grpc_rb_channel_get_connectivity_state, -1); rb_define_method(grpc_rb_cChannel, "watch_connectivity_state", - grpc_rb_channel_watch_connectivity_state, 4); + grpc_rb_channel_watch_connectivity_state, 2); rb_define_method(grpc_rb_cChannel, "create_call", grpc_rb_channel_create_call, 5); rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0); diff --git a/src/ruby/spec/channel_connection_spec.rb b/src/ruby/spec/channel_connection_spec.rb index 58ab37d7bc7..d8e10f7b761 100644 --- a/src/ruby/spec/channel_connection_spec.rb +++ b/src/ruby/spec/channel_connection_spec.rb @@ -90,4 +90,53 @@ describe 'channel connection behavior' do expect(stub.an_rpc(req)).to be_a(EchoMsg) stop_server end + + it 'observably connects and reconnects to transient server when using the channel state API', trial: true do + port = start_server + ch = GRPC::Core::Channel.new("localhost:#{port}", {}, :this_channel_is_insecure) + + expect(ch.connectivity_state).to be(GRPC::Core::ConnectivityStates::IDLE) + + state = ch.connectivity_state(true) + + count = 0 + while count < 20 and state != GRPC::Core::ConnectivityStates::READY do + STDERR.puts "first round of waiting for state to become READY" + ch.watch_connectivity_state(state, Time.now + 60) + state = ch.connectivity_state(true) + count += 1 + end + + expect(state).to be(GRPC::Core::ConnectivityStates::READY) + + stop_server + + state = ch.connectivity_state + + count = 0 + while count < 20 and state == GRPC::Core::ConnectivityStates::READY do + STDERR.puts "server shut down. waiting for state to not be READY" + ch.watch_connectivity_state(state, Time.now + 60) + state = ch.connectivity_state + count += 1 + end + + expect(state).to_not be(GRPC::Core::ConnectivityStates::READY) + + start_server(port) + + state = ch.connectivity_state(true) + + count = 0 + while count < 20 and state != GRPC::Core::ConnectivityStates::READY do + STDERR.puts "second round of waiting for state to become READY" + ch.watch_connectivity_state(state, Time.now + 60) + state = ch.connectivity_state(true) + count += 1 + end + + expect(state).to be(GRPC::Core::ConnectivityStates::READY) + + stop_server + end end From be30114a3b95b6dc843f8338d2b8a5470bec2553 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Tue, 14 Mar 2017 16:33:44 -0700 Subject: [PATCH 08/25] fix up tests and remove two unlocks in a row bug --- examples/ruby/greeter_client.rb | 32 +++++++++++++++++++++-- src/ruby/ext/grpc/rb_channel.c | 33 +++++++++++------------- src/ruby/spec/channel_connection_spec.rb | 19 +++++++------- 3 files changed, 54 insertions(+), 30 deletions(-) diff --git a/examples/ruby/greeter_client.rb b/examples/ruby/greeter_client.rb index 1cdf79ebf40..379f41536ee 100755 --- a/examples/ruby/greeter_client.rb +++ b/examples/ruby/greeter_client.rb @@ -40,11 +40,39 @@ $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) require 'grpc' require 'helloworld_services_pb' +$int_count = 0 + +def shut_down_term + puts "term sig" + $int_count += 1 + if $int_count > 4 + exit + end +end + +def shut_down_kill + puts "kill sig" + $int_count += 1 + if $int_count > 4 + exit + end +end + + def main stub = Helloworld::Greeter::Stub.new('localhost:50051', :this_channel_is_insecure) user = ARGV.size > 0 ? ARGV[0] : 'world' - message = stub.say_hello(Helloworld::HelloRequest.new(name: user)).message - p "Greeting: #{message}" + Signal.trap("TERM") do + shut_down_term + end + Signal.trap("INT") do + shut_down_kill + end + loop do + message = stub.say_hello(Helloworld::HelloRequest.new(name: user)).message + p "Greeting: #{message}" + sleep 4 + end end main diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 8cd489345dd..d143c54d21b 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -89,7 +89,7 @@ static void grpc_rb_channel_try_register_connection_polling( static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper); static grpc_completion_queue *channel_polling_cq; -static gpr_mu channel_polling_mu; +static gpr_mu global_connection_polling_mu; static int abort_channel_polling = 0; /* Destroys Channel instances. */ @@ -188,13 +188,13 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { gpr_mu_lock(&wrapper->channel_mu); wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); - gpr_cv_signal(&wrapper->channel_cv); - gpr_mu_unlock(&wrapper->channel_mu); - - gpr_mu_lock(&wrapper->channel_mu); wrapper->safe_to_destroy = 0; wrapper->request_safe_destroy = 0; + + gpr_cv_signal(&wrapper->channel_cv); gpr_mu_unlock(&wrapper->channel_mu); + + grpc_rb_channel_try_register_connection_polling(wrapper); if (args.args != NULL) { @@ -277,7 +277,6 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, } if (wrapper->safe_to_destroy) { gpr_mu_unlock(&wrapper->channel_mu); - gpr_log(GPR_DEBUG, "GRPC_RUBY_RB_CHANNEL: attempt to watch_connectivity_state on a non-state-polled channel"); return Qfalse; } gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, grpc_rb_time_timeval(deadline, /* absolute time */ 0)); @@ -394,7 +393,7 @@ static VALUE grpc_rb_channel_get_target(VALUE self) { // destroy. // Not safe to call while a channel's connection state is polled. static void grpc_rb_channel_try_register_connection_polling( - grpc_rb_channel *wrapper) { + grpc_rb_channel *wrapper) { grpc_connectivity_state conn_state; gpr_timespec sleep_time = gpr_time_add( gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN)); @@ -408,9 +407,8 @@ static void grpc_rb_channel_try_register_connection_polling( gpr_mu_unlock(&wrapper->channel_mu); return; } - gpr_mu_lock(&channel_polling_mu); + gpr_mu_lock(&global_connection_polling_mu); - gpr_mu_lock(&wrapper->channel_mu); conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); if (conn_state != wrapper->current_connectivity_state) { wrapper->current_connectivity_state = conn_state; @@ -424,8 +422,7 @@ static void grpc_rb_channel_try_register_connection_polling( wrapper->safe_to_destroy = 1; gpr_cv_signal(&wrapper->channel_cv); } - gpr_mu_unlock(&wrapper->channel_mu); - gpr_mu_unlock(&channel_polling_mu); + gpr_mu_unlock(&global_connection_polling_mu); gpr_mu_unlock(&wrapper->channel_mu); } @@ -454,7 +451,6 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { // early and falls back to current behavior. static void *run_poll_channels_loop_no_gil(void *arg) { grpc_event event; - grpc_rb_channel *wrapper; (void)arg; for (;;) { event = grpc_completion_queue_next( @@ -463,27 +459,28 @@ static void *run_poll_channels_loop_no_gil(void *arg) { break; } if (event.type == GRPC_OP_COMPLETE) { - wrapper = (grpc_rb_channel *)event.tag; - - grpc_rb_channel_try_register_connection_polling(wrapper); + grpc_rb_channel_try_register_connection_polling((grpc_rb_channel *)event.tag); } } grpc_completion_queue_destroy(channel_polling_cq); + gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling loop"); return NULL; } // Notify the channel polling loop to cleanup and shutdown. static void grpc_rb_event_unblocking_func(void *arg) { (void)arg; - gpr_mu_lock(&channel_polling_mu); + gpr_mu_lock(&global_connection_polling_mu); + gpr_log(GPR_DEBUG, "GRPC_RUBY: grpc_rb_event_unblocking_func - begin aborting connection polling"); abort_channel_polling = 1; grpc_completion_queue_shutdown(channel_polling_cq); - gpr_mu_unlock(&channel_polling_mu); + gpr_mu_unlock(&global_connection_polling_mu); } // Poll channel connectivity states in background thread without the GIL. static VALUE run_poll_channels_loop(VALUE arg) { (void)arg; + gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread"); rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL, grpc_rb_event_unblocking_func, NULL); return Qnil; @@ -501,7 +498,7 @@ static VALUE run_poll_channels_loop(VALUE arg) { */ static void start_poll_channels_loop() { channel_polling_cq = grpc_completion_queue_create(NULL); - gpr_mu_init(&channel_polling_mu); + gpr_mu_init(&global_connection_polling_mu); abort_channel_polling = 0; rb_thread_create(run_poll_channels_loop, NULL); } diff --git a/src/ruby/spec/channel_connection_spec.rb b/src/ruby/spec/channel_connection_spec.rb index d8e10f7b761..b344052a215 100644 --- a/src/ruby/spec/channel_connection_spec.rb +++ b/src/ruby/spec/channel_connection_spec.rb @@ -63,7 +63,7 @@ EchoStub = EchoService.rpc_stub_class def start_server(port = 0) @srv = GRPC::RpcServer.new - server_port = @srv.add_http2_port("0.0.0.0:#{port}", :this_port_is_insecure) + server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure) @srv.handle(EchoService) @server_thd = Thread.new { @srv.run } @srv.wait_till_running @@ -84,24 +84,25 @@ describe 'channel connection behavior' do req = EchoMsg.new expect(stub.an_rpc(req)).to be_a(EchoMsg) stop_server - expect { stub.an_rpc(req) }.to raise_error(GRPC::Unavailable) + sleep 1 # TODO(apolcyn) grabbing the same port might fail, is this stable enough? start_server(port) expect(stub.an_rpc(req)).to be_a(EchoMsg) stop_server end - it 'observably connects and reconnects to transient server when using the channel state API', trial: true do + it 'observably connects and reconnects to transient server' \ + 'when using the channel state API' do port = start_server - ch = GRPC::Core::Channel.new("localhost:#{port}", {}, :this_channel_is_insecure) + ch = GRPC::Core::Channel.new("localhost:#{port}", {}, + :this_channel_is_insecure) expect(ch.connectivity_state).to be(GRPC::Core::ConnectivityStates::IDLE) state = ch.connectivity_state(true) count = 0 - while count < 20 and state != GRPC::Core::ConnectivityStates::READY do - STDERR.puts "first round of waiting for state to become READY" + while count < 20 && state != GRPC::Core::ConnectivityStates::READY ch.watch_connectivity_state(state, Time.now + 60) state = ch.connectivity_state(true) count += 1 @@ -114,8 +115,7 @@ describe 'channel connection behavior' do state = ch.connectivity_state count = 0 - while count < 20 and state == GRPC::Core::ConnectivityStates::READY do - STDERR.puts "server shut down. waiting for state to not be READY" + while count < 20 && state == GRPC::Core::ConnectivityStates::READY ch.watch_connectivity_state(state, Time.now + 60) state = ch.connectivity_state count += 1 @@ -128,8 +128,7 @@ describe 'channel connection behavior' do state = ch.connectivity_state(true) count = 0 - while count < 20 and state != GRPC::Core::ConnectivityStates::READY do - STDERR.puts "second round of waiting for state to become READY" + while count < 20 && state != GRPC::Core::ConnectivityStates::READY ch.watch_connectivity_state(state, Time.now + 60) state = ch.connectivity_state(true) count += 1 From c44c16e330a8532c8c917c2aa64ffd1aeafe0126 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Tue, 14 Mar 2017 17:44:21 -0700 Subject: [PATCH 09/25] add initial framework for full end2end tests outside of rspec --- src/ruby/end2end/echo_server.rb | 68 ++++++++++++++ src/ruby/end2end/gen_protos.sh | 2 + src/ruby/end2end/lib/client_control_pb.rb | 21 +++++ .../end2end/lib/client_control_services_pb.rb | 54 +++++++++++ src/ruby/end2end/lib/echo_pb.rb | 18 ++++ src/ruby/end2end/lib/echo_services_pb.rb | 52 +++++++++++ src/ruby/end2end/protos/client_control.proto | 48 ++++++++++ src/ruby/end2end/protos/echo.proto | 46 ++++++++++ src/ruby/end2end/sig_handling_client.rb | 91 +++++++++++++++++++ src/ruby/end2end/sig_handling_driver.rb | 88 ++++++++++++++++++ src/ruby/qps/worker.rb | 2 + .../helper_scripts/run_ruby_end2end_tests.sh | 36 ++++++++ tools/run_tests/run_tests.py | 11 ++- 13 files changed, 534 insertions(+), 3 deletions(-) create mode 100755 src/ruby/end2end/echo_server.rb create mode 100644 src/ruby/end2end/gen_protos.sh create mode 100644 src/ruby/end2end/lib/client_control_pb.rb create mode 100644 src/ruby/end2end/lib/client_control_services_pb.rb create mode 100644 src/ruby/end2end/lib/echo_pb.rb create mode 100644 src/ruby/end2end/lib/echo_services_pb.rb create mode 100644 src/ruby/end2end/protos/client_control.proto create mode 100644 src/ruby/end2end/protos/echo.proto create mode 100755 src/ruby/end2end/sig_handling_client.rb create mode 100755 src/ruby/end2end/sig_handling_driver.rb create mode 100755 tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh diff --git a/src/ruby/end2end/echo_server.rb b/src/ruby/end2end/echo_server.rb new file mode 100755 index 00000000000..5e80740aa0a --- /dev/null +++ b/src/ruby/end2end/echo_server.rb @@ -0,0 +1,68 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +this_dir = File.expand_path(File.dirname(__FILE__)) +protos_lib_dir = File.join(this_dir, 'lib') +grpc_lib_dir = File.join(File.dirname(this_dir), 'lib') +$LOAD_PATH.unshift(grpc_lib_dir) unless $LOAD_PATH.include?(grpc_lib_dir) +$LOAD_PATH.unshift(protos_lib_dir) unless $LOAD_PATH.include?(protos_lib_dir) +$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) + +require 'grpc' +require 'echo_services_pb' + +# GreeterServer is simple server that implements the Helloworld Greeter server. +class EchoServerImpl < Echo::EchoServer::Service + # say_hello implements the SayHello rpc method. + def echo(echo_req, _) + Echo::EchoReply.new(response: echo_req.request) + end +end + +class ServerRunner + def initialize(port) + @port = port + end + def run + @srv = GRPC::RpcServer.new + @thd = Thread.new do + @srv.add_http2_port("localhost:#{@port}", :this_port_is_insecure) + @srv.handle(EchoServerImpl) + @srv.run + end + @srv.wait_till_running + end + def stop + @srv.stop + @thd.join + raise "server not stopped" unless @srv.stopped? + end +end diff --git a/src/ruby/end2end/gen_protos.sh b/src/ruby/end2end/gen_protos.sh new file mode 100644 index 00000000000..c26b5572da1 --- /dev/null +++ b/src/ruby/end2end/gen_protos.sh @@ -0,0 +1,2 @@ +#!/bin/bash +grpc_tools_ruby_protoc -I protos --ruby_out=lib --grpc_out=lib protos/echo.proto protos/client_control.proto diff --git a/src/ruby/end2end/lib/client_control_pb.rb b/src/ruby/end2end/lib/client_control_pb.rb new file mode 100644 index 00000000000..866095a80ac --- /dev/null +++ b/src/ruby/end2end/lib/client_control_pb.rb @@ -0,0 +1,21 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: client_control.proto + +require 'google/protobuf' + +Google::Protobuf::DescriptorPool.generated_pool.build do + add_message "client_control.DoEchoRpcRequest" do + optional :request, :string, 1 + end + add_message "client_control.CreateClientStubRequest" do + optional :server_address, :string, 1 + end + add_message "client_control.Void" do + end +end + +module ClientControl + DoEchoRpcRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("client_control.DoEchoRpcRequest").msgclass + CreateClientStubRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("client_control.CreateClientStubRequest").msgclass + Void = Google::Protobuf::DescriptorPool.generated_pool.lookup("client_control.Void").msgclass +end diff --git a/src/ruby/end2end/lib/client_control_services_pb.rb b/src/ruby/end2end/lib/client_control_services_pb.rb new file mode 100644 index 00000000000..4c31f3e5900 --- /dev/null +++ b/src/ruby/end2end/lib/client_control_services_pb.rb @@ -0,0 +1,54 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# Source: client_control.proto for package 'client_control' +# Original file comments: +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# + +require 'grpc' +require 'client_control_pb' + +module ClientControl + module ClientController + class Service + + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'client_control.ClientController' + + rpc :DoEchoRpc, DoEchoRpcRequest, Void + rpc :CreateClientStub, CreateClientStubRequest, Void + rpc :Shutdown, Void, Void + end + + Stub = Service.rpc_stub_class + end +end diff --git a/src/ruby/end2end/lib/echo_pb.rb b/src/ruby/end2end/lib/echo_pb.rb new file mode 100644 index 00000000000..c62adc07534 --- /dev/null +++ b/src/ruby/end2end/lib/echo_pb.rb @@ -0,0 +1,18 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: echo.proto + +require 'google/protobuf' + +Google::Protobuf::DescriptorPool.generated_pool.build do + add_message "echo.EchoRequest" do + optional :request, :string, 1 + end + add_message "echo.EchoReply" do + optional :response, :string, 1 + end +end + +module Echo + EchoRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("echo.EchoRequest").msgclass + EchoReply = Google::Protobuf::DescriptorPool.generated_pool.lookup("echo.EchoReply").msgclass +end diff --git a/src/ruby/end2end/lib/echo_services_pb.rb b/src/ruby/end2end/lib/echo_services_pb.rb new file mode 100644 index 00000000000..c66e975bf32 --- /dev/null +++ b/src/ruby/end2end/lib/echo_services_pb.rb @@ -0,0 +1,52 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# Source: echo.proto for package 'echo' +# Original file comments: +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# + +require 'grpc' +require 'echo_pb' + +module Echo + module EchoServer + class Service + + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'echo.EchoServer' + + rpc :Echo, EchoRequest, EchoReply + end + + Stub = Service.rpc_stub_class + end +end diff --git a/src/ruby/end2end/protos/client_control.proto b/src/ruby/end2end/protos/client_control.proto new file mode 100644 index 00000000000..5e11876cb40 --- /dev/null +++ b/src/ruby/end2end/protos/client_control.proto @@ -0,0 +1,48 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +package client_control; + +service ClientController { + rpc DoEchoRpc (DoEchoRpcRequest) returns (Void) {} + rpc CreateClientStub(CreateClientStubRequest) returns (Void) {} + rpc Shutdown(Void) returns (Void) {} +} + +message DoEchoRpcRequest { + string request = 1; +} + +message CreateClientStubRequest { + string server_address = 1; +} + +message Void{} diff --git a/src/ruby/end2end/protos/echo.proto b/src/ruby/end2end/protos/echo.proto new file mode 100644 index 00000000000..d47afef70f1 --- /dev/null +++ b/src/ruby/end2end/protos/echo.proto @@ -0,0 +1,46 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +package echo; + +service EchoServer { + rpc Echo (EchoRequest) returns (EchoReply) {} +} + +// The request message containing the user's name. +message EchoRequest { + string request = 1; +} + +// The response message containing the greetings +message EchoReply { + string response = 1; +} diff --git a/src/ruby/end2end/sig_handling_client.rb b/src/ruby/end2end/sig_handling_client.rb new file mode 100755 index 00000000000..860c6b5f0d3 --- /dev/null +++ b/src/ruby/end2end/sig_handling_client.rb @@ -0,0 +1,91 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +this_dir = File.expand_path(File.dirname(__FILE__)) +protos_lib_dir = File.join(this_dir, 'lib') +grpc_lib_dir = File.join(File.dirname(this_dir), 'lib') +$LOAD_PATH.unshift(grpc_lib_dir) unless $LOAD_PATH.include?(grpc_lib_dir) +$LOAD_PATH.unshift(protos_lib_dir) unless $LOAD_PATH.include?(protos_lib_dir) +$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) + +require 'grpc' +require 'echo_services_pb' +require 'client_control_services_pb' +require 'optparse' +require 'thread' + +class SigHandlingClientController < ClientControl::ClientController::Service + def initialize(srv) + @srv = srv + end + def do_echo_rpc(req, _) + response = @stub.echo(Echo::EchoRequest.new(request: req.request)) + raise "bad response" unless response.response == req.request + ClientControl::Void.new + end + def create_client_stub(req, _) + @stub = Echo::EchoServer::Stub.new(req.server_address, :this_channel_is_insecure) + ClientControl::Void.new + end + def shutdown(_, _) + Thread.new do + #TODO(apolcyn) There is a race between stopping the server and the "shutdown" rpc completing, + # See if stop method on server can end active RPC cleanly, to avoid this sleep. + sleep 3 + @srv.stop + end + ClientControl::Void.new + end +end + +def main + client_control_port = '' + OptionParser.new do |opts| + opts.on('--client_control_port=P', String) do |p| + client_control_port = p + end + end.parse! + + Signal.trap("TERM") do + STDERR.puts "SIGTERM received" + end + + Signal.trap("INT") do + STDERR.puts "SIGINT received" + end + + srv = GRPC::RpcServer.new + srv.add_http2_port("localhost:#{client_control_port}", :this_port_is_insecure) + srv.handle(SigHandlingClientController.new(srv)) + srv.run +end + +main diff --git a/src/ruby/end2end/sig_handling_driver.rb b/src/ruby/end2end/sig_handling_driver.rb new file mode 100755 index 00000000000..524f0cebe8d --- /dev/null +++ b/src/ruby/end2end/sig_handling_driver.rb @@ -0,0 +1,88 @@ +#!/usr/bin/env ruby + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# smoke test for a grpc-using app that receives and +# handles process-ending signals + +this_dir = File.expand_path(File.dirname(__FILE__)) +protos_lib_dir = File.join(this_dir, 'lib') +grpc_lib_dir = File.join(File.dirname(this_dir), 'lib') +$LOAD_PATH.unshift(grpc_lib_dir) unless $LOAD_PATH.include?(grpc_lib_dir) +$LOAD_PATH.unshift(protos_lib_dir) unless $LOAD_PATH.include?(protos_lib_dir) +$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) + +require 'grpc' +require 'echo_server' +require 'client_control_services_pb' + +def main + this_dir = File.expand_path(File.dirname(__FILE__)) + lib_dir = File.join(File.dirname(this_dir), 'lib') + + server_port = '50051' + STDERR.puts "start server" + server_runner = ServerRunner.new(server_port) + server_runner.run + + sleep 1 + + client_control_port = '50052' + + STDERR.puts "start client" + client_path = File.join(this_dir, "sig_handling_client.rb") + client_pid = Process.spawn(RbConfig.ruby, client_path, "--client_control_port=#{client_control_port}") + control_stub = ClientControl::ClientController::Stub.new("localhost:#{client_control_port}", :this_channel_is_insecure) + + sleep 1 + + control_stub.create_client_stub(ClientControl::CreateClientStubRequest.new(server_address: "localhost:#{server_port}")) + + count = 0 + while count < 5 + control_stub.do_echo_rpc(ClientControl::DoEchoRpcRequest.new(request: 'hello')) + Process.kill('SIGTERM', client_pid) + Process.kill('SIGINT', client_pid) + count += 1 + end + + control_stub.shutdown(ClientControl::Void.new) + Process.wait(client_pid) + + client_exit_code = $?.exitstatus + + if client_exit_code != 0 + raise "term sig test failure: client exit code: #{client_exit_code}" + end + + server_runner.stop +end + +main diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb index 61a0b723a3f..318c1f9e22c 100755 --- a/src/ruby/qps/worker.rb +++ b/src/ruby/qps/worker.rb @@ -36,6 +36,8 @@ lib_dir = File.join(File.dirname(this_dir), 'lib') $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) +puts $LOAD_PATH + require 'grpc' require 'optparse' require 'histogram' diff --git a/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh new file mode 100755 index 00000000000..7ccbcfca70d --- /dev/null +++ b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh @@ -0,0 +1,36 @@ +#!/bin/bash +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +set -ex + +# change to grpc repo root +cd $(dirname $0)/../../.. + +ruby src/ruby/end2end/sig_handling_driver.rb diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 43b8f8184e2..dc17e738e6e 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -692,9 +692,14 @@ class RubyLanguage(object): _check_compiler(self.args.compiler, ['default']) def test_specs(self): - return [self.config.job_spec(['tools/run_tests/helper_scripts/run_ruby.sh'], - timeout_seconds=10*60, - environ=_FORCE_ENVIRON_FOR_WRAPPERS)] + tests = [self.config.job_spec(['tools/run_tests/helper_scripts/run_ruby.sh'], + timeout_seconds=10*60, + environ=_FORCE_ENVIRON_FOR_WRAPPERS)] + # note these aren't getting ran on windows since no workers + tests.append(self.config.job_spec(['tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh'], + timeout_seconds=10*60, + environ=_FORCE_ENVIRON_FOR_WRAPPERS)) + return tests def pre_build_steps(self): return [['tools/run_tests/helper_scripts/pre_build_ruby.sh']] From 7a32e8a499e94efb5dab6e19459f9b9968382000 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Tue, 14 Mar 2017 23:35:29 -0700 Subject: [PATCH 10/25] Revert changes to example greeter client --- examples/ruby/greeter_client.rb | 32 ++------------------------------ 1 file changed, 2 insertions(+), 30 deletions(-) diff --git a/examples/ruby/greeter_client.rb b/examples/ruby/greeter_client.rb index 379f41536ee..1cdf79ebf40 100755 --- a/examples/ruby/greeter_client.rb +++ b/examples/ruby/greeter_client.rb @@ -40,39 +40,11 @@ $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) require 'grpc' require 'helloworld_services_pb' -$int_count = 0 - -def shut_down_term - puts "term sig" - $int_count += 1 - if $int_count > 4 - exit - end -end - -def shut_down_kill - puts "kill sig" - $int_count += 1 - if $int_count > 4 - exit - end -end - - def main stub = Helloworld::Greeter::Stub.new('localhost:50051', :this_channel_is_insecure) user = ARGV.size > 0 ? ARGV[0] : 'world' - Signal.trap("TERM") do - shut_down_term - end - Signal.trap("INT") do - shut_down_kill - end - loop do - message = stub.say_hello(Helloworld::HelloRequest.new(name: user)).message - p "Greeting: #{message}" - sleep 4 - end + message = stub.say_hello(Helloworld::HelloRequest.new(name: user)).message + p "Greeting: #{message}" end main From f8dc32e9e2153b2ea97091b611d8c83cbc37c05e Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 15 Mar 2017 00:04:33 -0700 Subject: [PATCH 11/25] make end2end test ports dynamic and slight refactor --- .../{echo_server.rb => end2end_common.rb} | 42 +++++++++++++++++-- src/ruby/end2end/lib/client_control_pb.rb | 4 -- .../end2end/lib/client_control_services_pb.rb | 1 - src/ruby/end2end/protos/client_control.proto | 5 --- src/ruby/end2end/sig_handling_client.rb | 29 +++++-------- src/ruby/end2end/sig_handling_driver.rb | 38 +++-------------- 6 files changed, 53 insertions(+), 66 deletions(-) rename src/ruby/end2end/{echo_server.rb => end2end_common.rb} (69%) diff --git a/src/ruby/end2end/echo_server.rb b/src/ruby/end2end/end2end_common.rb similarity index 69% rename from src/ruby/end2end/echo_server.rb rename to src/ruby/end2end/end2end_common.rb index 5e80740aa0a..67961cdf972 100755 --- a/src/ruby/end2end/echo_server.rb +++ b/src/ruby/end2end/end2end_common.rb @@ -38,6 +38,10 @@ $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) require 'grpc' require 'echo_services_pb' +require 'client_control_services_pb' +require 'socket' +require 'optparse' +require 'thread' # GreeterServer is simple server that implements the Helloworld Greeter server. class EchoServerImpl < Echo::EchoServer::Service @@ -48,17 +52,18 @@ class EchoServerImpl < Echo::EchoServer::Service end class ServerRunner - def initialize(port) - @port = port + def initialize end def run @srv = GRPC::RpcServer.new + port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) + @srv.handle(EchoServerImpl) + @thd = Thread.new do - @srv.add_http2_port("localhost:#{@port}", :this_port_is_insecure) - @srv.handle(EchoServerImpl) @srv.run end @srv.wait_till_running + port end def stop @srv.stop @@ -66,3 +71,32 @@ class ServerRunner raise "server not stopped" unless @srv.stopped? end end + +def start_client(client_main, server_port) + this_dir = File.expand_path(File.dirname(__FILE__)) + + tmp_server = TCPServer.new(0) + client_control_port = tmp_server.local_address.ip_port + tmp_server.close + + client_path = File.join(this_dir, client_main) + client_pid = Process.spawn(RbConfig.ruby, client_path, + "--client_control_port=#{client_control_port}", + "--server_port=#{server_port}") + sleep 1 + control_stub = ClientControl::ClientController::Stub.new("localhost:#{client_control_port}", :this_channel_is_insecure) + return control_stub, client_pid +end + +def cleanup(control_stub, server_runner) + control_stub.shutdown(ClientControl::Void.new) + Process.wait(client_pid) + + client_exit_code = $?.exitstatus + + if client_exit_code != 0 + raise "term sig test failure: client exit code: #{client_exit_code}" + end + + server_runner.stop +end diff --git a/src/ruby/end2end/lib/client_control_pb.rb b/src/ruby/end2end/lib/client_control_pb.rb index 866095a80ac..1a938f1b5ae 100644 --- a/src/ruby/end2end/lib/client_control_pb.rb +++ b/src/ruby/end2end/lib/client_control_pb.rb @@ -7,15 +7,11 @@ Google::Protobuf::DescriptorPool.generated_pool.build do add_message "client_control.DoEchoRpcRequest" do optional :request, :string, 1 end - add_message "client_control.CreateClientStubRequest" do - optional :server_address, :string, 1 - end add_message "client_control.Void" do end end module ClientControl DoEchoRpcRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("client_control.DoEchoRpcRequest").msgclass - CreateClientStubRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("client_control.CreateClientStubRequest").msgclass Void = Google::Protobuf::DescriptorPool.generated_pool.lookup("client_control.Void").msgclass end diff --git a/src/ruby/end2end/lib/client_control_services_pb.rb b/src/ruby/end2end/lib/client_control_services_pb.rb index 4c31f3e5900..04b2291bc78 100644 --- a/src/ruby/end2end/lib/client_control_services_pb.rb +++ b/src/ruby/end2end/lib/client_control_services_pb.rb @@ -45,7 +45,6 @@ module ClientControl self.service_name = 'client_control.ClientController' rpc :DoEchoRpc, DoEchoRpcRequest, Void - rpc :CreateClientStub, CreateClientStubRequest, Void rpc :Shutdown, Void, Void end diff --git a/src/ruby/end2end/protos/client_control.proto b/src/ruby/end2end/protos/client_control.proto index 5e11876cb40..f985bb486dc 100644 --- a/src/ruby/end2end/protos/client_control.proto +++ b/src/ruby/end2end/protos/client_control.proto @@ -33,7 +33,6 @@ package client_control; service ClientController { rpc DoEchoRpc (DoEchoRpcRequest) returns (Void) {} - rpc CreateClientStub(CreateClientStubRequest) returns (Void) {} rpc Shutdown(Void) returns (Void) {} } @@ -41,8 +40,4 @@ message DoEchoRpcRequest { string request = 1; } -message CreateClientStubRequest { - string server_address = 1; -} - message Void{} diff --git a/src/ruby/end2end/sig_handling_client.rb b/src/ruby/end2end/sig_handling_client.rb index 860c6b5f0d3..f0b2ba61caf 100755 --- a/src/ruby/end2end/sig_handling_client.rb +++ b/src/ruby/end2end/sig_handling_client.rb @@ -29,32 +29,18 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -this_dir = File.expand_path(File.dirname(__FILE__)) -protos_lib_dir = File.join(this_dir, 'lib') -grpc_lib_dir = File.join(File.dirname(this_dir), 'lib') -$LOAD_PATH.unshift(grpc_lib_dir) unless $LOAD_PATH.include?(grpc_lib_dir) -$LOAD_PATH.unshift(protos_lib_dir) unless $LOAD_PATH.include?(protos_lib_dir) -$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) - -require 'grpc' -require 'echo_services_pb' -require 'client_control_services_pb' -require 'optparse' -require 'thread' +require_relative './end2end_common' class SigHandlingClientController < ClientControl::ClientController::Service - def initialize(srv) + def initialize(srv, stub) @srv = srv + @stub = stub end def do_echo_rpc(req, _) response = @stub.echo(Echo::EchoRequest.new(request: req.request)) raise "bad response" unless response.response == req.request ClientControl::Void.new end - def create_client_stub(req, _) - @stub = Echo::EchoServer::Stub.new(req.server_address, :this_channel_is_insecure) - ClientControl::Void.new - end def shutdown(_, _) Thread.new do #TODO(apolcyn) There is a race between stopping the server and the "shutdown" rpc completing, @@ -68,10 +54,14 @@ end def main client_control_port = '' + server_port = '' OptionParser.new do |opts| opts.on('--client_control_port=P', String) do |p| client_control_port = p end + opts.on('--server_port=P', String) do |p| + server_port = p + end end.parse! Signal.trap("TERM") do @@ -83,8 +73,9 @@ def main end srv = GRPC::RpcServer.new - srv.add_http2_port("localhost:#{client_control_port}", :this_port_is_insecure) - srv.handle(SigHandlingClientController.new(srv)) + srv.add_http2_port("0.0.0.0:#{client_control_port}", :this_port_is_insecure) + stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", :this_channel_is_insecure) + srv.handle(SigHandlingClientController.new(srv, stub)) srv.run end diff --git a/src/ruby/end2end/sig_handling_driver.rb b/src/ruby/end2end/sig_handling_driver.rb index 524f0cebe8d..bda5c03c168 100755 --- a/src/ruby/end2end/sig_handling_driver.rb +++ b/src/ruby/end2end/sig_handling_driver.rb @@ -32,39 +32,20 @@ # smoke test for a grpc-using app that receives and # handles process-ending signals -this_dir = File.expand_path(File.dirname(__FILE__)) -protos_lib_dir = File.join(this_dir, 'lib') -grpc_lib_dir = File.join(File.dirname(this_dir), 'lib') -$LOAD_PATH.unshift(grpc_lib_dir) unless $LOAD_PATH.include?(grpc_lib_dir) -$LOAD_PATH.unshift(protos_lib_dir) unless $LOAD_PATH.include?(protos_lib_dir) -$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) - -require 'grpc' -require 'echo_server' -require 'client_control_services_pb' +require_relative './end2end_common' def main - this_dir = File.expand_path(File.dirname(__FILE__)) - lib_dir = File.join(File.dirname(this_dir), 'lib') - - server_port = '50051' STDERR.puts "start server" - server_runner = ServerRunner.new(server_port) - server_runner.run + server_runner = ServerRunner.new + server_port = server_runner.run sleep 1 - client_control_port = '50052' - STDERR.puts "start client" - client_path = File.join(this_dir, "sig_handling_client.rb") - client_pid = Process.spawn(RbConfig.ruby, client_path, "--client_control_port=#{client_control_port}") - control_stub = ClientControl::ClientController::Stub.new("localhost:#{client_control_port}", :this_channel_is_insecure) + control_stub, client_pid = start_client("sig_handling_client.rb", server_port) sleep 1 - control_stub.create_client_stub(ClientControl::CreateClientStubRequest.new(server_address: "localhost:#{server_port}")) - count = 0 while count < 5 control_stub.do_echo_rpc(ClientControl::DoEchoRpcRequest.new(request: 'hello')) @@ -73,16 +54,7 @@ def main count += 1 end - control_stub.shutdown(ClientControl::Void.new) - Process.wait(client_pid) - - client_exit_code = $?.exitstatus - - if client_exit_code != 0 - raise "term sig test failure: client exit code: #{client_exit_code}" - end - - server_runner.stop + cleanup(control_stub, server_runner) end main From 16d97edf56b036c28dbe60d1184ef89b71bf8a90 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 15 Mar 2017 10:01:13 -0700 Subject: [PATCH 12/25] add failing test revealing bug in channel state api --- src/ruby/end2end/channel_state_client.rb | 53 ++++++++++++++++ src/ruby/end2end/channel_state_driver.rb | 63 +++++++++++++++++++ src/ruby/end2end/end2end_common.rb | 3 +- src/ruby/end2end/sig_handling_driver.rb | 2 +- .../helper_scripts/run_ruby_end2end_tests.sh | 5 +- 5 files changed, 123 insertions(+), 3 deletions(-) create mode 100755 src/ruby/end2end/channel_state_client.rb create mode 100755 src/ruby/end2end/channel_state_driver.rb diff --git a/src/ruby/end2end/channel_state_client.rb b/src/ruby/end2end/channel_state_client.rb new file mode 100755 index 00000000000..476329ff73a --- /dev/null +++ b/src/ruby/end2end/channel_state_client.rb @@ -0,0 +1,53 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require_relative './end2end_common' + +def main + server_port = '' + OptionParser.new do |opts| + opts.on('--client_control_port=P', String) do |p| + STDERR.puts "client_control_port ignored" + end + opts.on('--server_port=P', String) do |p| + server_port = p + end + end.parse! + + ch = GRPC::Core::Channel.new("localhost:#{server_port}", {}, :this_channel_is_insecure) + + loop do + state = ch.connectivity_state + ch.watch_connectivity_state(state, Time.now + 360) + end +end + +main diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb new file mode 100755 index 00000000000..cab0147e1f9 --- /dev/null +++ b/src/ruby/end2end/channel_state_driver.rb @@ -0,0 +1,63 @@ +#!/usr/bin/env ruby + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# smoke test for a grpc-using app that receives and +# handles process-ending signals + +require_relative './end2end_common' + +def main + STDERR.puts "start server" + server_runner = ServerRunner.new + server_port = server_runner.run + + sleep 1 + + STDERR.puts "start client" + _, client_pid = start_client("channel_state_client.rb", server_port) + + sleep 3 + + Process.kill('SIGTERM', client_pid) + + begin + Timeout.timeout(10) { Process.wait(client_pid) } + rescue Timeout::Error + STDERR.puts "timeout wait for client pid #{client_pid}" + Process.kill('SIGKILL', client_pid) + Process.wait(client_pid) + raise 'Timed out waiting for client process. It likely hangs' + end + + server_runner.stop +end + +main diff --git a/src/ruby/end2end/end2end_common.rb b/src/ruby/end2end/end2end_common.rb index 67961cdf972..d98e41f6421 100755 --- a/src/ruby/end2end/end2end_common.rb +++ b/src/ruby/end2end/end2end_common.rb @@ -42,6 +42,7 @@ require 'client_control_services_pb' require 'socket' require 'optparse' require 'thread' +require 'timeout' # GreeterServer is simple server that implements the Helloworld Greeter server. class EchoServerImpl < Echo::EchoServer::Service @@ -88,7 +89,7 @@ def start_client(client_main, server_port) return control_stub, client_pid end -def cleanup(control_stub, server_runner) +def cleanup(control_stub, client_pid, server_runner) control_stub.shutdown(ClientControl::Void.new) Process.wait(client_pid) diff --git a/src/ruby/end2end/sig_handling_driver.rb b/src/ruby/end2end/sig_handling_driver.rb index bda5c03c168..4d205da9ae3 100755 --- a/src/ruby/end2end/sig_handling_driver.rb +++ b/src/ruby/end2end/sig_handling_driver.rb @@ -54,7 +54,7 @@ def main count += 1 end - cleanup(control_stub, server_runner) + cleanup(control_stub, client_pid, server_runner) end main diff --git a/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh index 7ccbcfca70d..518848b9508 100755 --- a/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh +++ b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh @@ -33,4 +33,7 @@ set -ex # change to grpc repo root cd $(dirname $0)/../../.. -ruby src/ruby/end2end/sig_handling_driver.rb +EXIT_CODE=0 +ruby src/ruby/end2end/sig_handling_driver.rb || EXIT_CODE=1 +ruby src/ruby/end2end/channel_state_driver.rb || EXIT_CODE=1 +exit $EXIT_CODE From f3147b3a7c92212ca6e2289222d9a7d52e0e0f78 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 15 Mar 2017 11:34:08 -0700 Subject: [PATCH 13/25] watch channel state without the gil to fix deadlock on abrupt SIGTERM --- src/ruby/end2end/channel_state_driver.rb | 6 ++--- src/ruby/ext/grpc/rb_channel.c | 33 ++++++++++++++++++++---- src/ruby/qps/worker.rb | 2 -- tools/run_tests/run_tests.py | 1 - 4 files changed, 31 insertions(+), 11 deletions(-) diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb index cab0147e1f9..8ef32acfffe 100755 --- a/src/ruby/end2end/channel_state_driver.rb +++ b/src/ruby/end2end/channel_state_driver.rb @@ -29,8 +29,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# smoke test for a grpc-using app that receives and -# handles process-ending signals +# make sure that the client doesn't hang when process ended abruptly require_relative './end2end_common' @@ -54,7 +53,8 @@ def main STDERR.puts "timeout wait for client pid #{client_pid}" Process.kill('SIGKILL', client_pid) Process.wait(client_pid) - raise 'Timed out waiting for client process. It likely hangs' + STDERR.puts "killed client child" + raise 'Timed out waiting for client process. It likely hangs when ended abruptly' end server_runner.stop diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index d143c54d21b..08d48f2a047 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -191,7 +191,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { wrapper->safe_to_destroy = 0; wrapper->request_safe_destroy = 0; - gpr_cv_signal(&wrapper->channel_cv); + gpr_cv_broadcast(&wrapper->channel_cv); gpr_mu_unlock(&wrapper->channel_mu); @@ -241,6 +241,26 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, grpc_channel_check_connectivity_state(ch, grpc_try_to_connect)); } +typedef struct watch_state_stack { + grpc_rb_channel *wrapper; + gpr_timespec deadline; +} watch_state_stack; + +static void *watch_channel_state_without_gvl(void *arg) { + gpr_timespec deadline = ((watch_state_stack*)arg)->deadline; + grpc_rb_channel *wrapper = ((watch_state_stack*)arg)->wrapper; + gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline); + return NULL; +} + +static void watch_channel_state_unblocking_func(void *arg) { + grpc_rb_channel *wrapper = (grpc_rb_channel*)arg; + gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called"); + gpr_mu_lock(&wrapper->channel_mu); + gpr_cv_broadcast(&wrapper->channel_cv); + gpr_mu_unlock(&wrapper->channel_mu); +} + /* Wait until the channel's connectivity state becomes different from * "last_state", or "deadline" expires. * Returns true if the the channel's connectivity state becomes @@ -252,6 +272,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE last_state, VALUE deadline) { grpc_rb_channel *wrapper = NULL; + watch_state_stack stack; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); @@ -279,7 +300,9 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, gpr_mu_unlock(&wrapper->channel_mu); return Qfalse; } - gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, grpc_rb_time_timeval(deadline, /* absolute time */ 0)); + stack.wrapper = wrapper; + stack.deadline = grpc_rb_time_timeval(deadline, 0); + rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper); if (wrapper->request_safe_destroy) { gpr_mu_unlock(&wrapper->channel_mu); rb_raise(rb_eRuntimeError, "channel closed during call to watch_connectivity_state"); @@ -403,7 +426,7 @@ static void grpc_rb_channel_try_register_connection_polling( gpr_mu_lock(&wrapper->channel_mu); if (wrapper->request_safe_destroy) { wrapper->safe_to_destroy = 1; - gpr_cv_signal(&wrapper->channel_cv); + gpr_cv_broadcast(&wrapper->channel_cv); gpr_mu_unlock(&wrapper->channel_mu); return; } @@ -412,7 +435,7 @@ static void grpc_rb_channel_try_register_connection_polling( conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); if (conn_state != wrapper->current_connectivity_state) { wrapper->current_connectivity_state = conn_state; - gpr_cv_signal(&wrapper->channel_cv); + gpr_cv_broadcast(&wrapper->channel_cv); } // avoid posting work to the channel polling cq if it's been shutdown if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) { @@ -420,7 +443,7 @@ static void grpc_rb_channel_try_register_connection_polling( wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper); } else { wrapper->safe_to_destroy = 1; - gpr_cv_signal(&wrapper->channel_cv); + gpr_cv_broadcast(&wrapper->channel_cv); } gpr_mu_unlock(&global_connection_polling_mu); gpr_mu_unlock(&wrapper->channel_mu); diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb index 318c1f9e22c..61a0b723a3f 100755 --- a/src/ruby/qps/worker.rb +++ b/src/ruby/qps/worker.rb @@ -36,8 +36,6 @@ lib_dir = File.join(File.dirname(this_dir), 'lib') $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) -puts $LOAD_PATH - require 'grpc' require 'optparse' require 'histogram' diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index dc17e738e6e..573196fbb7d 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -695,7 +695,6 @@ class RubyLanguage(object): tests = [self.config.job_spec(['tools/run_tests/helper_scripts/run_ruby.sh'], timeout_seconds=10*60, environ=_FORCE_ENVIRON_FOR_WRAPPERS)] - # note these aren't getting ran on windows since no workers tests.append(self.config.job_spec(['tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh'], timeout_seconds=10*60, environ=_FORCE_ENVIRON_FOR_WRAPPERS)) From 70bc4921e15f813c118477e26ea4bc5267b5c7e0 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 15 Mar 2017 14:00:46 -0700 Subject: [PATCH 14/25] add another currently failing test related to channels and deadlock --- src/ruby/end2end/channel_closing_client.rb | 80 +++++++++++++++++++ src/ruby/end2end/channel_closing_driver.rb | 65 +++++++++++++++ .../helper_scripts/run_ruby_end2end_tests.sh | 1 + 3 files changed, 146 insertions(+) create mode 100755 src/ruby/end2end/channel_closing_client.rb create mode 100755 src/ruby/end2end/channel_closing_driver.rb diff --git a/src/ruby/end2end/channel_closing_client.rb b/src/ruby/end2end/channel_closing_client.rb new file mode 100755 index 00000000000..88fa9ae5e66 --- /dev/null +++ b/src/ruby/end2end/channel_closing_client.rb @@ -0,0 +1,80 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require_relative './end2end_common' + +class ChannelClosingClientController < ClientControl::ClientController::Service + def initialize(ch) + @ch = ch + end + def shutdown(_, _) + STDERR.puts "about to close channel" + @ch.close + STDERR.puts "just closed channel" + end +end + +def main + client_control_port = '' + server_port = '' + OptionParser.new do |opts| + opts.on('--client_control_port=P', String) do |p| + client_control_port = p + end + opts.on('--server_port=P', String) do |p| + server_port = p + end + end.parse! + + ch = GRPC::Core::Channel.new("localhost:#{server_port}", {}, :this_channel_is_insecure) + + srv = GRPC::RpcServer.new + thd = Thread.new do + srv.add_http2_port("0.0.0.0:#{client_control_port}", :this_port_is_insecure) + srv.handle(ChannelClosingClientController.new(ch)) + srv.run + end + + # this should break out once the channel is closed + loop do + state = ch.connectivity_state(true) + begin + ch.watch_connectivity_state(state, Time.now + 360) + rescue RuntimeException => e + break + end + end + + srv.stop + thd.join +end + +main diff --git a/src/ruby/end2end/channel_closing_driver.rb b/src/ruby/end2end/channel_closing_driver.rb new file mode 100755 index 00000000000..924fbe3bee2 --- /dev/null +++ b/src/ruby/end2end/channel_closing_driver.rb @@ -0,0 +1,65 @@ +#!/usr/bin/env ruby + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# make sure that the client doesn't hang when process ended abruptly + +require_relative './end2end_common' + +def main + STDERR.puts "start server" + server_runner = ServerRunner.new + server_port = server_runner.run + + sleep 1 + + STDERR.puts "start client" + control_stub, client_pid = start_client("channel_closing_client.rb", server_port) + + sleep 3 + + + begin + Timeout.timeout(10) do + control_stub.shutdown(ClientControl::Void.new) + Process.wait(client_pid) + end + rescue Timeout::Error + STDERR.puts "timeout wait for client pid #{client_pid}" + Process.kill('SIGKILL', client_pid) + Process.wait(client_pid) + STDERR.puts "killed client child" + raise 'Timed out waiting for client process. It likely hangs when a channel is closed while connectivity is watched' + end + + server_runner.stop +end + +main diff --git a/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh index 518848b9508..eb75878caf9 100755 --- a/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh +++ b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh @@ -36,4 +36,5 @@ cd $(dirname $0)/../../.. EXIT_CODE=0 ruby src/ruby/end2end/sig_handling_driver.rb || EXIT_CODE=1 ruby src/ruby/end2end/channel_state_driver.rb || EXIT_CODE=1 +ruby src/ruby/end2end/channel_closing_driver.rb || EXIT_CODE=1 exit $EXIT_CODE From 563ec5324f60b8ea521b41a1a440735b8bf6d2a6 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 15 Mar 2017 15:54:49 -0700 Subject: [PATCH 15/25] stop mixing gpr mutexes and the ruby gil to fix channel closing deadlock --- src/ruby/end2end/channel_closing_client.rb | 10 ++-- src/ruby/ext/grpc/rb_channel.c | 69 +++++++++++++--------- 2 files changed, 45 insertions(+), 34 deletions(-) diff --git a/src/ruby/end2end/channel_closing_client.rb b/src/ruby/end2end/channel_closing_client.rb index 88fa9ae5e66..a9df0784217 100755 --- a/src/ruby/end2end/channel_closing_client.rb +++ b/src/ruby/end2end/channel_closing_client.rb @@ -36,9 +36,8 @@ class ChannelClosingClientController < ClientControl::ClientController::Service @ch = ch end def shutdown(_, _) - STDERR.puts "about to close channel" @ch.close - STDERR.puts "just closed channel" + ClientControl::Void.new end end @@ -63,12 +62,13 @@ def main srv.run end - # this should break out once the channel is closed + # this should break out with an exception once the channel is closed loop do - state = ch.connectivity_state(true) begin + state = ch.connectivity_state(true) ch.watch_connectivity_state(state, Time.now + 360) - rescue RuntimeException => e + rescue RuntimeError => e + STDERR.puts "(expected) error occurred: #{e.inspect}" break end end diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 08d48f2a047..94a10faf3fd 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -78,6 +78,7 @@ typedef struct grpc_rb_channel { int safe_to_destroy; grpc_connectivity_state current_connectivity_state; + int mu_init_done; gpr_mu channel_mu; gpr_cv channel_cv; } grpc_rb_channel; @@ -106,6 +107,11 @@ static void grpc_rb_channel_free(void *p) { ch->wrapped = NULL; } + if (ch->mu_init_done) { + gpr_mu_destroy(&ch->channel_mu); + gpr_cv_destroy(&ch->channel_cv); + } + xfree(p); } @@ -164,6 +170,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); + wrapper->mu_init_done = 0; target_chars = StringValueCStr(target); grpc_rb_hash_convert_to_channel_args(channel_args, &args); if (TYPE(credentials) == T_SYMBOL) { @@ -185,6 +192,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { gpr_mu_init(&wrapper->channel_mu); gpr_cv_init(&wrapper->channel_cv); + wrapper->mu_init_done = 1; gpr_mu_lock(&wrapper->channel_mu); wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); @@ -244,13 +252,38 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, typedef struct watch_state_stack { grpc_rb_channel *wrapper; gpr_timespec deadline; + int last_state; } watch_state_stack; static void *watch_channel_state_without_gvl(void *arg) { - gpr_timespec deadline = ((watch_state_stack*)arg)->deadline; - grpc_rb_channel *wrapper = ((watch_state_stack*)arg)->wrapper; + watch_state_stack *stack = (watch_state_stack*)arg; + + gpr_timespec deadline = stack->deadline; + grpc_rb_channel *wrapper = stack->wrapper; + int last_state = stack->last_state; + + gpr_mu_lock(&wrapper->channel_mu); + if (wrapper->current_connectivity_state != last_state) { + gpr_mu_unlock(&wrapper->channel_mu); + return (void*)0; + } + if (wrapper->request_safe_destroy) { + gpr_mu_unlock(&wrapper->channel_mu); + return (void*)0; + } + if (wrapper->safe_to_destroy) { + gpr_mu_unlock(&wrapper->channel_mu); + return (void*)0; + } + gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline); - return NULL; + + if (wrapper->current_connectivity_state != last_state) { + gpr_mu_unlock(&wrapper->channel_mu); + return (void*)1; + } + gpr_mu_unlock(&wrapper->channel_mu); + return (void*)0; } static void watch_channel_state_unblocking_func(void *arg) { @@ -273,6 +306,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE deadline) { grpc_rb_channel *wrapper = NULL; watch_state_stack stack; + void* out; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); @@ -286,33 +320,13 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, return Qnil; } - gpr_mu_lock(&wrapper->channel_mu); - if (wrapper->current_connectivity_state != NUM2LONG(last_state)) { - gpr_mu_unlock(&wrapper->channel_mu); - return Qtrue; - } - if (wrapper->request_safe_destroy) { - gpr_mu_unlock(&wrapper->channel_mu); - rb_raise(rb_eRuntimeError, "watch_connectivity_state called on closed channel"); - return Qfalse; - } - if (wrapper->safe_to_destroy) { - gpr_mu_unlock(&wrapper->channel_mu); - return Qfalse; - } stack.wrapper = wrapper; stack.deadline = grpc_rb_time_timeval(deadline, 0); - rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper); - if (wrapper->request_safe_destroy) { - gpr_mu_unlock(&wrapper->channel_mu); - rb_raise(rb_eRuntimeError, "channel closed during call to watch_connectivity_state"); - return Qfalse; - } - if (wrapper->current_connectivity_state != NUM2LONG(last_state)) { - gpr_mu_unlock(&wrapper->channel_mu); + stack.last_state = NUM2LONG(last_state); + out = rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper); + if (out) { return Qtrue; } - gpr_mu_unlock(&wrapper->channel_mu); return Qfalse; } @@ -460,9 +474,6 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { GPR_ASSERT(wrapper->safe_to_destroy); gpr_mu_unlock(&wrapper->channel_mu); - gpr_mu_destroy(&wrapper->channel_mu); - gpr_cv_destroy(&wrapper->channel_cv); - grpc_channel_destroy(wrapper->wrapped); } From 4109c23734d572cc19cd7e54571757c854db27f7 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 15 Mar 2017 16:07:03 -0700 Subject: [PATCH 16/25] add a README for new test directory --- src/ruby/end2end/README | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 src/ruby/end2end/README diff --git a/src/ruby/end2end/README b/src/ruby/end2end/README new file mode 100644 index 00000000000..f0dc14fe9b6 --- /dev/null +++ b/src/ruby/end2end/README @@ -0,0 +1,18 @@ +This directory contains some grpc-ruby end to end tests. + +Each test here involves two files: a "driver" and a "client". For example, +the "channel_closing" test involves channel_closing_driver.rb +and channel_closing_client.rb. + +Typically, the "driver will start up a simple "echo" server, and then +spawn a client. It gives the client the address of the "echo" server as +well as an address to listen on for control rpcs. Depending on the test, the +client usually starts up a "ClientControl" grpc server for the driver to +interact with (the driver can tell the client process to do strange things at +different times, depending on the test). + +So far these tests are mostly useful for testing process-shutdown related +situations, since the client's run in separate processes. + +These tests are invoked through the "tools/run_tests/run_tests.py" script (the +Rakefile doesn't start these). From af3213f38bb5c3737c94a685ee508d289552b5a1 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 15 Mar 2017 16:44:52 -0700 Subject: [PATCH 17/25] remove now-unused channel completion queues --- src/ruby/ext/grpc/rb_channel.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 94a10faf3fd..3b1111e5e2c 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -73,7 +73,6 @@ typedef struct grpc_rb_channel { /* The actual channel */ grpc_channel *wrapped; - grpc_completion_queue *queue; int request_safe_destroy; int safe_to_destroy; grpc_connectivity_state current_connectivity_state; @@ -103,7 +102,6 @@ static void grpc_rb_channel_free(void *p) { if (ch->wrapped != NULL) { grpc_rb_channel_safe_destroy(ch); - grpc_rb_completion_queue_destroy(ch->queue); ch->wrapped = NULL; } @@ -215,7 +213,6 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { } rb_ivar_set(self, id_target, target); wrapper->wrapped = ch; - wrapper->queue = grpc_completion_queue_create(NULL); return self; } @@ -404,8 +401,6 @@ static VALUE grpc_rb_channel_destroy(VALUE self) { ch = wrapper->wrapped; if (ch != NULL) { grpc_rb_channel_safe_destroy(wrapper); - GPR_ASSERT(wrapper->queue != NULL); - grpc_rb_completion_queue_destroy(wrapper->queue); wrapper->wrapped = NULL; } From 071f74f6f13bc806169e0d3c5ca947e3b9b0605d Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 15 Mar 2017 17:32:40 -0700 Subject: [PATCH 18/25] add copyright header to fix failing sanity tests --- src/ruby/end2end/{README => README.md} | 2 +- src/ruby/end2end/channel_closing_driver.rb | 2 +- src/ruby/end2end/gen_protos.sh | 30 ++++++++++++++++++++++ 3 files changed, 32 insertions(+), 2 deletions(-) rename src/ruby/end2end/{README => README.md} (91%) diff --git a/src/ruby/end2end/README b/src/ruby/end2end/README.md similarity index 91% rename from src/ruby/end2end/README rename to src/ruby/end2end/README.md index f0dc14fe9b6..ea5ab6d4bcb 100644 --- a/src/ruby/end2end/README +++ b/src/ruby/end2end/README.md @@ -4,7 +4,7 @@ Each test here involves two files: a "driver" and a "client". For example, the "channel_closing" test involves channel_closing_driver.rb and channel_closing_client.rb. -Typically, the "driver will start up a simple "echo" server, and then +Typically, the "driver" will start up a simple "echo" server, and then spawn a client. It gives the client the address of the "echo" server as well as an address to listen on for control rpcs. Depending on the test, the client usually starts up a "ClientControl" grpc server for the driver to diff --git a/src/ruby/end2end/channel_closing_driver.rb b/src/ruby/end2end/channel_closing_driver.rb index 924fbe3bee2..98c8eaa1cdf 100755 --- a/src/ruby/end2end/channel_closing_driver.rb +++ b/src/ruby/end2end/channel_closing_driver.rb @@ -29,7 +29,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# make sure that the client doesn't hang when process ended abruptly +# make sure that the client doesn't hang when channel is closed explictly while it's used require_relative './end2end_common' diff --git a/src/ruby/end2end/gen_protos.sh b/src/ruby/end2end/gen_protos.sh index c26b5572da1..f78d9ad394a 100644 --- a/src/ruby/end2end/gen_protos.sh +++ b/src/ruby/end2end/gen_protos.sh @@ -1,2 +1,32 @@ #!/bin/bash + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + grpc_tools_ruby_protoc -I protos --ruby_out=lib --grpc_out=lib protos/echo.proto protos/client_control.proto From b862b6ae205628d7713ee2e4cbb752206cf0b1f6 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Sun, 19 Mar 2017 23:36:21 -0700 Subject: [PATCH 19/25] use RTEST with channel watch arg to capture larger set of truthy args --- src/ruby/ext/grpc/rb_channel.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 3b1111e5e2c..1fe825efd64 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -234,7 +234,7 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, /* "01" == 0 mandatory args, 1 (try_to_connect) is optional */ rb_scan_args(argc, argv, "01", &try_to_connect_param); - grpc_try_to_connect = try_to_connect_param == Qtrue ? 1 : 0; + grpc_try_to_connect = RTEST(try_to_connect_param) ? 1 : 0; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); ch = wrapper->wrapped; From 4e606751db54e289fae86aed7e1b1c2f35478469 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Sun, 19 Mar 2017 23:32:54 -0700 Subject: [PATCH 20/25] add end2end tests to formatter and adjust to formatter --- Rakefile | 3 ++- src/ruby/.rubocop.yml | 1 + src/ruby/end2end/channel_closing_client.rb | 6 +++++- src/ruby/end2end/channel_closing_driver.rb | 16 ++++++++------ src/ruby/end2end/channel_state_client.rb | 7 +++--- src/ruby/end2end/channel_state_driver.rb | 11 +++++----- src/ruby/end2end/end2end_common.rb | 22 ++++++++++++------- src/ruby/end2end/sig_handling_client.rb | 25 ++++++++++++++-------- src/ruby/end2end/sig_handling_driver.rb | 9 ++++---- 9 files changed, 62 insertions(+), 38 deletions(-) diff --git a/Rakefile b/Rakefile index 12ac12710fb..cc02aa130ab 100755 --- a/Rakefile +++ b/Rakefile @@ -12,7 +12,8 @@ load 'tools/distrib/docker_for_windows.rb' # Add rubocop style checking tasks RuboCop::RakeTask.new(:rubocop) do |task| task.options = ['-c', 'src/ruby/.rubocop.yml'] - task.patterns = ['src/ruby/{lib,spec}/**/*.rb'] + # add end2end tests to formatter but don't add generated proto _pb.rb's + task.patterns = ['src/ruby/{lib,spec}/**/*.rb', 'src/ruby/end2end/*.rb'] end spec = Gem::Specification.load('grpc.gemspec') diff --git a/src/ruby/.rubocop.yml b/src/ruby/.rubocop.yml index 0f61ccfa812..7174f3b15af 100644 --- a/src/ruby/.rubocop.yml +++ b/src/ruby/.rubocop.yml @@ -9,6 +9,7 @@ AllCops: - 'bin/math_services_pb.rb' - 'pb/grpc/health/v1/*' - 'pb/test/**/*' + - 'end2end/lib/*' Metrics/CyclomaticComplexity: Max: 9 diff --git a/src/ruby/end2end/channel_closing_client.rb b/src/ruby/end2end/channel_closing_client.rb index a9df0784217..84497973607 100755 --- a/src/ruby/end2end/channel_closing_client.rb +++ b/src/ruby/end2end/channel_closing_client.rb @@ -31,10 +31,13 @@ require_relative './end2end_common' +# Calls '#close' on a Channel when "shutdown" called. This tries to +# trigger a hang or crash bug by closing a channel actively being watched class ChannelClosingClientController < ClientControl::ClientController::Service def initialize(ch) @ch = ch end + def shutdown(_, _) @ch.close ClientControl::Void.new @@ -53,7 +56,8 @@ def main end end.parse! - ch = GRPC::Core::Channel.new("localhost:#{server_port}", {}, :this_channel_is_insecure) + ch = GRPC::Core::Channel.new("localhost:#{server_port}", {}, + :this_channel_is_insecure) srv = GRPC::RpcServer.new thd = Thread.new do diff --git a/src/ruby/end2end/channel_closing_driver.rb b/src/ruby/end2end/channel_closing_driver.rb index 98c8eaa1cdf..43e2fe8cbbd 100755 --- a/src/ruby/end2end/channel_closing_driver.rb +++ b/src/ruby/end2end/channel_closing_driver.rb @@ -29,23 +29,24 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# make sure that the client doesn't hang when channel is closed explictly while it's used +# make sure that the client doesn't hang when channel is closed +# explictly while it's used require_relative './end2end_common' def main - STDERR.puts "start server" + STDERR.puts 'start server' server_runner = ServerRunner.new server_port = server_runner.run sleep 1 - STDERR.puts "start client" - control_stub, client_pid = start_client("channel_closing_client.rb", server_port) + STDERR.puts 'start client' + control_stub, client_pid = start_client('channel_closing_client.rb', + server_port) sleep 3 - begin Timeout.timeout(10) do control_stub.shutdown(ClientControl::Void.new) @@ -55,8 +56,9 @@ def main STDERR.puts "timeout wait for client pid #{client_pid}" Process.kill('SIGKILL', client_pid) Process.wait(client_pid) - STDERR.puts "killed client child" - raise 'Timed out waiting for client process. It likely hangs when a channel is closed while connectivity is watched' + STDERR.puts 'killed client child' + raise 'Timed out waiting for client process. It likely hangs when a ' \ + 'channel is closed while connectivity is watched' end server_runner.stop diff --git a/src/ruby/end2end/channel_state_client.rb b/src/ruby/end2end/channel_state_client.rb index 476329ff73a..08c21bbb8e8 100755 --- a/src/ruby/end2end/channel_state_client.rb +++ b/src/ruby/end2end/channel_state_client.rb @@ -34,15 +34,16 @@ require_relative './end2end_common' def main server_port = '' OptionParser.new do |opts| - opts.on('--client_control_port=P', String) do |p| - STDERR.puts "client_control_port ignored" + opts.on('--client_control_port=P', String) do + STDERR.puts 'client_control_port ignored' end opts.on('--server_port=P', String) do |p| server_port = p end end.parse! - ch = GRPC::Core::Channel.new("localhost:#{server_port}", {}, :this_channel_is_insecure) + ch = GRPC::Core::Channel.new("localhost:#{server_port}", {}, + :this_channel_is_insecure) loop do state = ch.connectivity_state diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb index 8ef32acfffe..c3184bf9392 100755 --- a/src/ruby/end2end/channel_state_driver.rb +++ b/src/ruby/end2end/channel_state_driver.rb @@ -34,14 +34,14 @@ require_relative './end2end_common' def main - STDERR.puts "start server" + STDERR.puts 'start server' server_runner = ServerRunner.new server_port = server_runner.run sleep 1 - STDERR.puts "start client" - _, client_pid = start_client("channel_state_client.rb", server_port) + STDERR.puts 'start client' + _, client_pid = start_client('channel_state_client.rb', server_port) sleep 3 @@ -53,8 +53,9 @@ def main STDERR.puts "timeout wait for client pid #{client_pid}" Process.kill('SIGKILL', client_pid) Process.wait(client_pid) - STDERR.puts "killed client child" - raise 'Timed out waiting for client process. It likely hangs when ended abruptly' + STDERR.puts 'killed client child' + raise 'Timed out waiting for client process. ' \ + 'It likely hangs when ended abruptly' end server_runner.stop diff --git a/src/ruby/end2end/end2end_common.rb b/src/ruby/end2end/end2end_common.rb index d98e41f6421..9534bb20787 100755 --- a/src/ruby/end2end/end2end_common.rb +++ b/src/ruby/end2end/end2end_common.rb @@ -43,6 +43,7 @@ require 'socket' require 'optparse' require 'thread' require 'timeout' +require 'English' # see https://github.com/bbatsov/rubocop/issues/1747 # GreeterServer is simple server that implements the Helloworld Greeter server. class EchoServerImpl < Echo::EchoServer::Service @@ -52,9 +53,11 @@ class EchoServerImpl < Echo::EchoServer::Service end end +# ServerRunner starts an "echo server" that test clients can make calls to class ServerRunner def initialize end + def run @srv = GRPC::RpcServer.new port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) @@ -66,10 +69,11 @@ class ServerRunner @srv.wait_till_running port end + def stop @srv.stop @thd.join - raise "server not stopped" unless @srv.stopped? + fail 'server not stopped' unless @srv.stopped? end end @@ -81,22 +85,24 @@ def start_client(client_main, server_port) tmp_server.close client_path = File.join(this_dir, client_main) - client_pid = Process.spawn(RbConfig.ruby, client_path, - "--client_control_port=#{client_control_port}", - "--server_port=#{server_port}") + client_pid = Process.spawn(RbConfig.ruby, + client_path, + "--client_control_port=#{client_control_port}", + "--server_port=#{server_port}") sleep 1 - control_stub = ClientControl::ClientController::Stub.new("localhost:#{client_control_port}", :this_channel_is_insecure) - return control_stub, client_pid + control_stub = ClientControl::ClientController::Stub.new( + "localhost:#{client_control_port}", :this_channel_is_insecure) + [control_stub, client_pid] end def cleanup(control_stub, client_pid, server_runner) control_stub.shutdown(ClientControl::Void.new) Process.wait(client_pid) - client_exit_code = $?.exitstatus + client_exit_code = $CHILD_STATUS if client_exit_code != 0 - raise "term sig test failure: client exit code: #{client_exit_code}" + fail "term sig test failure: client exit code: #{client_exit_code}" end server_runner.stop diff --git a/src/ruby/end2end/sig_handling_client.rb b/src/ruby/end2end/sig_handling_client.rb index f0b2ba61caf..5b1e54718ec 100755 --- a/src/ruby/end2end/sig_handling_client.rb +++ b/src/ruby/end2end/sig_handling_client.rb @@ -31,20 +31,25 @@ require_relative './end2end_common' +# Test client. Sends RPC's as normal but process also has signal handlers class SigHandlingClientController < ClientControl::ClientController::Service def initialize(srv, stub) @srv = srv @stub = stub end + def do_echo_rpc(req, _) response = @stub.echo(Echo::EchoRequest.new(request: req.request)) - raise "bad response" unless response.response == req.request + fail 'bad response' unless response.response == req.request ClientControl::Void.new end + def shutdown(_, _) Thread.new do - #TODO(apolcyn) There is a race between stopping the server and the "shutdown" rpc completing, - # See if stop method on server can end active RPC cleanly, to avoid this sleep. + # TODO(apolcyn) There is a race between stopping the + # server and the "shutdown" rpc completing, + # See if stop method on server can end active RPC cleanly, to + # avoid this sleep. sleep 3 @srv.stop end @@ -64,17 +69,19 @@ def main end end.parse! - Signal.trap("TERM") do - STDERR.puts "SIGTERM received" + Signal.trap('TERM') do + STDERR.puts 'SIGTERM received' end - Signal.trap("INT") do - STDERR.puts "SIGINT received" + Signal.trap('INT') do + STDERR.puts 'SIGINT received' end srv = GRPC::RpcServer.new - srv.add_http2_port("0.0.0.0:#{client_control_port}", :this_port_is_insecure) - stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", :this_channel_is_insecure) + srv.add_http2_port("0.0.0.0:#{client_control_port}", + :this_port_is_insecure) + stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", + :this_channel_is_insecure) srv.handle(SigHandlingClientController.new(srv, stub)) srv.run end diff --git a/src/ruby/end2end/sig_handling_driver.rb b/src/ruby/end2end/sig_handling_driver.rb index 4d205da9ae3..c5d46e074cb 100755 --- a/src/ruby/end2end/sig_handling_driver.rb +++ b/src/ruby/end2end/sig_handling_driver.rb @@ -35,20 +35,21 @@ require_relative './end2end_common' def main - STDERR.puts "start server" + STDERR.puts 'start server' server_runner = ServerRunner.new server_port = server_runner.run sleep 1 - STDERR.puts "start client" - control_stub, client_pid = start_client("sig_handling_client.rb", server_port) + STDERR.puts 'start client' + control_stub, client_pid = start_client('sig_handling_client.rb', server_port) sleep 1 count = 0 while count < 5 - control_stub.do_echo_rpc(ClientControl::DoEchoRpcRequest.new(request: 'hello')) + control_stub.do_echo_rpc( + ClientControl::DoEchoRpcRequest.new(request: 'hello')) Process.kill('SIGTERM', client_pid) Process.kill('SIGINT', client_pid) count += 1 From ea282e9c4cd422a1edcbdd30a81b8f58c8523063 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Mon, 20 Mar 2017 20:53:34 -0700 Subject: [PATCH 21/25] add passing test that kills a process while there are active watch chan state calls --- .../sig_int_during_channel_watch_client.rb | 70 +++++++++++++++++++ .../sig_int_during_channel_watch_driver.rb | 67 ++++++++++++++++++ .../helper_scripts/run_ruby_end2end_tests.sh | 1 + 3 files changed, 138 insertions(+) create mode 100755 src/ruby/end2end/sig_int_during_channel_watch_client.rb create mode 100755 src/ruby/end2end/sig_int_during_channel_watch_driver.rb diff --git a/src/ruby/end2end/sig_int_during_channel_watch_client.rb b/src/ruby/end2end/sig_int_during_channel_watch_client.rb new file mode 100755 index 00000000000..389fc5ba332 --- /dev/null +++ b/src/ruby/end2end/sig_int_during_channel_watch_client.rb @@ -0,0 +1,70 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require_relative './end2end_common' + +# Start polling the channel state in both the main thread +# and a child thread. Try to get the driver to send process-ending +# interrupt while both a child thread and the main thread are in the +# middle of a blocking connectivity_state call. +def main + server_port = '' + OptionParser.new do |opts| + opts.on('--client_control_port=P', String) do + STDERR.puts 'client_control_port not used' + end + opts.on('--server_port=P', String) do |p| + server_port = p + end + end.parse! + + thd = Thread.new do + child_thread_channel = GRPC::Core::Channel.new("localhost:#{server_port}", + {}, + :this_channel_is_insecure) + loop do + state = child_thread_channel.connectivity_state(false) + child_thread_channel.watch_connectivity_state(state, Time.now + 360) + end + end + + main_channel = GRPC::Core::Channel.new("localhost:#{server_port}", + {}, + :this_channel_is_insecure) + loop do + state = main_channel.connectivity_state(false) + main_channel.watch_connectivity_state(state, Time.now + 360) + end + + thd.join +end + +main diff --git a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb new file mode 100755 index 00000000000..8c9fecd5493 --- /dev/null +++ b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb @@ -0,0 +1,67 @@ +#!/usr/bin/env ruby + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# abruptly end a process that has active calls to +# Channel.watch_connectivity_state + +require_relative './end2end_common' + +def main + STDERR.puts 'start server' + server_runner = ServerRunner.new + server_port = server_runner.run + + sleep 1 + + STDERR.puts 'start client' + _, client_pid = start_client('sig_int_during_channel_watch_client.rb', + server_port) + + # give time for the client to get into the middle + # of a channel state watch call + sleep 1 + Process.kill('SIGINT', client_pid) + + begin + Timeout.timeout(10) do + Process.wait(client_pid) + end + rescue Timeout::Error + STDERR.puts "timeout wait for client pid #{client_pid}" + Process.kill('SIGKILL', client_pid) + Process.wait(client_pid) + STDERR.puts 'killed client child' + raise 'Timed out waiting for client process. It likely hangs when a ' \ + 'SIGINT is sent while there is an active connectivity_state call' + end +end + +main diff --git a/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh index eb75878caf9..d7da6364d8f 100755 --- a/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh +++ b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh @@ -37,4 +37,5 @@ EXIT_CODE=0 ruby src/ruby/end2end/sig_handling_driver.rb || EXIT_CODE=1 ruby src/ruby/end2end/channel_state_driver.rb || EXIT_CODE=1 ruby src/ruby/end2end/channel_closing_driver.rb || EXIT_CODE=1 +ruby src/ruby/end2end/sig_int_during_channel_watch_driver.rb || EXIT_CODE=1 exit $EXIT_CODE From 5b881460d24bc930339d1cfd37805a7eadeee5c0 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Tue, 21 Mar 2017 18:31:29 -0700 Subject: [PATCH 22/25] make fewer lock/unlock calls and loop on cv_wait in watch conn state --- src/ruby/ext/grpc/rb_channel.c | 40 +++++++++++------------- src/ruby/spec/channel_connection_spec.rb | 2 +- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 1fe825efd64..c12ea921c98 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -78,6 +78,7 @@ typedef struct grpc_rb_channel { grpc_connectivity_state current_connectivity_state; int mu_init_done; + int abort_watch_connectivity_state; gpr_mu channel_mu; gpr_cv channel_cv; } grpc_rb_channel; @@ -193,6 +194,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { wrapper->mu_init_done = 1; gpr_mu_lock(&wrapper->channel_mu); + wrapper->abort_watch_connectivity_state = 0; wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); wrapper->safe_to_destroy = 0; wrapper->request_safe_destroy = 0; @@ -242,8 +244,7 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, rb_raise(rb_eRuntimeError, "closed!"); return Qnil; } - return LONG2NUM( - grpc_channel_check_connectivity_state(ch, grpc_try_to_connect)); + return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, grpc_try_to_connect)); } typedef struct watch_state_stack { @@ -254,39 +255,35 @@ typedef struct watch_state_stack { static void *watch_channel_state_without_gvl(void *arg) { watch_state_stack *stack = (watch_state_stack*)arg; - gpr_timespec deadline = stack->deadline; grpc_rb_channel *wrapper = stack->wrapper; int last_state = stack->last_state; + void *return_value = (void*)0; + gpr_timespec time_check_increment = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN)); + gpr_mu_lock(&wrapper->channel_mu); - if (wrapper->current_connectivity_state != last_state) { - gpr_mu_unlock(&wrapper->channel_mu); - return (void*)0; - } - if (wrapper->request_safe_destroy) { - gpr_mu_unlock(&wrapper->channel_mu); - return (void*)0; + while(wrapper->current_connectivity_state == last_state && + !wrapper->request_safe_destroy && + !wrapper->safe_to_destroy && + !wrapper->abort_watch_connectivity_state && + gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) { + gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, time_check_increment); } - if (wrapper->safe_to_destroy) { - gpr_mu_unlock(&wrapper->channel_mu); - return (void*)0; - } - - gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline); - if (wrapper->current_connectivity_state != last_state) { - gpr_mu_unlock(&wrapper->channel_mu); - return (void*)1; + return_value = (void*)1; } gpr_mu_unlock(&wrapper->channel_mu); - return (void*)0; + + return return_value; } static void watch_channel_state_unblocking_func(void *arg) { grpc_rb_channel *wrapper = (grpc_rb_channel*)arg; gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called"); gpr_mu_lock(&wrapper->channel_mu); + wrapper->abort_watch_connectivity_state = 1; gpr_cv_broadcast(&wrapper->channel_cv); gpr_mu_unlock(&wrapper->channel_mu); } @@ -461,8 +458,9 @@ static void grpc_rb_channel_try_register_connection_polling( // Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { gpr_mu_lock(&wrapper->channel_mu); + wrapper->request_safe_destroy = 1; + while (!wrapper->safe_to_destroy) { - wrapper->request_safe_destroy = 1; gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } diff --git a/src/ruby/spec/channel_connection_spec.rb b/src/ruby/spec/channel_connection_spec.rb index b344052a215..940d68b9b04 100644 --- a/src/ruby/spec/channel_connection_spec.rb +++ b/src/ruby/spec/channel_connection_spec.rb @@ -92,7 +92,7 @@ describe 'channel connection behavior' do end it 'observably connects and reconnects to transient server' \ - 'when using the channel state API' do + ' when using the channel state API' do port = start_server ch = GRPC::Core::Channel.new("localhost:#{port}", {}, :this_channel_is_insecure) From 06d4edd28355877e71733a0d246a9b5b4c67452b Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Tue, 21 Mar 2017 19:24:07 -0700 Subject: [PATCH 23/25] fix setting of time_check_increment in watch conn state loop --- src/ruby/ext/grpc/rb_channel.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index c12ea921c98..acf22dd46be 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -259,9 +259,7 @@ static void *watch_channel_state_without_gvl(void *arg) { grpc_rb_channel *wrapper = stack->wrapper; int last_state = stack->last_state; void *return_value = (void*)0; - gpr_timespec time_check_increment = gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN)); - + gpr_timespec time_check_increment; gpr_mu_lock(&wrapper->channel_mu); while(wrapper->current_connectivity_state == last_state && @@ -269,6 +267,8 @@ static void *watch_channel_state_without_gvl(void *arg) { !wrapper->safe_to_destroy && !wrapper->abort_watch_connectivity_state && gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) { + time_check_increment = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN)); gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, time_check_increment); } if (wrapper->current_connectivity_state != last_state) { From 513070cf20c973727c7f5899ac9018f7fd349b1f Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 22 Mar 2017 10:27:31 -0700 Subject: [PATCH 24/25] get rid of time check increment in watch connectivity state loop --- src/ruby/ext/grpc/rb_channel.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index acf22dd46be..ba230525217 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -259,7 +259,6 @@ static void *watch_channel_state_without_gvl(void *arg) { grpc_rb_channel *wrapper = stack->wrapper; int last_state = stack->last_state; void *return_value = (void*)0; - gpr_timespec time_check_increment; gpr_mu_lock(&wrapper->channel_mu); while(wrapper->current_connectivity_state == last_state && @@ -267,9 +266,7 @@ static void *watch_channel_state_without_gvl(void *arg) { !wrapper->safe_to_destroy && !wrapper->abort_watch_connectivity_state && gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) { - time_check_increment = gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN)); - gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, time_check_increment); + gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline); } if (wrapper->current_connectivity_state != last_state) { return_value = (void*)1; From 02d131b5ef69e5427b078caa68c49da8eda91bac Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 22 Mar 2017 15:12:32 -0700 Subject: [PATCH 25/25] fix mac crash on abruptly ended server thread --- src/ruby/end2end/sig_int_during_channel_watch_driver.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb index 8c9fecd5493..84d039bf19d 100755 --- a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb +++ b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb @@ -62,6 +62,8 @@ def main raise 'Timed out waiting for client process. It likely hangs when a ' \ 'SIGINT is sent while there is an active connectivity_state call' end + + server_runner.stop end main