Merge pull request #12732 from y-zeng/connectivity_watcher

Client channel backup poller
pull/13069/head
Yuchen Zeng 7 years ago committed by GitHub
commit 9e6837d397
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 6
      CMakeLists.txt
  3. 6
      Makefile
  4. 1
      binding.gyp
  5. 2
      build.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 14
      doc/environment_variables.md
  9. 3
      gRPC-Core.podspec
  10. 2
      grpc.gemspec
  11. 4
      grpc.gyp
  12. 2
      package.xml
  13. 158
      src/core/ext/filters/client_channel/backup_poller.cc
  14. 34
      src/core/ext/filters/client_channel/backup_poller.h
  15. 3
      src/core/ext/filters/client_channel/client_channel.cc
  16. 194
      src/cpp/client/channel_cc.cc
  17. 1
      src/python/grpcio/grpc_core_dependencies.py
  18. 22
      test/cpp/end2end/async_end2end_test.cc
  19. 22
      test/cpp/end2end/end2end_test.cc
  20. 2
      tools/doxygen/Doxyfile.core.internal
  21. 3
      tools/run_tests/generated/sources_and_headers.json

@ -872,6 +872,7 @@ grpc_cc_library(
grpc_cc_library(
name = "grpc_client_channel",
srcs = [
"src/core/ext/filters/client_channel/backup_poller.cc",
"src/core/ext/filters/client_channel/channel_connectivity.cc",
"src/core/ext/filters/client_channel/client_channel.cc",
"src/core/ext/filters/client_channel/client_channel_factory.cc",
@ -894,6 +895,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/uri_parser.cc",
],
hdrs = [
"src/core/ext/filters/client_channel/backup_poller.h",
"src/core/ext/filters/client_channel/client_channel.h",
"src/core/ext/filters/client_channel/client_channel_factory.h",
"src/core/ext/filters/client_channel/connector.h",

@ -1153,6 +1153,7 @@ add_library(grpc
src/core/tsi/transport_security_adapter.cc
src/core/ext/transport/chttp2/server/chttp2_server.cc
src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
src/core/ext/filters/client_channel/backup_poller.cc
src/core/ext/filters/client_channel/channel_connectivity.cc
src/core/ext/filters/client_channel/client_channel.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
@ -1478,6 +1479,7 @@ add_library(grpc_cronet
src/core/ext/filters/http/http_filters_plugin.cc
src/core/ext/filters/http/message_compress/message_compress_filter.cc
src/core/ext/filters/http/server/http_server_filter.cc
src/core/ext/filters/client_channel/backup_poller.cc
src/core/ext/filters/client_channel/channel_connectivity.cc
src/core/ext/filters/client_channel/client_channel.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
@ -1768,6 +1770,7 @@ add_library(grpc_test_util
src/core/lib/transport/transport.cc
src/core/lib/transport/transport_op_string.cc
src/core/lib/debug/trace.cc
src/core/ext/filters/client_channel/backup_poller.cc
src/core/ext/filters/client_channel/channel_connectivity.cc
src/core/ext/filters/client_channel/client_channel.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
@ -2033,6 +2036,7 @@ add_library(grpc_test_util_unsecure
src/core/lib/transport/transport.cc
src/core/lib/transport/transport_op_string.cc
src/core/lib/debug/trace.cc
src/core/ext/filters/client_channel/backup_poller.cc
src/core/ext/filters/client_channel/channel_connectivity.cc
src/core/ext/filters/client_channel/client_channel.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
@ -2317,6 +2321,7 @@ add_library(grpc_unsecure
src/core/ext/transport/chttp2/client/insecure/channel_create.cc
src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
src/core/ext/transport/chttp2/client/chttp2_connector.cc
src/core/ext/filters/client_channel/backup_poller.cc
src/core/ext/filters/client_channel/channel_connectivity.cc
src/core/ext/filters/client_channel/client_channel.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
@ -3053,6 +3058,7 @@ add_library(grpc++_cronet
src/core/ext/filters/http/http_filters_plugin.cc
src/core/ext/filters/http/message_compress/message_compress_filter.cc
src/core/ext/filters/http/server/http_server_filter.cc
src/core/ext/filters/client_channel/backup_poller.cc
src/core/ext/filters/client_channel/channel_connectivity.cc
src/core/ext/filters/client_channel/client_channel.cc
src/core/ext/filters/client_channel/client_channel_factory.cc

@ -3153,6 +3153,7 @@ LIBGRPC_SRC = \
src/core/tsi/transport_security_adapter.cc \
src/core/ext/transport/chttp2/server/chttp2_server.cc \
src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
@ -3477,6 +3478,7 @@ LIBGRPC_CRONET_SRC = \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
@ -3765,6 +3767,7 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/lib/transport/transport.cc \
src/core/lib/transport/transport_op_string.cc \
src/core/lib/debug/trace.cc \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
@ -4020,6 +4023,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
src/core/lib/transport/transport.cc \
src/core/lib/transport/transport_op_string.cc \
src/core/lib/debug/trace.cc \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
@ -4281,6 +4285,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/transport/chttp2/client/insecure/channel_create.cc \
src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc \
src/core/ext/transport/chttp2/client/chttp2_connector.cc \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
@ -4995,6 +5000,7 @@ LIBGRPC++_CRONET_SRC = \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \

@ -853,6 +853,7 @@
'src/core/tsi/transport_security_adapter.cc',
'src/core/ext/transport/chttp2/server/chttp2_server.cc',
'src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc',
'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',

@ -463,6 +463,7 @@ filegroups:
- grpc_trace_headers
- name: grpc_client_channel
headers:
- src/core/ext/filters/client_channel/backup_poller.h
- src/core/ext/filters/client_channel/client_channel.h
- src/core/ext/filters/client_channel/client_channel_factory.h
- src/core/ext/filters/client_channel/connector.h
@ -482,6 +483,7 @@ filegroups:
- src/core/ext/filters/client_channel/subchannel_index.h
- src/core/ext/filters/client_channel/uri_parser.h
src:
- src/core/ext/filters/client_channel/backup_poller.cc
- src/core/ext/filters/client_channel/channel_connectivity.cc
- src/core/ext/filters/client_channel/client_channel.cc
- src/core/ext/filters/client_channel/client_channel_factory.cc

@ -278,6 +278,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/tsi/transport_security_adapter.cc \
src/core/ext/transport/chttp2/server/chttp2_server.cc \
src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \

@ -255,6 +255,7 @@ if (PHP_GRPC != "no") {
"src\\core\\tsi\\transport_security_adapter.cc " +
"src\\core\\ext\\transport\\chttp2\\server\\chttp2_server.cc " +
"src\\core\\ext\\transport\\chttp2\\client\\secure\\secure_channel_create.cc " +
"src\\core\\ext\\filters\\client_channel\\backup_poller.cc " +
"src\\core\\ext\\filters\\client_channel\\channel_connectivity.cc " +
"src\\core\\ext\\filters\\client_channel\\client_channel.cc " +
"src\\core\\ext\\filters\\client_channel\\client_channel_factory.cc " +

@ -120,10 +120,10 @@ some configuration as environment variables that can be set.
perform name resolution
- ares - a DNS resolver based around the c-ares library
* GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER
The channel connectivity watcher uses one extra thread to check the channel
state every 500 ms on the client side. It can help reconnect disconnected
client channels (mostly due to idleness), so that the next RPC on this channel
won't fail. Set to 1 to turn off this watcher and save a thread. Please note
this is a temporary work-around, it will be removed in the future once we have
support for automatically reestablishing failed connections.
* GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS
Default: 5000
Declares the interval between two backup polls on client channels. These polls
are run in the timer thread so that gRPC can process connection failures while
there is no active polling thread. They help reconnect disconnected client
channels (mostly due to idleness), so that the next RPC on this channel won't
fail. Set to 0 to turn off the backup polls.

@ -299,6 +299,7 @@ Pod::Spec.new do |s|
'src/core/tsi/transport_security_adapter.h',
'src/core/tsi/transport_security_interface.h',
'src/core/ext/transport/chttp2/server/chttp2_server.h',
'src/core/ext/filters/client_channel/backup_poller.h',
'src/core/ext/filters/client_channel/client_channel.h',
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/connector.h',
@ -669,6 +670,7 @@ Pod::Spec.new do |s|
'src/core/tsi/transport_security_adapter.cc',
'src/core/ext/transport/chttp2/server/chttp2_server.cc',
'src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc',
'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
@ -804,6 +806,7 @@ Pod::Spec.new do |s|
'src/core/tsi/transport_security_adapter.h',
'src/core/tsi/transport_security_interface.h',
'src/core/ext/transport/chttp2/server/chttp2_server.h',
'src/core/ext/filters/client_channel/backup_poller.h',
'src/core/ext/filters/client_channel/client_channel.h',
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/connector.h',

@ -230,6 +230,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/tsi/transport_security_adapter.h )
s.files += %w( src/core/tsi/transport_security_interface.h )
s.files += %w( src/core/ext/transport/chttp2/server/chttp2_server.h )
s.files += %w( src/core/ext/filters/client_channel/backup_poller.h )
s.files += %w( src/core/ext/filters/client_channel/client_channel.h )
s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.h )
s.files += %w( src/core/ext/filters/client_channel/connector.h )
@ -604,6 +605,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/tsi/transport_security_adapter.cc )
s.files += %w( src/core/ext/transport/chttp2/server/chttp2_server.cc )
s.files += %w( src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc )
s.files += %w( src/core/ext/filters/client_channel/backup_poller.cc )
s.files += %w( src/core/ext/filters/client_channel/channel_connectivity.cc )
s.files += %w( src/core/ext/filters/client_channel/client_channel.cc )
s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.cc )

@ -419,6 +419,7 @@
'src/core/tsi/transport_security_adapter.cc',
'src/core/ext/transport/chttp2/server/chttp2_server.cc',
'src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc',
'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
@ -662,6 +663,7 @@
'src/core/lib/transport/transport.cc',
'src/core/lib/transport/transport_op_string.cc',
'src/core/lib/debug/trace.cc',
'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
@ -869,6 +871,7 @@
'src/core/lib/transport/transport.cc',
'src/core/lib/transport/transport_op_string.cc',
'src/core/lib/debug/trace.cc',
'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
@ -1094,6 +1097,7 @@
'src/core/ext/transport/chttp2/client/insecure/channel_create.cc',
'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc',
'src/core/ext/transport/chttp2/client/chttp2_connector.cc',
'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',

@ -242,6 +242,7 @@
<file baseinstalldir="/" name="src/core/tsi/transport_security_adapter.h" role="src" />
<file baseinstalldir="/" name="src/core/tsi/transport_security_interface.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/server/chttp2_server.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/backup_poller.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/connector.h" role="src" />
@ -616,6 +617,7 @@
<file baseinstalldir="/" name="src/core/tsi/transport_security_adapter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/server/chttp2_server.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/backup_poller.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/channel_connectivity.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel_factory.cc" role="src" />

@ -0,0 +1,158 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#define DEFAULT_POLL_INTERVAL_MS 5000
typedef struct backup_poller {
grpc_timer polling_timer;
grpc_closure run_poller_closure;
grpc_closure shutdown_closure;
gpr_mu* pollset_mu;
grpc_pollset* pollset; // guarded by pollset_mu
bool shutting_down; // guarded by pollset_mu
gpr_refcount refs;
gpr_refcount shutdown_refs;
} backup_poller;
static gpr_once g_once = GPR_ONCE_INIT;
static gpr_mu g_poller_mu;
static backup_poller* g_poller = NULL; // guarded by g_poller_mu
// g_poll_interval_ms is set only once at the first time
// grpc_client_channel_start_backup_polling() is called, after that it is
// treated as const.
static int g_poll_interval_ms = DEFAULT_POLL_INTERVAL_MS;
static void init_globals() {
gpr_mu_init(&g_poller_mu);
char* env = gpr_getenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS");
if (env != NULL) {
int poll_interval_ms = gpr_parse_nonnegative_int(env);
if (poll_interval_ms == -1) {
gpr_log(GPR_ERROR,
"Invalid GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS: %s, "
"default value %d will be used.",
env, g_poll_interval_ms);
} else {
g_poll_interval_ms = poll_interval_ms;
}
}
gpr_free(env);
}
static void backup_poller_shutdown_unref(grpc_exec_ctx* exec_ctx,
backup_poller* p) {
if (gpr_unref(&p->shutdown_refs)) {
grpc_pollset_destroy(exec_ctx, p->pollset);
gpr_free(p->pollset);
gpr_free(p);
}
}
static void done_poller(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
backup_poller_shutdown_unref(exec_ctx, (backup_poller*)arg);
}
static void g_poller_unref(grpc_exec_ctx* exec_ctx) {
if (gpr_unref(&g_poller->refs)) {
gpr_mu_lock(&g_poller_mu);
backup_poller* p = g_poller;
g_poller = NULL;
gpr_mu_unlock(&g_poller_mu);
gpr_mu_lock(p->pollset_mu);
p->shutting_down = true;
grpc_pollset_shutdown(exec_ctx, p->pollset,
GRPC_CLOSURE_INIT(&p->shutdown_closure, done_poller,
p, grpc_schedule_on_exec_ctx));
gpr_mu_unlock(p->pollset_mu);
grpc_timer_cancel(exec_ctx, &p->polling_timer);
}
}
static void run_poller(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
backup_poller* p = (backup_poller*)arg;
if (error != GRPC_ERROR_NONE) {
if (error != GRPC_ERROR_CANCELLED) {
GRPC_LOG_IF_ERROR("run_poller", GRPC_ERROR_REF(error));
}
backup_poller_shutdown_unref(exec_ctx, p);
return;
}
gpr_mu_lock(p->pollset_mu);
if (p->shutting_down) {
gpr_mu_unlock(p->pollset_mu);
backup_poller_shutdown_unref(exec_ctx, p);
return;
}
grpc_error* err = grpc_pollset_work(exec_ctx, p->pollset, NULL,
grpc_exec_ctx_now(exec_ctx));
gpr_mu_unlock(p->pollset_mu);
GRPC_LOG_IF_ERROR("Run client channel backup poller", err);
grpc_timer_init(exec_ctx, &p->polling_timer,
grpc_exec_ctx_now(exec_ctx) + g_poll_interval_ms,
&p->run_poller_closure);
}
void grpc_client_channel_start_backup_polling(
grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties) {
gpr_once_init(&g_once, init_globals);
if (g_poll_interval_ms == 0) {
return;
}
gpr_mu_lock(&g_poller_mu);
if (g_poller == NULL) {
g_poller = (backup_poller*)gpr_zalloc(sizeof(backup_poller));
g_poller->pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
g_poller->shutting_down = false;
grpc_pollset_init(g_poller->pollset, &g_poller->pollset_mu);
gpr_ref_init(&g_poller->refs, 0);
// one for timer cancellation, one for pollset shutdown
gpr_ref_init(&g_poller->shutdown_refs, 2);
GRPC_CLOSURE_INIT(&g_poller->run_poller_closure, run_poller, g_poller,
grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &g_poller->polling_timer,
grpc_exec_ctx_now(exec_ctx) + g_poll_interval_ms,
&g_poller->run_poller_closure);
}
gpr_ref(&g_poller->refs);
gpr_mu_unlock(&g_poller_mu);
grpc_pollset_set_add_pollset(exec_ctx, interested_parties, g_poller->pollset);
}
void grpc_client_channel_stop_backup_polling(
grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties) {
if (g_poll_interval_ms == 0) {
return;
}
grpc_pollset_set_del_pollset(exec_ctx, interested_parties, g_poller->pollset);
g_poller_unref(exec_ctx);
}

@ -0,0 +1,34 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/exec_ctx.h"
/* Start polling \a interested_parties periodically in the timer thread */
void grpc_client_channel_start_backup_polling(
grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties);
/* Stop polling \a interested_parties */
void grpc_client_channel_stop_backup_polling(
grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H */

@ -31,6 +31,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
@ -712,6 +713,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
chand->interested_parties = grpc_pollset_set_create();
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
grpc_client_channel_start_backup_polling(exec_ctx, chand->interested_parties);
// Record client channel factory.
const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
GRPC_ARG_CLIENT_CHANNEL_FACTORY);
@ -790,6 +792,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
if (chand->method_params_table != NULL) {
grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
}
grpc_client_channel_stop_backup_polling(exec_ctx, chand->interested_parties);
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");

@ -48,187 +48,13 @@
namespace grpc {
namespace {
int kConnectivityCheckIntervalMsec = 500;
void WatchStateChange(void* arg);
class TagSaver final : public internal::CompletionQueueTag {
public:
explicit TagSaver(void* tag) : tag_(tag) {}
~TagSaver() override {}
bool FinalizeResult(void** tag, bool* status) override {
*tag = tag_;
delete this;
return true;
}
private:
void* tag_;
};
// Constantly watches channel connectivity status to reconnect a transiently
// disconnected channel. This is a temporary work-around before we have retry
// support.
class ChannelConnectivityWatcher : private GrpcLibraryCodegen {
public:
static void StartWatching(grpc_channel* channel) {
if (!IsDisabled()) {
std::unique_lock<std::mutex> lock(g_watcher_mu_);
if (g_watcher_ == nullptr) {
g_watcher_ = new ChannelConnectivityWatcher();
}
g_watcher_->StartWatchingLocked(channel);
}
}
static void StopWatching() {
if (!IsDisabled()) {
std::unique_lock<std::mutex> lock(g_watcher_mu_);
if (g_watcher_->StopWatchingLocked()) {
delete g_watcher_;
g_watcher_ = nullptr;
}
}
}
private:
ChannelConnectivityWatcher() : channel_count_(0), shutdown_(false) {
gpr_ref_init(&ref_, 0);
gpr_thd_options options = gpr_thd_options_default();
gpr_thd_options_set_joinable(&options);
gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
}
static bool IsDisabled() {
char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
bool disabled = gpr_is_true(env);
gpr_free(env);
return disabled;
}
void WatchStateChangeImpl() {
bool ok = false;
void* tag = NULL;
CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT;
while (true) {
{
std::unique_lock<std::mutex> lock(shutdown_mu_);
if (shutdown_) {
// Drain cq_ if the watcher is shutting down
status = cq_.AsyncNext(&tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME));
} else {
status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
// Make sure we've seen 2 TIMEOUTs before going to sleep
if (status == CompletionQueue::TIMEOUT) {
status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
if (status == CompletionQueue::TIMEOUT) {
shutdown_cv_.wait_for(lock, std::chrono::milliseconds(
kConnectivityCheckIntervalMsec));
continue;
}
}
}
}
ChannelState* channel_state = static_cast<ChannelState*>(tag);
channel_state->state =
grpc_channel_check_connectivity_state(channel_state->channel, false);
if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) {
void* shutdown_tag = NULL;
channel_state->shutdown_cq.Next(&shutdown_tag, &ok);
delete channel_state;
if (gpr_unref(&ref_)) {
break;
}
} else {
TagSaver* tag_saver = new TagSaver(channel_state);
grpc_channel_watch_connectivity_state(
channel_state->channel, channel_state->state,
gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver);
}
}
}
void StartWatchingLocked(grpc_channel* channel) {
if (thd_id_ != 0) {
gpr_ref(&ref_);
++channel_count_;
ChannelState* channel_state = new ChannelState(channel);
// The first grpc_channel_watch_connectivity_state() is not used to
// monitor the channel state change, but to hold a reference of the
// c channel. So that WatchStateChangeImpl() can observe state ==
// GRPC_CHANNEL_SHUTDOWN before the channel gets destroyed.
grpc_channel_watch_connectivity_state(
channel_state->channel, channel_state->state,
gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(),
new TagSaver(nullptr));
grpc_channel_watch_connectivity_state(
channel_state->channel, channel_state->state,
gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(),
new TagSaver(channel_state));
}
}
bool StopWatchingLocked() {
if (--channel_count_ == 0) {
{
std::unique_lock<std::mutex> lock(shutdown_mu_);
shutdown_ = true;
shutdown_cv_.notify_one();
}
gpr_thd_join(thd_id_);
return true;
}
return false;
}
friend void WatchStateChange(void* arg);
struct ChannelState {
explicit ChannelState(grpc_channel* channel)
: channel(channel), state(GRPC_CHANNEL_IDLE){};
grpc_channel* channel;
grpc_connectivity_state state;
CompletionQueue shutdown_cq;
};
gpr_thd_id thd_id_;
CompletionQueue cq_;
gpr_refcount ref_;
int channel_count_;
std::mutex shutdown_mu_;
std::condition_variable shutdown_cv_; // protected by shutdown_mu_
bool shutdown_; // protected by shutdown_mu_
static std::mutex g_watcher_mu_;
static ChannelConnectivityWatcher* g_watcher_; // protected by g_watcher_mu_
};
std::mutex ChannelConnectivityWatcher::g_watcher_mu_;
ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr;
void WatchStateChange(void* arg) {
ChannelConnectivityWatcher* watcher =
static_cast<ChannelConnectivityWatcher*>(arg);
watcher->WatchStateChangeImpl();
}
} // namespace
static internal::GrpcLibraryInitializer g_gli_initializer;
Channel::Channel(const grpc::string& host, grpc_channel* channel)
: host_(host), c_channel_(channel) {
g_gli_initializer.summon();
if (grpc_channel_support_connectivity_watcher(channel)) {
ChannelConnectivityWatcher::StartWatching(channel);
}
}
Channel::~Channel() {
const bool stop_watching =
grpc_channel_support_connectivity_watcher(c_channel_);
grpc_channel_destroy(c_channel_);
if (stop_watching) {
ChannelConnectivityWatcher::StopWatching();
}
}
Channel::~Channel() { grpc_channel_destroy(c_channel_); }
namespace {
@ -315,6 +141,24 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) {
return grpc_channel_check_connectivity_state(c_channel_, try_to_connect);
}
namespace {
class TagSaver final : public internal::CompletionQueueTag {
public:
explicit TagSaver(void* tag) : tag_(tag) {}
~TagSaver() override {}
bool FinalizeResult(void** tag, bool* status) override {
*tag = tag_;
delete this;
return true;
}
private:
void* tag_;
};
} // namespace
void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) {

@ -254,6 +254,7 @@ CORE_SOURCE_FILES = [
'src/core/tsi/transport_security_adapter.cc',
'src/core/ext/transport/chttp2/server/chttp2_server.cc',
'src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc',
'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',

@ -28,12 +28,14 @@
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <grpc/support/tls.h>
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/support/env.h"
#include "src/proto/grpc/health/v1/health.grpc.pb.h"
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
@ -459,6 +461,15 @@ TEST_P(AsyncEnd2endTest, ReconnectChannel) {
if (GetParam().inproc) {
return;
}
gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "200");
int poller_slowdown_factor = 1;
// It needs 2 pollset_works to reconnect the channel with polling engine
// "poll"
char* s = gpr_getenv("GRPC_POLL_STRATEGY");
if (s != NULL && 0 == strcmp(s, "poll")) {
poller_slowdown_factor = 2;
}
gpr_free(s);
ResetStub();
SendRpc(1);
server_->Shutdown();
@ -468,10 +479,13 @@ TEST_P(AsyncEnd2endTest, ReconnectChannel) {
while (cq_->Next(&ignored_tag, &ignored_ok))
;
BuildAndStartServer();
// It needs more than kConnectivityCheckIntervalMsec time to reconnect the
// channel.
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(1600, GPR_TIMESPAN)));
// It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
// reconnect the channel.
gpr_sleep_until(gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(
300 * poller_slowdown_factor * grpc_test_slowdown_factor(),
GPR_TIMESPAN)));
SendRpc(1);
}

@ -30,11 +30,13 @@
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/support/env.h"
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
@ -704,13 +706,25 @@ TEST_P(End2endTest, ReconnectChannel) {
if (GetParam().inproc) {
return;
}
gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "200");
int poller_slowdown_factor = 1;
// It needs 2 pollset_works to reconnect the channel with polling engine
// "poll"
char* s = gpr_getenv("GRPC_POLL_STRATEGY");
if (s != NULL && 0 == strcmp(s, "poll")) {
poller_slowdown_factor = 2;
}
gpr_free(s);
ResetStub();
SendRpc(stub_.get(), 1, false);
RestartServer(std::shared_ptr<AuthMetadataProcessor>());
// It needs more than kConnectivityCheckIntervalMsec time to reconnect the
// channel.
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(1600, GPR_TIMESPAN)));
// It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
// reconnect the channel.
gpr_sleep_until(gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(
300 * poller_slowdown_factor * grpc_test_slowdown_factor(),
GPR_TIMESPAN)));
SendRpc(stub_.get(), 1, false);
}

@ -907,6 +907,8 @@ src/core/ext/census/trace_string.h \
src/core/ext/census/tracing.cc \
src/core/ext/census/tracing.h \
src/core/ext/filters/client_channel/README.md \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/backup_poller.h \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel.h \

@ -8467,6 +8467,7 @@
"grpc_deadline_filter"
],
"headers": [
"src/core/ext/filters/client_channel/backup_poller.h",
"src/core/ext/filters/client_channel/client_channel.h",
"src/core/ext/filters/client_channel/client_channel_factory.h",
"src/core/ext/filters/client_channel/connector.h",
@ -8490,6 +8491,8 @@
"language": "c",
"name": "grpc_client_channel",
"src": [
"src/core/ext/filters/client_channel/backup_poller.cc",
"src/core/ext/filters/client_channel/backup_poller.h",
"src/core/ext/filters/client_channel/channel_connectivity.cc",
"src/core/ext/filters/client_channel/client_channel.cc",
"src/core/ext/filters/client_channel/client_channel.h",

Loading…
Cancel
Save