From be9b81424075fbd9bc77956c3eb882929b7bb5a8 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Fri, 4 Aug 2017 10:42:03 -0700 Subject: [PATCH 01/19] Add ChannelConnectivityWatcher --- include/grpc++/channel.h | 5 +++ src/cpp/client/channel_cc.cc | 64 ++++++++++++++++++++++++++++++++++-- 2 files changed, 67 insertions(+), 2 deletions(-) diff --git a/include/grpc++/channel.h b/include/grpc++/channel.h index c50091d6ac1..73f28a182c7 100644 --- a/include/grpc++/channel.h +++ b/include/grpc++/channel.h @@ -29,6 +29,10 @@ struct grpc_channel; +namespace grpc { +class ChannelConnectivityWatcher; +} + namespace grpc { /// Channels represent a connection to an endpoint. Created by \a CreateChannel. class Channel final : public ChannelInterface, @@ -71,6 +75,7 @@ class Channel final : public ChannelInterface, bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) override; + std::unique_ptr connectivity_watcher_; const grpc::string host_; grpc_channel* const c_channel_; // owned }; diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index f2d9bb07c95..27bd75f3cde 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -35,17 +35,77 @@ #include #include #include +#include #include "src/core/lib/profiling/timers.h" namespace grpc { +namespace { +void WatchStateChange(void* arg); +} // namespace + +// Constantly watches channel connectivity status to reconnect a transiently +// disconnected channel. This is a temporary work-around before we have retry +// support. 
+class ChannelConnectivityWatcher { + public: + ChannelConnectivityWatcher(Channel* channel) + : channel_(channel), thd_id_(0), being_destroyed_(0) {} + + void WatchStateChangeImpl() { + grpc_connectivity_state state = GRPC_CHANNEL_IDLE; + while (state != GRPC_CHANNEL_SHUTDOWN) { + if (gpr_atm_no_barrier_load(&being_destroyed_) == 1) { + break; + } + channel_->WaitForStateChange( + state, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(1, GPR_TIMESPAN))); + state = channel_->GetState(false); + } + } + void StartWatching() { + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + } + + void Destroy() { + if (thd_id_ != 0) { + gpr_atm_no_barrier_store(&being_destroyed_, 1); + gpr_thd_join(thd_id_); + } + } + + private: + Channel* channel_; + gpr_thd_id thd_id_; + gpr_atm being_destroyed_; +}; + +namespace { +void WatchStateChange(void* arg) { + ChannelConnectivityWatcher* watcher = + static_cast(arg); + watcher->WatchStateChangeImpl(); +} +} // namespace + static internal::GrpcLibraryInitializer g_gli_initializer; Channel::Channel(const grpc::string& host, grpc_channel* channel) - : host_(host), c_channel_(channel) { + : connectivity_watcher_(new ChannelConnectivityWatcher(this)), + host_(host), + c_channel_(channel) { g_gli_initializer.summon(); + if (host != "inproc") { + connectivity_watcher_->StartWatching(); + } } -Channel::~Channel() { grpc_channel_destroy(c_channel_); } +Channel::~Channel() { + connectivity_watcher_->Destroy(); + grpc_channel_destroy(c_channel_); +} namespace { From ee3e3310bb22aa0422d22a6adb138ab0692541eb Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Fri, 4 Aug 2017 13:54:04 -0700 Subject: [PATCH 02/19] Add reconnect channel tests --- src/cpp/client/channel_cc.cc | 7 +- test/cpp/end2end/async_end2end_test.cc | 129 ++++++++++++++----------- test/cpp/end2end/end2end_test.cc | 22 +++++ 3 files changed, 102 insertions(+), 56 
deletions(-) diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 27bd75f3cde..38977cb4e7e 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -41,6 +41,7 @@ namespace grpc { namespace { +const int kWaitForStateChangeTimeoutMsec = 100; void WatchStateChange(void* arg); } // namespace @@ -59,8 +60,10 @@ class ChannelConnectivityWatcher { break; } channel_->WaitForStateChange( - state, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(1, GPR_TIMESPAN))); + state, + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(kWaitForStateChangeTimeoutMsec, + GPR_TIMESPAN))); state = channel_->GetState(false); } } diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 7cb7b262de5..c1227a5a1c7 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -260,11 +260,30 @@ class AsyncEnd2endTest : public ::testing::TestWithParam { server_address_ << "localhost:" << port_; // Setup server + BuildAndStartServer(); + + gpr_tls_set(&g_is_async_end2end_test, 1); + } + + void TearDown() override { + server_->Shutdown(); + void* ignored_tag; + bool ignored_ok; + cq_->Shutdown(); + while (cq_->Next(&ignored_tag, &ignored_ok)) + ; + poll_overrider_.reset(); + gpr_tls_set(&g_is_async_end2end_test, 0); + grpc_recycle_unused_port(port_); + } + + void BuildAndStartServer() { ServerBuilder builder; auto server_creds = GetCredentialsProvider()->GetServerCredentials( GetParam().credentials_type); builder.AddListeningPort(server_address_.str(), server_creds); - builder.RegisterService(&service_); + service_.reset(new grpc::testing::EchoTestService::AsyncService()); + builder.RegisterService(service_.get()); if (GetParam().health_check_service) { builder.RegisterService(&health_check_); } @@ -276,20 +295,6 @@ class AsyncEnd2endTest : public ::testing::TestWithParam { new ServerBuilderSyncPluginDisabler()); 
builder.SetOption(move(sync_plugin_disabler)); server_ = builder.BuildAndStart(); - - gpr_tls_set(&g_is_async_end2end_test, 1); - } - - void TearDown() override { - server_->Shutdown(); - void* ignored_tag; - bool ignored_ok; - cq_->Shutdown(); - while (cq_->Next(&ignored_tag, &ignored_ok)) - ; - poll_overrider_.reset(); - gpr_tls_set(&g_is_async_end2end_test, 0); - grpc_recycle_unused_port(port_); } void ResetStub() { @@ -319,8 +324,8 @@ class AsyncEnd2endTest : public ::testing::TestWithParam { std::unique_ptr> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, + cq_.get(), cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); @@ -341,7 +346,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam { std::unique_ptr cq_; std::unique_ptr stub_; std::unique_ptr server_; - grpc::testing::EchoTestService::AsyncService service_; + std::unique_ptr service_; HealthCheck health_check_; std::ostringstream server_address_; int port_; @@ -359,6 +364,22 @@ TEST_P(AsyncEnd2endTest, SequentialRpcs) { SendRpc(10); } +TEST_P(AsyncEnd2endTest, ReconnectChannel) { + if (GetParam().inproc) { + return; + } + ResetStub(); + SendRpc(1); + server_->Shutdown(); + void* ignored_tag; + bool ignored_ok; + cq_->Shutdown(); + while (cq_->Next(&ignored_tag, &ignored_ok)) + ; + BuildAndStartServer(); + SendRpc(1); +} + // We do not need to protect notify because the use is synchronized. 
void ServerWait(Server* server, int* notify) { server->Wait(); @@ -407,8 +428,8 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); Verifier(GetParam().disable_blocking) .Expect(2, true) @@ -444,8 +465,8 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { std::unique_ptr> cli_stream( stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); - service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), - tag(2)); + service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); Verifier(GetParam().disable_blocking) .Expect(2, true) @@ -506,8 +527,8 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) { std::unique_ptr> cli_stream( stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); - service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), - tag(2)); + service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); cli_stream->Write(send_request, tag(3)); @@ -579,8 +600,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { std::unique_ptr> cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); - service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, - cq_.get(), cq_.get(), tag(2)); + service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); Verifier(GetParam().disable_blocking) .Expect(1, true) @@ -635,8 +656,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) { std::unique_ptr> cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); - service_.RequestResponseStream(&srv_ctx, 
&recv_request, &srv_stream, - cq_.get(), cq_.get(), tag(2)); + service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); Verifier(GetParam().disable_blocking) .Expect(1, true) @@ -687,8 +708,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) { std::unique_ptr> cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); - service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, - cq_.get(), cq_.get(), tag(2)); + service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); Verifier(GetParam().disable_blocking) .Expect(1, true) @@ -741,8 +762,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { std::unique_ptr> cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); - service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), - tag(2)); + service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); Verifier(GetParam().disable_blocking) .Expect(1, true) @@ -801,8 +822,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) { std::unique_ptr> cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); - service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), - tag(2)); + service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); cli_stream->WriteLast(send_request, WriteOptions(), tag(3)); @@ -869,8 +890,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) { std::unique_ptr> cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); - service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), - tag(2)); + service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); cli_stream->WriteLast(send_request, WriteOptions(), tag(3)); @@ -946,8 +967,8 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { std::unique_ptr> response_reader( stub_->AsyncEcho(&cli_ctx, 
send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); @@ -991,8 +1012,8 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) { std::unique_ptr> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); srv_ctx.AddInitialMetadata(meta1.first, meta1.second); @@ -1041,8 +1062,8 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) { std::unique_ptr> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); response_writer.SendInitialMetadata(tag(3)); @@ -1104,8 +1125,8 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { std::unique_ptr> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); auto 
client_initial_metadata = srv_ctx.client_metadata(); @@ -1168,8 +1189,8 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) { stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); srv_ctx.AsyncNotifyWhenDone(tag(5)); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); @@ -1203,8 +1224,8 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) { stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); srv_ctx.AsyncNotifyWhenDone(tag(5)); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); @@ -1295,8 +1316,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of 'RequestStream' calls // and receive the 'RequestStream' call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); - service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), - tag(2)); + service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); // Client sends 3 messages (tags 3, 4 and 5) @@ -1426,8 +1447,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of 'ResponseStream' calls and // receive the call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); - service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, - cq_.get(), cq_.get(), tag(2)); + service_->RequestResponseStream(&srv_ctx, &recv_request, 
&srv_stream, + cq_.get(), cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); @@ -1562,8 +1583,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of the 'BidiStream' call and // receive the call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); - service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), - tag(2)); + service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); // Client sends the first and the only message diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 8bada48a2b1..b145c506bc2 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -238,6 +238,18 @@ class End2endTest : public ::testing::TestWithParam { int port = grpc_pick_unused_port_or_die(); server_address_ << "127.0.0.1:" << port; // Setup server + BuildAndStartServer(processor); + } + + void RestartServer(const std::shared_ptr& processor) { + if (is_server_started_) { + server_->Shutdown(); + BuildAndStartServer(processor); + } + } + + void BuildAndStartServer( + const std::shared_ptr& processor) { ServerBuilder builder; ConfigureServerBuilder(&builder); auto server_creds = GetCredentialsProvider()->GetServerCredentials( @@ -685,6 +697,16 @@ TEST_P(End2endTest, MultipleRpcs) { } } +TEST_P(End2endTest, ReconnectChannel) { + if (GetParam().inproc) { + return; + } + ResetStub(); + SendRpc(stub_.get(), 1, false); + RestartServer(std::shared_ptr()); + SendRpc(stub_.get(), 1, false); +} + TEST_P(End2endTest, RequestStreamOneRequest) { ResetStub(); EchoRequest request; From 2411bacd048227a68336082067950933920ba1c1 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Tue, 22 Aug 2017 02:14:59 -0700 Subject: [PATCH 03/19] Address review comments --- 
include/grpc++/channel.h | 2 -- src/cpp/client/channel_cc.cc | 3 ++- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/include/grpc++/channel.h b/include/grpc++/channel.h index 73f28a182c7..3849240574f 100644 --- a/include/grpc++/channel.h +++ b/include/grpc++/channel.h @@ -31,9 +31,7 @@ struct grpc_channel; namespace grpc { class ChannelConnectivityWatcher; -} -namespace grpc { /// Channels represent a connection to an endpoint. Created by \a CreateChannel. class Channel final : public ChannelInterface, public CallHook, diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 38977cb4e7e..78f5de0d041 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -50,7 +50,7 @@ void WatchStateChange(void* arg); // support. class ChannelConnectivityWatcher { public: - ChannelConnectivityWatcher(Channel* channel) + explicit ChannelConnectivityWatcher(Channel* channel) : channel_(channel), thd_id_(0), being_destroyed_(0) {} void WatchStateChangeImpl() { @@ -67,6 +67,7 @@ class ChannelConnectivityWatcher { state = channel_->GetState(false); } } + void StartWatching() { gpr_thd_options options = gpr_thd_options_default(); gpr_thd_options_set_joinable(&options); From 6a6d618034fe01b00aa192f2fb44bc9f305f0519 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Tue, 22 Aug 2017 13:43:38 -0700 Subject: [PATCH 04/19] Prevent watching unsupported channels --- include/grpc/grpc.h | 3 +++ src/core/ext/filters/client_channel/channel_connectivity.c | 6 ++++++ src/cpp/client/channel_cc.cc | 2 +- 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 943d6e4891f..5b1406b4abf 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -178,6 +178,9 @@ GRPCAPI void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag); +/** Check whether a grpc channel support
connectivity watcher */ +GRPCAPI int grpc_channel_support_connectivity_watcher(grpc_channel *channel); + /** Create a call given a grpc_channel, in order to call 'method'. All completions are sent to 'completion_queue'. 'method' and 'host' need only live through the invocation of this function. diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index b83c95275f3..0a9e90d12e7 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -191,6 +191,12 @@ static void watcher_timer_init(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(wa); } +int grpc_channel_support_connectivity_watcher(grpc_channel *channel) { + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + return client_channel_elem->filter != &grpc_client_channel_filter ? 0 : 1; +} + void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag) { diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 78f5de0d041..7c52752bd9c 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -101,7 +101,7 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel) host_(host), c_channel_(channel) { g_gli_initializer.summon(); - if (host != "inproc") { + if (grpc_channel_support_connectivity_watcher(channel)) { connectivity_watcher_->StartWatching(); } } From a2e506e8d9a8a8a72f19e160c971edabf234bbf6 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Tue, 22 Aug 2017 16:45:44 -0700 Subject: [PATCH 05/19] Avoid using timers --- include/grpc/grpc.h | 2 +- src/cpp/client/channel_cc.cc | 29 +++++++++++++------------- test/cpp/end2end/async_end2end_test.cc | 1 + 3 files changed, 17 insertions(+), 15 deletions(-) diff --git 
a/include/grpc/grpc.h b/include/grpc/grpc.h index 5b1406b4abf..b562167b5f7 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -178,7 +178,7 @@ GRPCAPI void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag); -/** Check whether a grpc channel support connectivity watcher */ +/** Check whether a grpc channel supports connectivity watcher */ GRPCAPI int grpc_channel_support_connectivity_watcher(grpc_channel *channel); /** Create a call given a grpc_channel, in order to call 'method'. All diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 7c52752bd9c..5b696baf819 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -41,7 +41,6 @@ namespace grpc { namespace { -const int kWaitForStateChangeTimeoutMsec = 100; void WatchStateChange(void* arg); } // namespace @@ -51,32 +50,33 @@ void WatchStateChange(void* arg); class ChannelConnectivityWatcher { public: explicit ChannelConnectivityWatcher(Channel* channel) - : channel_(channel), thd_id_(0), being_destroyed_(0) {} + : channel_(channel), thd_id_(0), shutting_down_(0) {} void WatchStateChangeImpl() { grpc_connectivity_state state = GRPC_CHANNEL_IDLE; while (state != GRPC_CHANNEL_SHUTDOWN) { - if (gpr_atm_no_barrier_load(&being_destroyed_) == 1) { + channel_->WaitForStateChange(state, gpr_inf_future(GPR_CLOCK_REALTIME)); + if (gpr_atm_no_barrier_load(&shutting_down_) == 1) { break; } - channel_->WaitForStateChange( - state, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(kWaitForStateChangeTimeoutMsec, - GPR_TIMESPAN))); state = channel_->GetState(false); } } void StartWatching() { - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); - gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + const char* disabled_str = + std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); + if 
(disabled_str == nullptr || strcmp(disabled_str, "1")) { + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + } } + void Shutdown() { gpr_atm_no_barrier_store(&shutting_down_, 1); } + void Destroy() { if (thd_id_ != 0) { - gpr_atm_no_barrier_store(&being_destroyed_, 1); gpr_thd_join(thd_id_); } } @@ -84,7 +84,7 @@ class ChannelConnectivityWatcher { private: Channel* channel_; gpr_thd_id thd_id_; - gpr_atm being_destroyed_; + gpr_atm shutting_down_; }; namespace { @@ -107,8 +107,9 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel) } Channel::~Channel() { - connectivity_watcher_->Destroy(); + connectivity_watcher_->Shutdown(); grpc_channel_destroy(c_channel_); + connectivity_watcher_->Destroy(); } namespace { diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index c1227a5a1c7..1d1e97a8204 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -272,6 +272,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam { cq_->Shutdown(); while (cq_->Next(&ignored_tag, &ignored_ok)) ; + stub_.reset(); poll_overrider_.reset(); gpr_tls_set(&g_is_async_end2end_test, 0); grpc_recycle_unused_port(port_); From 4d88416c1126c0596f2964a4bcd482cd758d2793 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Wed, 23 Aug 2017 15:20:01 -0700 Subject: [PATCH 06/19] Remove atm in ChannelConnectivityWatcher --- src/cpp/client/channel_cc.cc | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 5b696baf819..f3e7470f5a5 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -50,15 +50,16 @@ void WatchStateChange(void* arg); class ChannelConnectivityWatcher { public: explicit ChannelConnectivityWatcher(Channel* channel) - : channel_(channel), thd_id_(0), 
shutting_down_(0) {} + : channel_(channel), thd_id_(0) {} void WatchStateChangeImpl() { grpc_connectivity_state state = GRPC_CHANNEL_IDLE; + bool ok = false; + void* tag = NULL; while (state != GRPC_CHANNEL_SHUTDOWN) { - channel_->WaitForStateChange(state, gpr_inf_future(GPR_CLOCK_REALTIME)); - if (gpr_atm_no_barrier_load(&shutting_down_) == 1) { - break; - } + channel_->NotifyOnStateChange(state, gpr_inf_future(GPR_CLOCK_REALTIME), + &cq_, NULL); + cq_.Next(&tag, &ok); state = channel_->GetState(false); } } @@ -67,24 +68,33 @@ class ChannelConnectivityWatcher { const char* disabled_str = std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); if (disabled_str == nullptr || strcmp(disabled_str, "1")) { + // This NotifyOnstateChange() is not used to monitor the channel state + // change, but to hold a reference of the c channel. So that + // WatchStateChangeImpl() can observe state == GRPC_CHANNEL_SHUTDOWN + // without holding any lock on the channel object. + channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, + gpr_inf_future(GPR_CLOCK_REALTIME), + &shutdown_cq_, NULL); gpr_thd_options options = gpr_thd_options_default(); gpr_thd_options_set_joinable(&options); gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); } } - void Shutdown() { gpr_atm_no_barrier_store(&shutting_down_, 1); } - void Destroy() { if (thd_id_ != 0) { gpr_thd_join(thd_id_); + bool ok = false; + void* tag = NULL; + shutdown_cq_.Next(&tag, &ok); } } private: Channel* channel_; gpr_thd_id thd_id_; - gpr_atm shutting_down_; + CompletionQueue cq_; + CompletionQueue shutdown_cq_; }; namespace { @@ -107,7 +117,6 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel) } Channel::~Channel() { - connectivity_watcher_->Shutdown(); grpc_channel_destroy(c_channel_); connectivity_watcher_->Destroy(); } From bfb4e06e3c209907e9e4e15f915b4a3cd318f42c Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Wed, 23 Aug 2017 23:34:32 -0700 Subject: [PATCH 07/19] Check connectivity intermittently --- 
src/cpp/client/channel_cc.cc | 17 +++++++++++++---- test/cpp/end2end/end2end_test.cc | 4 ++++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index f3e7470f5a5..ffe88df7327 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -35,12 +35,15 @@ #include #include #include +#include #include +#include #include "src/core/lib/profiling/timers.h" namespace grpc { namespace { +int kConnectivityCheckIntervalMsec = 100; void WatchStateChange(void* arg); } // namespace @@ -59,7 +62,13 @@ class ChannelConnectivityWatcher { while (state != GRPC_CHANNEL_SHUTDOWN) { channel_->NotifyOnStateChange(state, gpr_inf_future(GPR_CLOCK_REALTIME), &cq_, NULL); - cq_.Next(&tag, &ok); + while (cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)) == + CompletionQueue::TIMEOUT) { + gpr_sleep_until( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(kConnectivityCheckIntervalMsec, + GPR_TIMESPAN))); + } state = channel_->GetState(false); } } @@ -84,10 +93,10 @@ class ChannelConnectivityWatcher { void Destroy() { if (thd_id_ != 0) { gpr_thd_join(thd_id_); - bool ok = false; - void* tag = NULL; - shutdown_cq_.Next(&tag, &ok); } + bool ok = false; + void* tag = NULL; + shutdown_cq_.Next(&tag, &ok); } private: diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index b145c506bc2..f316dd09404 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -704,6 +704,10 @@ TEST_P(End2endTest, ReconnectChannel) { ResetStub(); SendRpc(stub_.get(), 1, false); RestartServer(std::shared_ptr()); + // It needs more than 2 * kConnectivityCheckIntervalMsec time to reconnect + // the channel + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(210, GPR_TIMESPAN))); SendRpc(stub_.get(), 1, false); } From b4481a9a137b26d78228926922062f835d9a1606 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 24 Aug 
2017 03:07:01 -0700 Subject: [PATCH 08/19] Share one monitoring thread between channels --- include/grpc++/channel.h | 3 - src/cpp/client/channel_cc.cc | 130 ++++++++++++++++++------------- test/cpp/end2end/end2end_test.cc | 6 +- 3 files changed, 79 insertions(+), 60 deletions(-) diff --git a/include/grpc++/channel.h b/include/grpc++/channel.h index 3849240574f..c50091d6ac1 100644 --- a/include/grpc++/channel.h +++ b/include/grpc++/channel.h @@ -30,8 +30,6 @@ struct grpc_channel; namespace grpc { -class ChannelConnectivityWatcher; - /// Channels represent a connection to an endpoint. Created by \a CreateChannel. class Channel final : public ChannelInterface, public CallHook, @@ -73,7 +71,6 @@ class Channel final : public ChannelInterface, bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) override; - std::unique_ptr connectivity_watcher_; const grpc::string host_; grpc_channel* const c_channel_; // owned }; diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index ffe88df7327..f06d25b0d9b 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -43,8 +43,23 @@ namespace grpc { namespace { -int kConnectivityCheckIntervalMsec = 100; +int kConnectivityCheckIntervalMsec = 500; void WatchStateChange(void* arg); + +class TagSaver final : public CompletionQueueTag { + public: + explicit TagSaver(void* tag) : tag_(tag) {} + ~TagSaver() override {} + bool FinalizeResult(void** tag, bool* status) override { + *tag = tag_; + delete this; + return true; + } + + private: + void* tag_; +}; + } // namespace // Constantly watches channel connectivity status to reconnect a transiently @@ -52,55 +67,80 @@ void WatchStateChange(void* arg); // support. 
class ChannelConnectivityWatcher { public: - explicit ChannelConnectivityWatcher(Channel* channel) - : channel_(channel), thd_id_(0) {} + ChannelConnectivityWatcher() { + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + } + + ~ChannelConnectivityWatcher() { + cq_.Shutdown(); + if (thd_id_ != 0) { + gpr_thd_join(thd_id_); + } + } void WatchStateChangeImpl() { - grpc_connectivity_state state = GRPC_CHANNEL_IDLE; bool ok = false; void* tag = NULL; - while (state != GRPC_CHANNEL_SHUTDOWN) { - channel_->NotifyOnStateChange(state, gpr_inf_future(GPR_CLOCK_REALTIME), - &cq_, NULL); - while (cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)) == - CompletionQueue::TIMEOUT) { + CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT; + while (status != CompletionQueue::SHUTDOWN) { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + // Make sure we've seen 2 TIMEOUTs before going to sleep + if (status == CompletionQueue::TIMEOUT) { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + } + if (status == CompletionQueue::TIMEOUT) { gpr_sleep_until( gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(kConnectivityCheckIntervalMsec, + gpr_time_from_millis(kConnectivityCheckIntervalMsec, GPR_TIMESPAN))); + } else if (status == CompletionQueue::GOT_EVENT) { + ChannelState* channel_state = static_cast(tag); + channel_state->state = grpc_channel_check_connectivity_state( + channel_state->channel, false); + if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) { + void* shutdown_tag = NULL; + channel_state->shutdown_cq.Next(&shutdown_tag, &ok); + delete channel_state; + } else { + TagSaver* tag_saver = new TagSaver(channel_state); + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver); + } } - state = 
channel_->GetState(false); } } - void StartWatching() { + void StartWatching(grpc_channel* channel) { const char* disabled_str = std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); if (disabled_str == nullptr || strcmp(disabled_str, "1")) { - // This NotifyOnstateChange() is not used to monitor the channel state - // change, but to hold a reference of the c channel. So that - // WatchStateChangeImpl() can observe state == GRPC_CHANNEL_SHUTDOWN - // without holding any lock on the channel object. - channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, - gpr_inf_future(GPR_CLOCK_REALTIME), - &shutdown_cq_, NULL); - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); - gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + ChannelState* channel_state = new ChannelState(channel); + // The first grpc_channel_watch_connectivity_state() is not used to + // monitor the channel state change, but to hold a reference of the + // c channel. So that WatchStateChangeImpl() can observe state == + // GRPC_CHANNEL_SHUTDOWN without holding any lock on the channel object. 
+ grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(), + new TagSaver(nullptr)); + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), + new TagSaver(channel_state)); } } - void Destroy() { - if (thd_id_ != 0) { - gpr_thd_join(thd_id_); - } - bool ok = false; - void* tag = NULL; - shutdown_cq_.Next(&tag, &ok); - } - private: - Channel* channel_; + struct ChannelState { + explicit ChannelState(grpc_channel* channel) + : channel(channel), state(GRPC_CHANNEL_IDLE){}; + grpc_channel* channel; + grpc_connectivity_state state; + CompletionQueue shutdown_cq; + }; gpr_thd_id thd_id_; CompletionQueue cq_; CompletionQueue shutdown_cq_; @@ -112,22 +152,21 @@ void WatchStateChange(void* arg) { static_cast(arg); watcher->WatchStateChangeImpl(); } + +ChannelConnectivityWatcher channel_connectivity_watcher; } // namespace static internal::GrpcLibraryInitializer g_gli_initializer; Channel::Channel(const grpc::string& host, grpc_channel* channel) - : connectivity_watcher_(new ChannelConnectivityWatcher(this)), - host_(host), - c_channel_(channel) { + : host_(host), c_channel_(channel) { g_gli_initializer.summon(); if (grpc_channel_support_connectivity_watcher(channel)) { - connectivity_watcher_->StartWatching(); + channel_connectivity_watcher.StartWatching(channel); } } Channel::~Channel() { grpc_channel_destroy(c_channel_); - connectivity_watcher_->Destroy(); } namespace { @@ -213,23 +252,6 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) { return grpc_channel_check_connectivity_state(c_channel_, try_to_connect); } -namespace { -class TagSaver final : public CompletionQueueTag { - public: - explicit TagSaver(void* tag) : tag_(tag) {} - ~TagSaver() override {} - bool FinalizeResult(void** tag, bool* status) override { - *tag = tag_; - delete this; - return true; - } - - private: - 
void* tag_; -}; - -} // namespace - void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, CompletionQueue* cq, void* tag) { diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index f316dd09404..9954f9f9acf 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -704,10 +704,10 @@ TEST_P(End2endTest, ReconnectChannel) { ResetStub(); SendRpc(stub_.get(), 1, false); RestartServer(std::shared_ptr()); - // It needs more than 2 * kConnectivityCheckIntervalMsec time to reconnect - // the channel + // It needs more than kConnectivityCheckIntervalMsec time to reconnect the + // channel. gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(210, GPR_TIMESPAN))); + gpr_time_from_millis(510, GPR_TIMESPAN))); SendRpc(stub_.get(), 1, false); } From 33845d08d44483b20cb104d9f3a7355d912f6618 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 24 Aug 2017 03:28:51 -0700 Subject: [PATCH 09/19] Check env variable --- src/cpp/client/channel_cc.cc | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index f06d25b0d9b..7b473d3ffb2 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -60,17 +60,19 @@ class TagSaver final : public CompletionQueueTag { void* tag_; }; -} // namespace - // Constantly watches channel connectivity status to reconnect a transiently // disconnected channel. This is a temporary work-around before we have retry // support. 
class ChannelConnectivityWatcher { public: - ChannelConnectivityWatcher() { - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); - gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + ChannelConnectivityWatcher() : thd_id_(0) { + const char* disabled_str = + std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); + if (disabled_str == nullptr || strcmp(disabled_str, "1")) { + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + } } ~ChannelConnectivityWatcher() { @@ -114,9 +116,7 @@ class ChannelConnectivityWatcher { } void StartWatching(grpc_channel* channel) { - const char* disabled_str = - std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); - if (disabled_str == nullptr || strcmp(disabled_str, "1")) { + if (thd_id_ != 0) { ChannelState* channel_state = new ChannelState(channel); // The first grpc_channel_watch_connectivity_state() is not used to // monitor the channel state change, but to hold a reference of the @@ -146,7 +146,6 @@ class ChannelConnectivityWatcher { CompletionQueue shutdown_cq_; }; -namespace { void WatchStateChange(void* arg) { ChannelConnectivityWatcher* watcher = static_cast(arg); From f1d50983ae61fe0be7a284b4cbd0beb287b0f6a8 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 24 Aug 2017 12:08:03 -0700 Subject: [PATCH 10/19] Document GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER --- doc/environment_variables.md | 6 ++++++ src/cpp/client/channel_cc.cc | 21 +++++++++++++++++---- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/doc/environment_variables.md b/doc/environment_variables.md index a2cde927361..bb351cbdcce 100644 --- a/doc/environment_variables.md +++ b/doc/environment_variables.md @@ -113,3 +113,9 @@ some configuration as environment variables that can be set. 
- native (default)- a DNS resolver based around getaddrinfo(), creates a new thread to perform name resolution - ares - a DNS resolver based around the c-ares library + +* GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER + The channel connectivity watcher uses one extra thread to check the channel + state every 500 ms on the client side. It can help reconnect disconnected + client channels (mostly due to idleness), so that the next RPC on this channel + won't fail. Set to 1 to turn off this watcher and save a thread. diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 7b473d3ffb2..865fecfe225 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -38,7 +38,9 @@ #include #include #include +#include #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/support/env.h" namespace grpc { @@ -66,9 +68,20 @@ class TagSaver final : public CompletionQueueTag { class ChannelConnectivityWatcher { public: ChannelConnectivityWatcher() : thd_id_(0) { - const char* disabled_str = - std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); - if (disabled_str == nullptr || strcmp(disabled_str, "1")) { + char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); + bool disabled = false; + if (env != nullptr) { + static const char* truthy[] = {"yes", "Yes", "YES", "true", + "True", "TRUE", "1"}; + for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { + if (0 == strcmp(env, truthy[i])) { + disabled = true; + break; + } + } + } + gpr_free(env); + if (!disabled) { gpr_thd_options options = gpr_thd_options_default(); gpr_thd_options_set_joinable(&options); gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); @@ -121,7 +134,7 @@ class ChannelConnectivityWatcher { // The first grpc_channel_watch_connectivity_state() is not used to // monitor the channel state change, but to hold a reference of the // c channel. 
So that WatchStateChangeImpl() can observe state == - // GRPC_CHANNEL_SHUTDOWN without holding any lock on the channel object. + // GRPC_CHANNEL_SHUTDOWN before the channel gets destroyed. grpc_channel_watch_connectivity_state( channel_state->channel, channel_state->state, gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(), From ddaef3e32581ef9c3078f402d2a08d7dcc781272 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 24 Aug 2017 15:34:52 -0700 Subject: [PATCH 11/19] Remove non-POD global variables --- doc/environment_variables.md | 4 +- src/cpp/client/channel_cc.cc | 94 +++++++++++++++++++++++------------- 2 files changed, 63 insertions(+), 35 deletions(-) diff --git a/doc/environment_variables.md b/doc/environment_variables.md index bb351cbdcce..0123f3f25d6 100644 --- a/doc/environment_variables.md +++ b/doc/environment_variables.md @@ -118,4 +118,6 @@ some configuration as environment variables that can be set. The channel connectivity watcher uses one extra thread to check the channel state every 500 ms on the client side. It can help reconnect disconnected client channels (mostly due to idleness), so that the next RPC on this channel - won't fail. Set to 1 to turn off this watcher and save a thread. + won't fail. Set to 1 to turn off this watcher and save a thread. Please note + this is a temporary work-around, it will be removed in the future once we have + support for automatically reestablishing failed connections. 
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 865fecfe225..418b4fa2fdf 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -41,12 +41,14 @@ #include #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/env.h" +#include "src/core/lib/support/string.h" namespace grpc { namespace { int kConnectivityCheckIntervalMsec = 500; void WatchStateChange(void* arg); +void InitConnectivityWatcherOnce(); class TagSaver final : public CompletionQueueTag { public: @@ -71,10 +73,9 @@ class ChannelConnectivityWatcher { char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); bool disabled = false; if (env != nullptr) { - static const char* truthy[] = {"yes", "Yes", "YES", "true", - "True", "TRUE", "1"}; + static const char* truthy[] = {"yes", "true", "1"}; for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { - if (0 == strcmp(env, truthy[i])) { + if (0 == gpr_stricmp(env, truthy[i])) { disabled = true; break; } @@ -82,54 +83,69 @@ class ChannelConnectivityWatcher { } gpr_free(env); if (!disabled) { + gpr_ref_init(&ref_, 0); gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); + gpr_thd_options_set_detached(&options); gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); } } - ~ChannelConnectivityWatcher() { - cq_.Shutdown(); - if (thd_id_ != 0) { - gpr_thd_join(thd_id_); - } - } - void WatchStateChangeImpl() { bool ok = false; void* tag = NULL; CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT; - while (status != CompletionQueue::SHUTDOWN) { + while (true) { status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); // Make sure we've seen 2 TIMEOUTs before going to sleep if (status == CompletionQueue::TIMEOUT) { status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + if (status == CompletionQueue::TIMEOUT) { + gpr_sleep_until( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + 
gpr_time_from_millis(kConnectivityCheckIntervalMsec, + GPR_TIMESPAN))); + continue; + } } - if (status == CompletionQueue::TIMEOUT) { - gpr_sleep_until( - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(kConnectivityCheckIntervalMsec, - GPR_TIMESPAN))); - } else if (status == CompletionQueue::GOT_EVENT) { - ChannelState* channel_state = static_cast(tag); - channel_state->state = grpc_channel_check_connectivity_state( - channel_state->channel, false); - if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) { - void* shutdown_tag = NULL; - channel_state->shutdown_cq.Next(&shutdown_tag, &ok); - delete channel_state; - } else { - TagSaver* tag_saver = new TagSaver(channel_state); - grpc_channel_watch_connectivity_state( - channel_state->channel, channel_state->state, - gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver); + ChannelState* channel_state = static_cast(tag); + channel_state->state = + grpc_channel_check_connectivity_state(channel_state->channel, false); + if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) { + void* shutdown_tag = NULL; + channel_state->shutdown_cq.Next(&shutdown_tag, &ok); + delete channel_state; + if (gpr_unref(&ref_)) { + gpr_mu_lock(&g_watcher_mu_); + delete g_watcher_; + g_watcher_ = nullptr; + gpr_mu_unlock(&g_watcher_mu_); + break; } + } else { + TagSaver* tag_saver = new TagSaver(channel_state); + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver); } } } - void StartWatching(grpc_channel* channel) { + static void StartWatching(grpc_channel* channel) { + gpr_once_init(&g_connectivity_watcher_once_, InitConnectivityWatcherOnce); + gpr_mu_lock(&g_watcher_mu_); + if (g_watcher_ == nullptr) { + g_watcher_ = new ChannelConnectivityWatcher(); + } + g_watcher_->StartWatchingLocked(channel); + gpr_mu_unlock(&g_watcher_mu_); + } + + static void InitOnce() { gpr_mu_init(&g_watcher_mu_); } + + private: + void 
StartWatchingLocked(grpc_channel* channel) { if (thd_id_ != 0) { + gpr_ref(&ref_); ChannelState* channel_state = new ChannelState(channel); // The first grpc_channel_watch_connectivity_state() is not used to // monitor the channel state change, but to hold a reference of the @@ -146,7 +162,6 @@ class ChannelConnectivityWatcher { } } - private: struct ChannelState { explicit ChannelState(grpc_channel* channel) : channel(channel), state(GRPC_CHANNEL_IDLE){}; @@ -156,15 +171,26 @@ class ChannelConnectivityWatcher { }; gpr_thd_id thd_id_; CompletionQueue cq_; - CompletionQueue shutdown_cq_; + gpr_refcount ref_; + + static gpr_once g_connectivity_watcher_once_; + static gpr_mu g_watcher_mu_; + static ChannelConnectivityWatcher* g_watcher_; }; +gpr_once ChannelConnectivityWatcher::g_connectivity_watcher_once_ = + GPR_ONCE_INIT; +gpr_mu ChannelConnectivityWatcher::g_watcher_mu_; +ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr; + void WatchStateChange(void* arg) { ChannelConnectivityWatcher* watcher = static_cast(arg); watcher->WatchStateChangeImpl(); } +void InitConnectivityWatcherOnce() { ChannelConnectivityWatcher::InitOnce(); }; + ChannelConnectivityWatcher channel_connectivity_watcher; } // namespace @@ -173,7 +199,7 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel) : host_(host), c_channel_(channel) { g_gli_initializer.summon(); if (grpc_channel_support_connectivity_watcher(channel)) { - channel_connectivity_watcher.StartWatching(channel); + ChannelConnectivityWatcher::StartWatching(channel); } } From 504be5c69379194b28aaa5eb6a1c92ad4e60739b Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 24 Aug 2017 16:42:19 -0700 Subject: [PATCH 12/19] Privatize ChannelConnectivityWatcher members --- src/cpp/client/channel_cc.cc | 42 +++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 418b4fa2fdf..1f1def4b64c 100644 
--- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -69,7 +69,7 @@ class TagSaver final : public CompletionQueueTag { // support. class ChannelConnectivityWatcher { public: - ChannelConnectivityWatcher() : thd_id_(0) { + static void StartWatching(grpc_channel* channel) { char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); bool disabled = false; if (env != nullptr) { @@ -83,13 +83,24 @@ class ChannelConnectivityWatcher { } gpr_free(env); if (!disabled) { - gpr_ref_init(&ref_, 0); - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_detached(&options); - gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + gpr_once_init(&g_connectivity_watcher_once_, InitConnectivityWatcherOnce); + gpr_mu_lock(&g_watcher_mu_); + if (g_watcher_ == nullptr) { + g_watcher_ = new ChannelConnectivityWatcher(); + } + g_watcher_->StartWatchingLocked(channel); + gpr_mu_unlock(&g_watcher_mu_); } } + private: + ChannelConnectivityWatcher() { + gpr_ref_init(&ref_, 0); + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_detached(&options); + gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + } + void WatchStateChangeImpl() { bool ok = false; void* tag = NULL; @@ -130,19 +141,6 @@ class ChannelConnectivityWatcher { } } - static void StartWatching(grpc_channel* channel) { - gpr_once_init(&g_connectivity_watcher_once_, InitConnectivityWatcherOnce); - gpr_mu_lock(&g_watcher_mu_); - if (g_watcher_ == nullptr) { - g_watcher_ = new ChannelConnectivityWatcher(); - } - g_watcher_->StartWatchingLocked(channel); - gpr_mu_unlock(&g_watcher_mu_); - } - - static void InitOnce() { gpr_mu_init(&g_watcher_mu_); } - - private: void StartWatchingLocked(grpc_channel* channel) { if (thd_id_ != 0) { gpr_ref(&ref_); @@ -162,6 +160,11 @@ class ChannelConnectivityWatcher { } } + static void InitOnce() { gpr_mu_init(&g_watcher_mu_); } + + friend void WatchStateChange(void* arg); + friend void 
InitConnectivityWatcherOnce(); + struct ChannelState { explicit ChannelState(grpc_channel* channel) : channel(channel), state(GRPC_CHANNEL_IDLE){}; @@ -175,6 +178,7 @@ class ChannelConnectivityWatcher { static gpr_once g_connectivity_watcher_once_; static gpr_mu g_watcher_mu_; + // protected under g_watcher_mu_ static ChannelConnectivityWatcher* g_watcher_; }; @@ -190,8 +194,6 @@ void WatchStateChange(void* arg) { } void InitConnectivityWatcherOnce() { ChannelConnectivityWatcher::InitOnce(); }; - -ChannelConnectivityWatcher channel_connectivity_watcher; } // namespace static internal::GrpcLibraryInitializer g_gli_initializer; From ba23e799dc1c4ea5f6c0139bf95cd04e0794e714 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 24 Aug 2017 23:13:09 -0700 Subject: [PATCH 13/19] Sanity fixes --- grpc.def | 1 + src/cpp/client/channel_cc.cc | 4 +--- src/ruby/ext/grpc/rb_grpc_imports.generated.c | 2 ++ src/ruby/ext/grpc/rb_grpc_imports.generated.h | 3 +++ 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/grpc.def b/grpc.def index af4bd1674f2..edb17c18e2e 100644 --- a/grpc.def +++ b/grpc.def @@ -67,6 +67,7 @@ EXPORTS grpc_channel_check_connectivity_state grpc_channel_num_external_connectivity_watchers grpc_channel_watch_connectivity_state + grpc_channel_support_connectivity_watcher grpc_channel_create_call grpc_channel_ping grpc_channel_register_call diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 1f1def4b64c..acd8271dcc2 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -205,9 +205,7 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel) } } -Channel::~Channel() { - grpc_channel_destroy(c_channel_); -} +Channel::~Channel() { grpc_channel_destroy(c_channel_); } namespace { diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index e83c38841bf..169d318bca2 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ 
b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -90,6 +90,7 @@ grpc_alarm_destroy_type grpc_alarm_destroy_import; grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import; grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_external_connectivity_watchers_import; grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import; +grpc_channel_support_connectivity_watcher_type grpc_channel_support_connectivity_watcher_import; grpc_channel_create_call_type grpc_channel_create_call_import; grpc_channel_ping_type grpc_channel_ping_import; grpc_channel_register_call_type grpc_channel_register_call_import; @@ -393,6 +394,7 @@ void grpc_rb_load_imports(HMODULE library) { grpc_channel_check_connectivity_state_import = (grpc_channel_check_connectivity_state_type) GetProcAddress(library, "grpc_channel_check_connectivity_state"); grpc_channel_num_external_connectivity_watchers_import = (grpc_channel_num_external_connectivity_watchers_type) GetProcAddress(library, "grpc_channel_num_external_connectivity_watchers"); grpc_channel_watch_connectivity_state_import = (grpc_channel_watch_connectivity_state_type) GetProcAddress(library, "grpc_channel_watch_connectivity_state"); + grpc_channel_support_connectivity_watcher_import = (grpc_channel_support_connectivity_watcher_type) GetProcAddress(library, "grpc_channel_support_connectivity_watcher"); grpc_channel_create_call_import = (grpc_channel_create_call_type) GetProcAddress(library, "grpc_channel_create_call"); grpc_channel_ping_import = (grpc_channel_ping_type) GetProcAddress(library, "grpc_channel_ping"); grpc_channel_register_call_import = (grpc_channel_register_call_type) GetProcAddress(library, "grpc_channel_register_call"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 52887043502..1dd5ecf957f 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ 
b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -251,6 +251,9 @@ extern grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_ext typedef void(*grpc_channel_watch_connectivity_state_type)(grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag); extern grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import; #define grpc_channel_watch_connectivity_state grpc_channel_watch_connectivity_state_import +typedef int(*grpc_channel_support_connectivity_watcher_type)(grpc_channel *channel); +extern grpc_channel_support_connectivity_watcher_type grpc_channel_support_connectivity_watcher_import; +#define grpc_channel_support_connectivity_watcher grpc_channel_support_connectivity_watcher_import typedef grpc_call *(*grpc_channel_create_call_type)(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *completion_queue, grpc_slice method, const grpc_slice *host, gpr_timespec deadline, void *reserved); extern grpc_channel_create_call_type grpc_channel_create_call_import; #define grpc_channel_create_call grpc_channel_create_call_import From 6514a0df72d9425ae5058eab6dd120179b2105d1 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Fri, 25 Aug 2017 14:09:47 -0700 Subject: [PATCH 14/19] Add gpr_is_true --- src/core/lib/iomgr/iomgr.c | 10 ++-------- src/core/lib/support/string.c | 13 +++++++++++++ src/core/lib/support/string.h | 3 +++ test/core/support/string_test.c | 16 ++++++++++++++++ 4 files changed, 34 insertions(+), 8 deletions(-) diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 3d19953eebd..003ff9a21d5 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -164,13 +164,7 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) { bool grpc_iomgr_abort_on_leaks(void) { char *env = gpr_getenv("GRPC_ABORT_ON_LEAKS"); - if (env == NULL) return false; - static const char 
*truthy[] = {"yes", "Yes", "YES", "true", - "True", "TRUE", "1"}; - bool should_we = false; - for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { - if (0 == strcmp(env, truthy[i])) should_we = true; - } - gpr_free(env); + bool should_we = gpr_is_true(env); + grp_free(env); return should_we; } diff --git a/src/core/lib/support/string.c b/src/core/lib/support/string.c index b65009754a7..ec93303024d 100644 --- a/src/core/lib/support/string.c +++ b/src/core/lib/support/string.c @@ -298,3 +298,16 @@ void *gpr_memrchr(const void *s, int c, size_t n) { } return NULL; } + +bool gpr_is_true(const char *s) { + if (s == NULL) { + return false; + } + static const char *truthy[] = {"yes", "true", "1"}; + for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { + if (0 == gpr_stricmp(s, truthy[i])) { + return true; + } + } + return false; +} diff --git a/src/core/lib/support/string.h b/src/core/lib/support/string.h index e11df8439d9..5a56fa3a0a8 100644 --- a/src/core/lib/support/string.h +++ b/src/core/lib/support/string.h @@ -19,6 +19,7 @@ #ifndef GRPC_CORE_LIB_SUPPORT_STRING_H #define GRPC_CORE_LIB_SUPPORT_STRING_H +#include #include #include @@ -106,6 +107,8 @@ int gpr_stricmp(const char *a, const char *b); void *gpr_memrchr(const void *s, int c, size_t n); +/** Return true if lower(s) equals "true", "yes" or "1", otherwise false. 
*/ +bool gpr_is_true(const char *s); #ifdef __cplusplus } #endif diff --git a/test/core/support/string_test.c b/test/core/support/string_test.c index a3c33c3fa4b..bee21394775 100644 --- a/test/core/support/string_test.c +++ b/test/core/support/string_test.c @@ -279,6 +279,21 @@ static void test_memrchr(void) { GPR_ASSERT(0 == strcmp((const char *)gpr_memrchr("hello", 'l', 5), "lo")); } +static void test_is_true(void) { + LOG_TEST_NAME("test_is_true"); + + GPR_ASSERT(true == gpr_is_true("True")); + GPR_ASSERT(true == gpr_is_true("true")); + GPR_ASSERT(true == gpr_is_true("TRUE")); + GPR_ASSERT(true == gpr_is_true("Yes")); + GPR_ASSERT(true == gpr_is_true("yes")); + GPR_ASSERT(true == gpr_is_true("YES")); + GPR_ASSERT(true == gpr_is_true("1")); + GPR_ASSERT(false == gpr_is_true(NULL)); + GPR_ASSERT(false == gpr_is_true("")); + GPR_ASSERT(false == gpr_is_true("0")); +} + int main(int argc, char **argv) { grpc_test_init(argc, argv); test_strdup(); @@ -292,5 +307,6 @@ int main(int argc, char **argv) { test_leftpad(); test_stricmp(); test_memrchr(); + test_is_true(); return 0; } From 4a11ecc076ad376833171e5f40105dc8e4320acd Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Fri, 25 Aug 2017 14:10:30 -0700 Subject: [PATCH 15/19] Add ChannelConnectivityWatcher::Ref()/Unref() --- src/core/lib/iomgr/iomgr.c | 2 +- src/cpp/client/channel_cc.cc | 32 +++++++++++++------------- test/cpp/end2end/async_end2end_test.cc | 4 ++++ 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 003ff9a21d5..1feea6d6288 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -165,6 +165,6 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) { bool grpc_iomgr_abort_on_leaks(void) { char *env = gpr_getenv("GRPC_ABORT_ON_LEAKS"); bool should_we = gpr_is_true(env); - grp_free(env); + gpr_free(env); return should_we; } diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 
acd8271dcc2..0d2e834a7f4 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -71,16 +71,7 @@ class ChannelConnectivityWatcher { public: static void StartWatching(grpc_channel* channel) { char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); - bool disabled = false; - if (env != nullptr) { - static const char* truthy[] = {"yes", "true", "1"}; - for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { - if (0 == gpr_stricmp(env, truthy[i])) { - disabled = true; - break; - } - } - } + bool disabled = gpr_is_true(env); gpr_free(env); if (!disabled) { gpr_once_init(&g_connectivity_watcher_once_, InitConnectivityWatcherOnce); @@ -125,11 +116,7 @@ class ChannelConnectivityWatcher { void* shutdown_tag = NULL; channel_state->shutdown_cq.Next(&shutdown_tag, &ok); delete channel_state; - if (gpr_unref(&ref_)) { - gpr_mu_lock(&g_watcher_mu_); - delete g_watcher_; - g_watcher_ = nullptr; - gpr_mu_unlock(&g_watcher_mu_); + if (Unref()) { break; } } else { @@ -143,7 +130,7 @@ class ChannelConnectivityWatcher { void StartWatchingLocked(grpc_channel* channel) { if (thd_id_ != 0) { - gpr_ref(&ref_); + Ref(); ChannelState* channel_state = new ChannelState(channel); // The first grpc_channel_watch_connectivity_state() is not used to // monitor the channel state change, but to hold a reference of the @@ -160,6 +147,19 @@ class ChannelConnectivityWatcher { } } + void Ref() { gpr_ref(&ref_); } + + bool Unref() { + if (gpr_unref(&ref_)) { + gpr_mu_lock(&g_watcher_mu_); + delete g_watcher_; + g_watcher_ = nullptr; + gpr_mu_unlock(&g_watcher_mu_); + return true; + } + return false; + } + static void InitOnce() { gpr_mu_init(&g_watcher_mu_); } friend void WatchStateChange(void* arg); diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 1d1e97a8204..68b95077899 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -378,6 +378,10 @@ TEST_P(AsyncEnd2endTest, 
ReconnectChannel) { while (cq_->Next(&ignored_tag, &ignored_ok)) ; BuildAndStartServer(); + // It needs more than kConnectivityCheckIntervalMsec time to reconnect the + // channel. + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(510, GPR_TIMESPAN))); SendRpc(1); } From eeea43fa242659edabb52d34c0f17f59426ac277 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Mon, 28 Aug 2017 22:27:20 -0700 Subject: [PATCH 16/19] Increase the grace period in ReconnectChannel tests --- test/cpp/end2end/async_end2end_test.cc | 2 +- test/cpp/end2end/end2end_test.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 68b95077899..98e00ebb1fa 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -381,7 +381,7 @@ TEST_P(AsyncEnd2endTest, ReconnectChannel) { // It needs more than kConnectivityCheckIntervalMsec time to reconnect the // channel. gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(510, GPR_TIMESPAN))); + gpr_time_from_millis(1100, GPR_TIMESPAN))); SendRpc(1); } diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 9954f9f9acf..364019b325d 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -707,7 +707,7 @@ TEST_P(End2endTest, ReconnectChannel) { // It needs more than kConnectivityCheckIntervalMsec time to reconnect the // channel. 
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(510, GPR_TIMESPAN))); + gpr_time_from_millis(1100, GPR_TIMESPAN))); SendRpc(stub_.get(), 1, false); } From 5150cbd02d5a5d7cec64fa46225f2bb38611ba3b Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Tue, 29 Aug 2017 00:51:49 -0700 Subject: [PATCH 17/19] Fix timer shutdown process --- src/cpp/client/channel_cc.cc | 113 +++++++++++++++++++++-------------- 1 file changed, 68 insertions(+), 45 deletions(-) diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 0d2e834a7f4..bad36a0b8c6 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -18,7 +18,10 @@ #include +#include +#include #include +#include #include #include @@ -48,7 +51,6 @@ namespace grpc { namespace { int kConnectivityCheckIntervalMsec = 500; void WatchStateChange(void* arg); -void InitConnectivityWatcherOnce(); class TagSaver final : public CompletionQueueTag { public: @@ -67,46 +69,64 @@ class TagSaver final : public CompletionQueueTag { // Constantly watches channel connectivity status to reconnect a transiently // disconnected channel. This is a temporary work-around before we have retry // support. 
-class ChannelConnectivityWatcher { +class ChannelConnectivityWatcher : private GrpcLibraryCodegen { public: static void StartWatching(grpc_channel* channel) { - char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); - bool disabled = gpr_is_true(env); - gpr_free(env); - if (!disabled) { - gpr_once_init(&g_connectivity_watcher_once_, InitConnectivityWatcherOnce); - gpr_mu_lock(&g_watcher_mu_); + if (!IsDisabled()) { + std::unique_lock lock(g_watcher_mu_); if (g_watcher_ == nullptr) { g_watcher_ = new ChannelConnectivityWatcher(); } g_watcher_->StartWatchingLocked(channel); - gpr_mu_unlock(&g_watcher_mu_); + } + } + + static void StopWatching() { + if (!IsDisabled()) { + std::unique_lock lock(g_watcher_mu_); + if (g_watcher_->StopWatchingLocked()) { + delete g_watcher_; + g_watcher_ = nullptr; + } } } private: - ChannelConnectivityWatcher() { + ChannelConnectivityWatcher() : channel_count_(0), shutdown_(false) { gpr_ref_init(&ref_, 0); gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_detached(&options); + gpr_thd_options_set_joinable(&options); gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); } + static bool IsDisabled() { + char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); + bool disabled = gpr_is_true(env); + gpr_free(env); + return disabled; + } + void WatchStateChangeImpl() { bool ok = false; void* tag = NULL; CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT; while (true) { - status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); - // Make sure we've seen 2 TIMEOUTs before going to sleep - if (status == CompletionQueue::TIMEOUT) { - status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); - if (status == CompletionQueue::TIMEOUT) { - gpr_sleep_until( - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(kConnectivityCheckIntervalMsec, - GPR_TIMESPAN))); - continue; + { + std::unique_lock lock(shutdown_mu_); + if (shutdown_) { + // Drain cq_ 
if the watcher is shutting down + status = cq_.AsyncNext(&tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)); + } else { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + // Make sure we've seen 2 TIMEOUTs before going to sleep + if (status == CompletionQueue::TIMEOUT) { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + if (status == CompletionQueue::TIMEOUT) { + shutdown_cv_.wait_for(lock, std::chrono::milliseconds( + kConnectivityCheckIntervalMsec)); + continue; + } + } } } ChannelState* channel_state = static_cast(tag); @@ -116,7 +136,7 @@ class ChannelConnectivityWatcher { void* shutdown_tag = NULL; channel_state->shutdown_cq.Next(&shutdown_tag, &ok); delete channel_state; - if (Unref()) { + if (gpr_unref(&ref_)) { break; } } else { @@ -130,7 +150,8 @@ class ChannelConnectivityWatcher { void StartWatchingLocked(grpc_channel* channel) { if (thd_id_ != 0) { - Ref(); + gpr_ref(&ref_); + ++channel_count_; ChannelState* channel_state = new ChannelState(channel); // The first grpc_channel_watch_connectivity_state() is not used to // monitor the channel state change, but to hold a reference of the @@ -147,24 +168,20 @@ class ChannelConnectivityWatcher { } } - void Ref() { gpr_ref(&ref_); } - - bool Unref() { - if (gpr_unref(&ref_)) { - gpr_mu_lock(&g_watcher_mu_); - delete g_watcher_; - g_watcher_ = nullptr; - gpr_mu_unlock(&g_watcher_mu_); + bool StopWatchingLocked() { + if (--channel_count_ == 0) { + { + std::unique_lock lock(shutdown_mu_); + shutdown_ = true; + shutdown_cv_.notify_one(); + } + gpr_thd_join(thd_id_); return true; } return false; } - static void InitOnce() { gpr_mu_init(&g_watcher_mu_); } - friend void WatchStateChange(void* arg); - friend void InitConnectivityWatcherOnce(); - struct ChannelState { explicit ChannelState(grpc_channel* channel) : channel(channel), state(GRPC_CHANNEL_IDLE){}; @@ -175,16 +192,17 @@ class ChannelConnectivityWatcher { gpr_thd_id thd_id_; CompletionQueue cq_; gpr_refcount ref_; + int 
channel_count_; - static gpr_once g_connectivity_watcher_once_; - static gpr_mu g_watcher_mu_; - // protected under g_watcher_mu_ - static ChannelConnectivityWatcher* g_watcher_; + std::mutex shutdown_mu_; + std::condition_variable shutdown_cv_; // protected by shutdown_cv_ + bool shutdown_; // protected by shutdown_cv_ + + static std::mutex g_watcher_mu_; + static ChannelConnectivityWatcher* g_watcher_; // protected by g_watcher_mu_ }; -gpr_once ChannelConnectivityWatcher::g_connectivity_watcher_once_ = - GPR_ONCE_INIT; -gpr_mu ChannelConnectivityWatcher::g_watcher_mu_; +std::mutex ChannelConnectivityWatcher::g_watcher_mu_; ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr; void WatchStateChange(void* arg) { @@ -192,8 +210,6 @@ void WatchStateChange(void* arg) { static_cast(arg); watcher->WatchStateChangeImpl(); } - -void InitConnectivityWatcherOnce() { ChannelConnectivityWatcher::InitOnce(); }; } // namespace static internal::GrpcLibraryInitializer g_gli_initializer; @@ -205,7 +221,14 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel) } } -Channel::~Channel() { grpc_channel_destroy(c_channel_); } +Channel::~Channel() { + if (grpc_channel_support_connectivity_watcher(c_channel_)) { + grpc_channel_destroy(c_channel_); + ChannelConnectivityWatcher::StopWatching(); + } else { + grpc_channel_destroy(c_channel_); + } +} namespace { From 26b0a34fbef7ab43870f51a1971ea4578e211ac3 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Tue, 5 Sep 2017 13:32:44 -0700 Subject: [PATCH 18/19] Address review comments --- src/cpp/client/channel_cc.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index bad36a0b8c6..19a25c838fb 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -195,8 +195,8 @@ class ChannelConnectivityWatcher : private GrpcLibraryCodegen { int channel_count_; std::mutex shutdown_mu_; - 
std::condition_variable shutdown_cv_; // protected by shutdown_cv_ - bool shutdown_; // protected by shutdown_cv_ + std::condition_variable shutdown_cv_; // protected by shutdown_mu_ + bool shutdown_; // protected by shutdown_mu_ static std::mutex g_watcher_mu_; static ChannelConnectivityWatcher* g_watcher_; // protected by g_watcher_mu_ @@ -222,11 +222,11 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel) } Channel::~Channel() { - if (grpc_channel_support_connectivity_watcher(c_channel_)) { - grpc_channel_destroy(c_channel_); + const bool stop_watching = + grpc_channel_support_connectivity_watcher(c_channel_); + grpc_channel_destroy(c_channel_); + if (stop_watching) { ChannelConnectivityWatcher::StopWatching(); - } else { - grpc_channel_destroy(c_channel_); } } From b5dd3abad9b38cbd39917543b3991acd6ec368a8 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Wed, 6 Sep 2017 11:33:22 -0700 Subject: [PATCH 19/19] Increase the grace period in ReconnectChannel tests --- test/cpp/end2end/async_end2end_test.cc | 2 +- test/cpp/end2end/end2end_test.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 98e00ebb1fa..e841a702d44 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -381,7 +381,7 @@ TEST_P(AsyncEnd2endTest, ReconnectChannel) { // It needs more than kConnectivityCheckIntervalMsec time to reconnect the // channel. 
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(1100, GPR_TIMESPAN))); + gpr_time_from_millis(1600, GPR_TIMESPAN))); SendRpc(1); } diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 364019b325d..1f4861a7e62 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -707,7 +707,7 @@ TEST_P(End2endTest, ReconnectChannel) { // It needs more than kConnectivityCheckIntervalMsec time to reconnect the // channel. gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(1100, GPR_TIMESPAN))); + gpr_time_from_millis(1600, GPR_TIMESPAN))); SendRpc(stub_.get(), 1, false); }