add client idle filter

pull/19634/head
Qiancheng Zhao 5 years ago
parent 970ae3c112
commit 29480c4f6b
  1. 12
      BUILD
  2. 1
      BUILD.gn
  3. 2
      CMakeLists.txt
  4. 2
      Makefile
  5. 8
      build.yaml
  6. 2
      config.m4
  7. 2
      config.w32
  8. 1
      gRPC-Core.podspec
  9. 1
      grpc.gemspec
  10. 2
      grpc.gyp
  11. 13
      include/grpc/impl/codegen/grpc_types.h
  12. 1
      package.xml
  13. 47
      src/core/ext/filters/client_channel/client_channel.cc
  14. 183
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  15. 11
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
  16. 264
      src/core/ext/filters/client_idle/client_idle_filter.cc
  17. 2
      src/core/ext/filters/max_age/max_age_filter.cc
  18. 2
      src/core/lib/iomgr/error.cc
  19. 2
      src/core/lib/iomgr/error.h
  20. 4
      src/core/plugin_registry/grpc_plugin_registry.cc
  21. 4
      src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
  22. 1
      src/python/grpcio/grpc_core_dependencies.py
  23. 28
      test/cpp/end2end/client_lb_end2end_test.cc
  24. 1
      tools/doxygen/Doxyfile.core.internal
  25. 17
      tools/run_tests/generated/sources_and_headers.json

12
BUILD

@ -998,6 +998,7 @@ grpc_cc_library(
"grpc_client_authority_filter", "grpc_client_authority_filter",
"grpc_lb_policy_pick_first", "grpc_lb_policy_pick_first",
"grpc_lb_policy_round_robin", "grpc_lb_policy_round_robin",
"grpc_client_idle_filter",
"grpc_max_age_filter", "grpc_max_age_filter",
"grpc_message_size_filter", "grpc_message_size_filter",
"grpc_resolver_dns_ares", "grpc_resolver_dns_ares",
@ -1085,6 +1086,17 @@ grpc_cc_library(
], ],
) )
grpc_cc_library(
name = "grpc_client_idle_filter",
srcs = [
"src/core/ext/filters/client_idle/client_idle_filter.cc",
],
language = "c++",
deps = [
"grpc_base",
],
)
grpc_cc_library( grpc_cc_library(
name = "grpc_max_age_filter", name = "grpc_max_age_filter",
srcs = [ srcs = [

@ -348,6 +348,7 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/subchannel_interface.h", "src/core/ext/filters/client_channel/subchannel_interface.h",
"src/core/ext/filters/client_channel/subchannel_pool_interface.cc", "src/core/ext/filters/client_channel/subchannel_pool_interface.cc",
"src/core/ext/filters/client_channel/subchannel_pool_interface.h", "src/core/ext/filters/client_channel/subchannel_pool_interface.h",
"src/core/ext/filters/client_idle/client_idle_filter.cc",
"src/core/ext/filters/deadline/deadline_filter.cc", "src/core/ext/filters/deadline/deadline_filter.cc",
"src/core/ext/filters/deadline/deadline_filter.h", "src/core/ext/filters/deadline/deadline_filter.h",
"src/core/ext/filters/http/client/http_client_filter.cc", "src/core/ext/filters/http/client/http_client_filter.cc",

@ -1326,6 +1326,7 @@ add_library(grpc
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
src/core/ext/filters/census/grpc_context.cc src/core/ext/filters/census/grpc_context.cc
src/core/ext/filters/client_idle/client_idle_filter.cc
src/core/ext/filters/max_age/max_age_filter.cc src/core/ext/filters/max_age/max_age_filter.cc
src/core/ext/filters/message_size/message_size_filter.cc src/core/ext/filters/message_size/message_size_filter.cc
src/core/ext/filters/http/client_authority_filter.cc src/core/ext/filters/http/client_authority_filter.cc
@ -2757,6 +2758,7 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
src/core/ext/filters/census/grpc_context.cc src/core/ext/filters/census/grpc_context.cc
src/core/ext/filters/client_idle/client_idle_filter.cc
src/core/ext/filters/max_age/max_age_filter.cc src/core/ext/filters/max_age/max_age_filter.cc
src/core/ext/filters/message_size/message_size_filter.cc src/core/ext/filters/message_size/message_size_filter.cc
src/core/ext/filters/http/client_authority_filter.cc src/core/ext/filters/http/client_authority_filter.cc

@ -3843,6 +3843,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc \ src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \ src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \
src/core/ext/filters/census/grpc_context.cc \ src/core/ext/filters/census/grpc_context.cc \
src/core/ext/filters/client_idle/client_idle_filter.cc \
src/core/ext/filters/max_age/max_age_filter.cc \ src/core/ext/filters/max_age/max_age_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/client_authority_filter.cc \
@ -5210,6 +5211,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \ src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
src/core/ext/filters/census/grpc_context.cc \ src/core/ext/filters/census/grpc_context.cc \
src/core/ext/filters/client_idle/client_idle_filter.cc \
src/core/ext/filters/max_age/max_age_filter.cc \ src/core/ext/filters/max_age/max_age_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/client_authority_filter.cc \

@ -634,6 +634,12 @@ filegroups:
- grpc_base - grpc_base
- grpc_deadline_filter - grpc_deadline_filter
- health_proto - health_proto
- name: grpc_client_idle_filter
src:
- src/core/ext/filters/client_idle/client_idle_filter.cc
plugin: grpc_client_idle_filter
uses:
- grpc_base
- name: grpc_codegen - name: grpc_codegen
public_headers: public_headers:
- include/grpc/impl/codegen/byte_buffer.h - include/grpc/impl/codegen/byte_buffer.h
@ -1606,6 +1612,7 @@ libs:
- grpc_resolver_fake - grpc_resolver_fake
- grpc_secure - grpc_secure
- census - census
- grpc_client_idle_filter
- grpc_max_age_filter - grpc_max_age_filter
- grpc_message_size_filter - grpc_message_size_filter
- grpc_deadline_filter - grpc_deadline_filter
@ -1679,6 +1686,7 @@ libs:
- grpc_lb_policy_pick_first - grpc_lb_policy_pick_first
- grpc_lb_policy_round_robin - grpc_lb_policy_round_robin
- census - census
- grpc_client_idle_filter
- grpc_max_age_filter - grpc_max_age_filter
- grpc_message_size_filter - grpc_message_size_filter
- grpc_deadline_filter - grpc_deadline_filter

@ -417,6 +417,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc \ src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \ src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \
src/core/ext/filters/census/grpc_context.cc \ src/core/ext/filters/census/grpc_context.cc \
src/core/ext/filters/client_idle/client_idle_filter.cc \
src/core/ext/filters/max_age/max_age_filter.cc \ src/core/ext/filters/max_age/max_age_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/client_authority_filter.cc \
@ -702,6 +703,7 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/resolver/dns/native) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/resolver/dns/native)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/resolver/fake) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/resolver/fake)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/resolver/sockaddr) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/resolver/sockaddr)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_idle)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/deadline) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/deadline)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http/client) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http/client)

@ -392,6 +392,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\resolver\\dns\\native\\dns_resolver.cc " + "src\\core\\ext\\filters\\client_channel\\resolver\\dns\\native\\dns_resolver.cc " +
"src\\core\\ext\\filters\\client_channel\\resolver\\sockaddr\\sockaddr_resolver.cc " + "src\\core\\ext\\filters\\client_channel\\resolver\\sockaddr\\sockaddr_resolver.cc " +
"src\\core\\ext\\filters\\census\\grpc_context.cc " + "src\\core\\ext\\filters\\census\\grpc_context.cc " +
"src\\core\\ext\\filters\\client_idle\\client_idle_filter.cc " +
"src\\core\\ext\\filters\\max_age\\max_age_filter.cc " + "src\\core\\ext\\filters\\max_age\\max_age_filter.cc " +
"src\\core\\ext\\filters\\message_size\\message_size_filter.cc " + "src\\core\\ext\\filters\\message_size\\message_size_filter.cc " +
"src\\core\\ext\\filters\\http\\client_authority_filter.cc " + "src\\core\\ext\\filters\\http\\client_authority_filter.cc " +
@ -712,6 +713,7 @@ if (PHP_GRPC != "no") {
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\resolver\\dns\\native"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\resolver\\dns\\native");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\resolver\\fake"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\resolver\\fake");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\resolver\\sockaddr"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\resolver\\sockaddr");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_idle");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\deadline"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\deadline");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http\\client"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http\\client");

@ -879,6 +879,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc', 'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc', 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc',
'src/core/ext/filters/census/grpc_context.cc', 'src/core/ext/filters/census/grpc_context.cc',
'src/core/ext/filters/client_idle/client_idle_filter.cc',
'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/max_age/max_age_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/client_authority_filter.cc',

@ -816,6 +816,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc )
s.files += %w( src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc ) s.files += %w( src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc )
s.files += %w( src/core/ext/filters/census/grpc_context.cc ) s.files += %w( src/core/ext/filters/census/grpc_context.cc )
s.files += %w( src/core/ext/filters/client_idle/client_idle_filter.cc )
s.files += %w( src/core/ext/filters/max_age/max_age_filter.cc ) s.files += %w( src/core/ext/filters/max_age/max_age_filter.cc )
s.files += %w( src/core/ext/filters/message_size/message_size_filter.cc ) s.files += %w( src/core/ext/filters/message_size/message_size_filter.cc )
s.files += %w( src/core/ext/filters/http/client_authority_filter.cc ) s.files += %w( src/core/ext/filters/http/client_authority_filter.cc )

@ -599,6 +599,7 @@
'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc', 'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc', 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc',
'src/core/ext/filters/census/grpc_context.cc', 'src/core/ext/filters/census/grpc_context.cc',
'src/core/ext/filters/client_idle/client_idle_filter.cc',
'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/max_age/max_age_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/client_authority_filter.cc',
@ -1380,6 +1381,7 @@
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/census/grpc_context.cc', 'src/core/ext/filters/census/grpc_context.cc',
'src/core/ext/filters/client_idle/client_idle_filter.cc',
'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/max_age/max_age_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/client_authority_filter.cc',

@ -157,8 +157,9 @@ typedef struct {
/** Maximum message length that the channel can send. Int valued, bytes. /** Maximum message length that the channel can send. Int valued, bytes.
-1 means unlimited. */ -1 means unlimited. */
#define GRPC_ARG_MAX_SEND_MESSAGE_LENGTH "grpc.max_send_message_length" #define GRPC_ARG_MAX_SEND_MESSAGE_LENGTH "grpc.max_send_message_length"
/** Maximum time that a channel may have no outstanding rpcs. Int valued, /** Maximum time that a channel may have no outstanding rpcs, after which the
milliseconds. INT_MAX means unlimited. */ * server will close the connection. Int valued, milliseconds. INT_MAX means
* unlimited. */
#define GRPC_ARG_MAX_CONNECTION_IDLE_MS "grpc.max_connection_idle_ms" #define GRPC_ARG_MAX_CONNECTION_IDLE_MS "grpc.max_connection_idle_ms"
/** Maximum time that a channel may exist. Int valued, milliseconds. /** Maximum time that a channel may exist. Int valued, milliseconds.
* INT_MAX means unlimited. */ * INT_MAX means unlimited. */
@ -166,6 +167,14 @@ typedef struct {
/** Grace period after the channel reaches its max age. Int valued, /** Grace period after the channel reaches its max age. Int valued,
milliseconds. INT_MAX means unlimited. */ milliseconds. INT_MAX means unlimited. */
#define GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS "grpc.max_connection_age_grace_ms" #define GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS "grpc.max_connection_age_grace_ms"
/** Timeout after the last RPC finishes on the client channel at which the
* channel goes back into IDLE state. Int valued, milliseconds. INT_MAX means
* unlimited. */
/** TODO(qianchengz): Currently the default value is INT_MAX, which means the
* client idle filter is disabled by default. After the client idle filter
* proves no perfomance issue, we will change the default value to a reasonable
* value. */
#define GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS "grpc.client_idle_timeout_ms"
/** Enable/disable support for per-message compression. Defaults to 1, unless /** Enable/disable support for per-message compression. Defaults to 1, unless
GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0. */ GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0. */
#define GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION "grpc.per_message_compression" #define GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION "grpc.per_message_compression"

@ -821,6 +821,7 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/census/grpc_context.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/census/grpc_context.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_idle/client_idle_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/max_age/max_age_filter.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/max_age/max_age_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/message_size/message_size_filter.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/message_size/message_size_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/client_authority_filter.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/http/client_authority_filter.cc" role="src" />

@ -726,6 +726,13 @@ class ChannelData::ConnectivityStateAndPickerSetter {
ChannelData* chand, grpc_connectivity_state state, const char* reason, ChannelData* chand, grpc_connectivity_state state, const char* reason,
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
: chand_(chand), picker_(std::move(picker)) { : chand_(chand), picker_(std::move(picker)) {
// Clean the control plane when entering IDLE, while holding control plane
// combiner.
if (picker_ == nullptr) {
chand->health_check_service_name_.reset();
chand->saved_service_config_.reset();
chand->received_first_resolver_result_ = false;
}
// Update connectivity state here, while holding control plane combiner. // Update connectivity state here, while holding control plane combiner.
grpc_connectivity_state_set(&chand->state_tracker_, state, reason); grpc_connectivity_state_set(&chand->state_tracker_, state, reason);
if (chand->channelz_node_ != nullptr) { if (chand->channelz_node_ != nullptr) {
@ -749,6 +756,12 @@ class ChannelData::ConnectivityStateAndPickerSetter {
auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg); auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
// Update picker. // Update picker.
self->chand_->picker_ = std::move(self->picker_); self->chand_->picker_ = std::move(self->picker_);
// Clean the data plane if the updated picker is nullptr.
if (self->chand_->picker_ == nullptr) {
self->chand_->received_service_config_data_ = false;
self->chand_->retry_throttle_data_.reset();
self->chand_->service_config_.reset();
}
// Re-process queued picks. // Re-process queued picks.
for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr; for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr;
pick = pick->next) { pick = pick->next) {
@ -1486,19 +1499,31 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
chand->resolving_lb_policy_->ResetBackoffLocked(); chand->resolving_lb_policy_->ResetBackoffLocked();
} }
} }
// Disconnect. // Disconnect or enter IDLE.
if (op->disconnect_with_error != GRPC_ERROR_NONE) { if (op->disconnect_with_error != GRPC_ERROR_NONE) {
grpc_error* error = GRPC_ERROR_NONE;
GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong(
&error, op->disconnect_with_error, MemoryOrder::ACQ_REL,
MemoryOrder::ACQUIRE));
chand->DestroyResolvingLoadBalancingPolicyLocked(); chand->DestroyResolvingLoadBalancingPolicyLocked();
// Will delete itself. intptr_t value;
New<ConnectivityStateAndPickerSetter>( if (grpc_error_get_int(op->disconnect_with_error,
chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API", GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, &value) &&
UniquePtr<LoadBalancingPolicy::SubchannelPicker>( static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
New<LoadBalancingPolicy::TransientFailurePicker>( if (chand->disconnect_error() == GRPC_ERROR_NONE) {
GRPC_ERROR_REF(op->disconnect_with_error)))); // Enter IDLE state.
New<ConnectivityStateAndPickerSetter>(chand, GRPC_CHANNEL_IDLE,
"channel entering IDLE", nullptr);
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
} else {
// Disconnect.
grpc_error* error = GRPC_ERROR_NONE;
GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong(
&error, op->disconnect_with_error, MemoryOrder::ACQ_REL,
MemoryOrder::ACQUIRE));
New<ConnectivityStateAndPickerSetter>(
chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
New<LoadBalancingPolicy::TransientFailurePicker>(
GRPC_ERROR_REF(op->disconnect_with_error))));
}
} }
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op"); GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op");
GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);

@ -60,7 +60,13 @@ class FakeResolver : public Resolver {
virtual ~FakeResolver(); virtual ~FakeResolver();
void ShutdownLocked() override { active_ = false; } void ShutdownLocked() override {
shutdown_ = true;
if (response_generator_ != nullptr) {
response_generator_->SetFakeResolver(nullptr);
response_generator_.reset();
}
}
void MaybeSendResultLocked(); void MaybeSendResultLocked();
@ -68,6 +74,7 @@ class FakeResolver : public Resolver {
// passed-in parameters // passed-in parameters
grpc_channel_args* channel_args_ = nullptr; grpc_channel_args* channel_args_ = nullptr;
RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
// If has_next_result_ is true, next_result_ is the next resolution result // If has_next_result_ is true, next_result_ is the next resolution result
// to be returned. // to be returned.
bool has_next_result_ = false; bool has_next_result_ = false;
@ -76,8 +83,10 @@ class FakeResolver : public Resolver {
// RequestReresolutionLocked(). // RequestReresolutionLocked().
bool has_reresolution_result_ = false; bool has_reresolution_result_ = false;
Result reresolution_result_; Result reresolution_result_;
// True between the calls to StartLocked() ShutdownLocked(). // True after the call to StartLocked().
bool active_ = false; bool started_ = false;
// True after the call to ShutdownLocked().
bool shutdown_ = false;
// if true, return failure // if true, return failure
bool return_failure_ = false; bool return_failure_ = false;
// pending re-resolution // pending re-resolution
@ -86,11 +95,11 @@ class FakeResolver : public Resolver {
}; };
FakeResolver::FakeResolver(ResolverArgs args) FakeResolver::FakeResolver(ResolverArgs args)
: Resolver(args.combiner, std::move(args.result_handler)) { : Resolver(args.combiner, std::move(args.result_handler)),
response_generator_(
FakeResolverResponseGenerator::GetFromArgs(args.args)) {
GRPC_CLOSURE_INIT(&reresolution_closure_, ReturnReresolutionResult, this, GRPC_CLOSURE_INIT(&reresolution_closure_, ReturnReresolutionResult, this,
grpc_combiner_scheduler(combiner())); grpc_combiner_scheduler(combiner()));
FakeResolverResponseGenerator* response_generator =
FakeResolverResponseGenerator::GetFromArgs(args.args);
// Channels sharing the same subchannels may have different resolver response // Channels sharing the same subchannels may have different resolver response
// generators. If we don't remove this arg, subchannel pool will create new // generators. If we don't remove this arg, subchannel pool will create new
// subchannels for the same address instead of reusing existing ones because // subchannels for the same address instead of reusing existing ones because
@ -98,19 +107,15 @@ FakeResolver::FakeResolver(ResolverArgs args)
const char* args_to_remove[] = {GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR}; const char* args_to_remove[] = {GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR};
channel_args_ = grpc_channel_args_copy_and_remove( channel_args_ = grpc_channel_args_copy_and_remove(
args.args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove)); args.args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove));
if (response_generator != nullptr) { if (response_generator_ != nullptr) {
response_generator->resolver_ = this; response_generator_->SetFakeResolver(Ref());
if (response_generator->has_result_) {
response_generator->SetResponse(std::move(response_generator->result_));
response_generator->has_result_ = false;
}
} }
} }
FakeResolver::~FakeResolver() { grpc_channel_args_destroy(channel_args_); } FakeResolver::~FakeResolver() { grpc_channel_args_destroy(channel_args_); }
void FakeResolver::StartLocked() { void FakeResolver::StartLocked() {
active_ = true; started_ = true;
MaybeSendResultLocked(); MaybeSendResultLocked();
} }
@ -130,7 +135,7 @@ void FakeResolver::RequestReresolutionLocked() {
} }
void FakeResolver::MaybeSendResultLocked() { void FakeResolver::MaybeSendResultLocked() {
if (!active_) return; if (!started_ || shutdown_) return;
if (return_failure_) { if (return_failure_) {
// TODO(roth): Change resolver result generator to be able to inject // TODO(roth): Change resolver result generator to be able to inject
// the error to be returned. // the error to be returned.
@ -165,9 +170,13 @@ void FakeResolver::ReturnReresolutionResult(void* arg, grpc_error* error) {
// FakeResolverResponseGenerator // FakeResolverResponseGenerator
// //
FakeResolverResponseGenerator::FakeResolverResponseGenerator() {}
FakeResolverResponseGenerator::~FakeResolverResponseGenerator() {}
struct SetResponseClosureArg { struct SetResponseClosureArg {
grpc_closure set_response_closure; grpc_closure set_response_closure;
FakeResolverResponseGenerator* generator; RefCountedPtr<FakeResolver> resolver;
Resolver::Result result; Resolver::Result result;
bool has_result = false; bool has_result = false;
bool immediate = true; bool immediate = true;
@ -176,97 +185,146 @@ struct SetResponseClosureArg {
void FakeResolverResponseGenerator::SetResponseLocked(void* arg, void FakeResolverResponseGenerator::SetResponseLocked(void* arg,
grpc_error* error) { grpc_error* error) {
SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg); SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
FakeResolver* resolver = closure_arg->generator->resolver_; auto& resolver = closure_arg->resolver;
resolver->next_result_ = std::move(closure_arg->result); if (!resolver->shutdown_) {
resolver->has_next_result_ = true; resolver->next_result_ = std::move(closure_arg->result);
resolver->MaybeSendResultLocked(); resolver->has_next_result_ = true;
closure_arg->generator->Unref(); resolver->MaybeSendResultLocked();
}
Delete(closure_arg); Delete(closure_arg);
} }
void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) { void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) {
if (resolver_ != nullptr) { RefCountedPtr<FakeResolver> resolver;
Ref().release(); // ref to be held by closure {
SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>(); MutexLock lock(&mu_);
closure_arg->generator = this; if (resolver_ == nullptr) {
closure_arg->result = std::move(result); has_result_ = true;
GRPC_CLOSURE_SCHED( result_ = std::move(result);
GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetResponseLocked, return;
closure_arg, }
grpc_combiner_scheduler(resolver_->combiner())), resolver = resolver_->Ref();
GRPC_ERROR_NONE);
} else {
has_result_ = true;
result_ = std::move(result);
} }
SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
closure_arg->resolver = std::move(resolver);
closure_arg->result = std::move(result);
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(
&closure_arg->set_response_closure, SetResponseLocked, closure_arg,
grpc_combiner_scheduler(closure_arg->resolver->combiner())),
GRPC_ERROR_NONE);
} }
void FakeResolverResponseGenerator::SetReresolutionResponseLocked( void FakeResolverResponseGenerator::SetReresolutionResponseLocked(
void* arg, grpc_error* error) { void* arg, grpc_error* error) {
SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg); SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
FakeResolver* resolver = closure_arg->generator->resolver_; auto& resolver = closure_arg->resolver;
resolver->reresolution_result_ = std::move(closure_arg->result); if (!resolver->shutdown_) {
resolver->has_reresolution_result_ = closure_arg->has_result; resolver->reresolution_result_ = std::move(closure_arg->result);
resolver->has_reresolution_result_ = closure_arg->has_result;
}
Delete(closure_arg); Delete(closure_arg);
} }
void FakeResolverResponseGenerator::SetReresolutionResponse( void FakeResolverResponseGenerator::SetReresolutionResponse(
Resolver::Result result) { Resolver::Result result) {
GPR_ASSERT(resolver_ != nullptr); RefCountedPtr<FakeResolver> resolver;
{
MutexLock lock(&mu_);
GPR_ASSERT(resolver_ != nullptr);
resolver = resolver_->Ref();
}
SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>(); SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
closure_arg->generator = this; closure_arg->resolver = std::move(resolver);
closure_arg->result = std::move(result); closure_arg->result = std::move(result);
closure_arg->has_result = true; closure_arg->has_result = true;
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, GRPC_CLOSURE_INIT(
SetReresolutionResponseLocked, closure_arg, &closure_arg->set_response_closure, SetReresolutionResponseLocked,
grpc_combiner_scheduler(resolver_->combiner())), closure_arg,
grpc_combiner_scheduler(closure_arg->resolver->combiner())),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
void FakeResolverResponseGenerator::UnsetReresolutionResponse() { void FakeResolverResponseGenerator::UnsetReresolutionResponse() {
GPR_ASSERT(resolver_ != nullptr); RefCountedPtr<FakeResolver> resolver;
{
MutexLock lock(&mu_);
GPR_ASSERT(resolver_ != nullptr);
resolver = resolver_->Ref();
}
SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>(); SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
closure_arg->generator = this; closure_arg->resolver = std::move(resolver);
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, GRPC_CLOSURE_INIT(
SetReresolutionResponseLocked, closure_arg, &closure_arg->set_response_closure, SetReresolutionResponseLocked,
grpc_combiner_scheduler(resolver_->combiner())), closure_arg,
grpc_combiner_scheduler(closure_arg->resolver->combiner())),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
void FakeResolverResponseGenerator::SetFailureLocked(void* arg, void FakeResolverResponseGenerator::SetFailureLocked(void* arg,
grpc_error* error) { grpc_error* error) {
SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg); SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
FakeResolver* resolver = closure_arg->generator->resolver_; auto& resolver = closure_arg->resolver;
resolver->return_failure_ = true; if (!resolver->shutdown_) {
if (closure_arg->immediate) resolver->MaybeSendResultLocked(); resolver->return_failure_ = true;
if (closure_arg->immediate) resolver->MaybeSendResultLocked();
}
Delete(closure_arg); Delete(closure_arg);
} }
void FakeResolverResponseGenerator::SetFailure() { void FakeResolverResponseGenerator::SetFailure() {
GPR_ASSERT(resolver_ != nullptr); RefCountedPtr<FakeResolver> resolver;
{
MutexLock lock(&mu_);
GPR_ASSERT(resolver_ != nullptr);
resolver = resolver_->Ref();
}
SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>(); SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
closure_arg->generator = this; closure_arg->resolver = std::move(resolver);
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetFailureLocked, GRPC_CLOSURE_INIT(
closure_arg, &closure_arg->set_response_closure, SetFailureLocked, closure_arg,
grpc_combiner_scheduler(resolver_->combiner())), grpc_combiner_scheduler(closure_arg->resolver->combiner())),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
void FakeResolverResponseGenerator::SetFailureOnReresolution() { void FakeResolverResponseGenerator::SetFailureOnReresolution() {
GPR_ASSERT(resolver_ != nullptr); RefCountedPtr<FakeResolver> resolver;
{
MutexLock lock(&mu_);
GPR_ASSERT(resolver_ != nullptr);
resolver = resolver_->Ref();
}
SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>(); SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
closure_arg->generator = this; closure_arg->resolver = std::move(resolver);
closure_arg->immediate = false; closure_arg->immediate = false;
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetFailureLocked, GRPC_CLOSURE_INIT(
closure_arg, &closure_arg->set_response_closure, SetFailureLocked, closure_arg,
grpc_combiner_scheduler(resolver_->combiner())), grpc_combiner_scheduler(closure_arg->resolver->combiner())),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
void FakeResolverResponseGenerator::SetFakeResolver(
RefCountedPtr<FakeResolver> resolver) {
MutexLock lock(&mu_);
resolver_ = std::move(resolver);
if (resolver_ == nullptr) return;
if (has_result_) {
SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
closure_arg->resolver = resolver_->Ref();
closure_arg->result = std::move(result_);
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetResponseLocked,
closure_arg,
grpc_combiner_scheduler(resolver_->combiner())),
GRPC_ERROR_NONE);
has_result_ = false;
}
}
namespace { namespace {
static void* response_generator_arg_copy(void* p) { static void* response_generator_arg_copy(void* p) {
@ -304,12 +362,13 @@ grpc_arg FakeResolverResponseGenerator::MakeChannelArg(
return arg; return arg;
} }
FakeResolverResponseGenerator* FakeResolverResponseGenerator::GetFromArgs( RefCountedPtr<FakeResolverResponseGenerator>
const grpc_channel_args* args) { FakeResolverResponseGenerator::GetFromArgs(const grpc_channel_args* args) {
const grpc_arg* arg = const grpc_arg* arg =
grpc_channel_args_find(args, GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR); grpc_channel_args_find(args, GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) return nullptr; if (arg == nullptr || arg->type != GRPC_ARG_POINTER) return nullptr;
return static_cast<FakeResolverResponseGenerator*>(arg->value.pointer.p); return static_cast<FakeResolverResponseGenerator*>(arg->value.pointer.p)
->Ref();
} }
// //

@ -42,7 +42,8 @@ class FakeResolver;
class FakeResolverResponseGenerator class FakeResolverResponseGenerator
: public RefCounted<FakeResolverResponseGenerator> { : public RefCounted<FakeResolverResponseGenerator> {
public: public:
FakeResolverResponseGenerator() {} FakeResolverResponseGenerator();
~FakeResolverResponseGenerator();
// Instructs the fake resolver associated with the response generator // Instructs the fake resolver associated with the response generator
// instance to trigger a new resolution with the specified result. If the // instance to trigger a new resolution with the specified result. If the
@ -71,17 +72,21 @@ class FakeResolverResponseGenerator
static grpc_arg MakeChannelArg(FakeResolverResponseGenerator* generator); static grpc_arg MakeChannelArg(FakeResolverResponseGenerator* generator);
// Returns the response generator in \a args, or null if not found. // Returns the response generator in \a args, or null if not found.
static FakeResolverResponseGenerator* GetFromArgs( static RefCountedPtr<FakeResolverResponseGenerator> GetFromArgs(
const grpc_channel_args* args); const grpc_channel_args* args);
private: private:
friend class FakeResolver; friend class FakeResolver;
// Set the corresponding FakeResolver to this generator.
void SetFakeResolver(RefCountedPtr<FakeResolver> resolver);
static void SetResponseLocked(void* arg, grpc_error* error); static void SetResponseLocked(void* arg, grpc_error* error);
static void SetReresolutionResponseLocked(void* arg, grpc_error* error); static void SetReresolutionResponseLocked(void* arg, grpc_error* error);
static void SetFailureLocked(void* arg, grpc_error* error); static void SetFailureLocked(void* arg, grpc_error* error);
FakeResolver* resolver_ = nullptr; // Do not own. // Mutex protecting the members below.
Mutex mu_;
RefCountedPtr<FakeResolver> resolver_;
Resolver::Result result_; Resolver::Result result_;
bool has_result_ = false; bool has_result_ = false;
}; };

@ -0,0 +1,264 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include <limits.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/http2_errors.h"
// The idle filter is disabled in client channel by default.
// To enable the idle filte, set GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS to [0, INT_MAX)
// in channel args.
// TODO(qianchengz): Find a reasonable default value. Maybe check what deault
// value Java uses.
#define DEFAULT_IDLE_TIMEOUT_MS INT_MAX
namespace grpc_core {
TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter");
#define GRPC_IDLE_FILTER_LOG(format, ...) \
do { \
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) { \
gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \
} \
} while (0)
namespace {
grpc_millis GetClientIdleTimeout(const grpc_channel_args* args) {
return grpc_channel_arg_get_integer(
grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS),
{DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX});
}
class ChannelData {
public:
static grpc_error* Init(grpc_channel_element* elem,
grpc_channel_element_args* args);
static void Destroy(grpc_channel_element* elem);
static void StartTransportOp(grpc_channel_element* elem,
grpc_transport_op* op);
void IncreaseCallCount();
void DecreaseCallCount();
private:
ChannelData(grpc_channel_element* elem, grpc_channel_element_args* args,
grpc_error** error);
~ChannelData() = default;
static void IdleTimerCallback(void* arg, grpc_error* error);
static void IdleTransportOpCompleteCallback(void* arg, grpc_error* error);
void StartIdleTimer();
void EnterIdle();
grpc_channel_element* elem_;
// The channel stack to which we take refs for pending callbacks.
grpc_channel_stack* channel_stack_;
// Timeout after the last RPC finishes on the client channel at which the
// channel goes back into IDLE state.
const grpc_millis client_idle_timeout_;
// Member data used to track the state of channel.
Mutex call_count_mu_;
size_t call_count_;
// Idle timer and its callback closure.
grpc_timer idle_timer_;
grpc_closure idle_timer_callback_;
// The transport op telling the client channel to enter IDLE.
grpc_transport_op idle_transport_op_;
grpc_closure idle_transport_op_complete_callback_;
};
grpc_error* ChannelData::Init(grpc_channel_element* elem,
grpc_channel_element_args* args) {
grpc_error* error = GRPC_ERROR_NONE;
new (elem->channel_data) ChannelData(elem, args, &error);
return error;
}
void ChannelData::Destroy(grpc_channel_element* elem) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
chand->~ChannelData();
}
void ChannelData::StartTransportOp(grpc_channel_element* elem,
grpc_transport_op* op) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// Catch the disconnect_with_error transport op.
if (op->disconnect_with_error != nullptr) {
// Disconnect. Cancel the timer if we set it before.
// IncreaseCallCount() introduces a dummy call. It will cancel the timer and
// prevent the timer from being reset by other threads.
chand->IncreaseCallCount();
}
// Pass the op to the next filter.
grpc_channel_next_op(elem, op);
}
void ChannelData::IncreaseCallCount() {
MutexLock lock(&call_count_mu_);
if (call_count_++ == 0) {
grpc_timer_cancel(&idle_timer_);
}
GRPC_IDLE_FILTER_LOG("call counter has increased to %" PRIuPTR, call_count_);
}
void ChannelData::DecreaseCallCount() {
MutexLock lock(&call_count_mu_);
if (call_count_-- == 1) {
StartIdleTimer();
}
GRPC_IDLE_FILTER_LOG("call counter has decreased to %" PRIuPTR, call_count_);
}
ChannelData::ChannelData(grpc_channel_element* elem,
grpc_channel_element_args* args, grpc_error** error)
: elem_(elem),
channel_stack_(args->channel_stack),
client_idle_timeout_(GetClientIdleTimeout(args->channel_args)),
call_count_(0) {
// If the idle filter is explicitly disabled in channel args, this ctor should
// not get called.
GPR_ASSERT(client_idle_timeout_ != GRPC_MILLIS_INF_FUTURE);
GRPC_IDLE_FILTER_LOG("created with max_leisure_time = %" PRId64 " ms",
client_idle_timeout_);
// Initialize the idle timer without setting it.
grpc_timer_init_unset(&idle_timer_);
// Initialize the idle timer callback closure.
GRPC_CLOSURE_INIT(&idle_timer_callback_, IdleTimerCallback, this,
grpc_schedule_on_exec_ctx);
// Initialize the idle transport op complete callback.
GRPC_CLOSURE_INIT(&idle_transport_op_complete_callback_,
IdleTransportOpCompleteCallback, this,
grpc_schedule_on_exec_ctx);
}
void ChannelData::IdleTimerCallback(void* arg, grpc_error* error) {
GRPC_IDLE_FILTER_LOG("timer alarms");
ChannelData* chand = static_cast<ChannelData*>(arg);
{
MutexLock lock(&chand->call_count_mu_);
if (error == GRPC_ERROR_NONE && chand->call_count_ == 0) {
chand->EnterIdle();
}
}
GRPC_IDLE_FILTER_LOG("timer finishes");
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback");
}
void ChannelData::IdleTransportOpCompleteCallback(void* arg,
grpc_error* error) {
ChannelData* chand = static_cast<ChannelData*>(arg);
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "idle transport op");
}
void ChannelData::StartIdleTimer() {
GRPC_IDLE_FILTER_LOG("timer has started");
// Hold a ref to the channel stack for the timer callback.
GRPC_CHANNEL_STACK_REF(channel_stack_, "max idle timer callback");
grpc_timer_init(&idle_timer_, ExecCtx::Get()->Now() + client_idle_timeout_,
&idle_timer_callback_);
}
void ChannelData::EnterIdle() {
GRPC_IDLE_FILTER_LOG("the channel will enter IDLE");
// Hold a ref to the channel stack for the transport op.
GRPC_CHANNEL_STACK_REF(channel_stack_, "idle transport op");
// Initialize the transport op.
memset(&idle_transport_op_, 0, sizeof(idle_transport_op_));
idle_transport_op_.disconnect_with_error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"),
GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE);
idle_transport_op_.on_consumed = &idle_transport_op_complete_callback_;
// Pass the transport op down to the channel stack.
grpc_channel_next_op(elem_, &idle_transport_op_);
}
class CallData {
public:
static grpc_error* Init(grpc_call_element* elem,
const grpc_call_element_args* args);
static void Destroy(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* then_schedule_closure);
};
grpc_error* CallData::Init(grpc_call_element* elem,
const grpc_call_element_args* args) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
chand->IncreaseCallCount();
return GRPC_ERROR_NONE;
}
void CallData::Destroy(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
chand->DecreaseCallCount();
}
const grpc_channel_filter grpc_client_idle_filter = {
grpc_call_next_op,
ChannelData::StartTransportOp,
sizeof(CallData),
CallData::Init,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallData::Destroy,
sizeof(ChannelData),
ChannelData::Init,
ChannelData::Destroy,
grpc_channel_next_get_info,
"client_idle"};
static bool MaybeAddClientIdleFilter(grpc_channel_stack_builder* builder,
void* arg) {
const grpc_channel_args* channel_args =
grpc_channel_stack_builder_get_channel_arguments(builder);
if (!grpc_channel_args_want_minimal_stack(channel_args) &&
GetClientIdleTimeout(channel_args) != INT_MAX) {
return grpc_channel_stack_builder_prepend_filter(
builder, &grpc_client_idle_filter, nullptr, nullptr);
} else {
return true;
}
}
} // namespace
} // namespace grpc_core
void grpc_client_idle_filter_init(void) {
grpc_channel_init_register_stage(
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_core::MaybeAddClientIdleFilter, nullptr);
}
void grpc_client_idle_filter_shutdown(void) {}

@ -47,7 +47,7 @@
namespace { namespace {
struct channel_data { struct channel_data {
/* We take a reference to the channel stack for the timer callback */ /* The channel stack to which we take refs for pending callbacks. */
grpc_channel_stack* channel_stack; grpc_channel_stack* channel_stack;
/* Guards access to max_age_timer, max_age_timer_pending, max_age_grace_timer /* Guards access to max_age_timer, max_age_timer_pending, max_age_grace_timer
and max_age_grace_timer_pending */ and max_age_grace_timer_pending */

@ -73,6 +73,8 @@ static const char* error_int_name(grpc_error_ints key) {
return "limit"; return "limit";
case GRPC_ERROR_INT_OCCURRED_DURING_WRITE: case GRPC_ERROR_INT_OCCURRED_DURING_WRITE:
return "occurred_during_write"; return "occurred_during_write";
case GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE:
return "channel_connectivity_state";
case GRPC_ERROR_INT_MAX: case GRPC_ERROR_INT_MAX:
GPR_UNREACHABLE_CODE(return "unknown"); GPR_UNREACHABLE_CODE(return "unknown");
} }

@ -73,6 +73,8 @@ typedef enum {
GRPC_ERROR_INT_LIMIT, GRPC_ERROR_INT_LIMIT,
/// chttp2: did the error occur while a write was in progress /// chttp2: did the error occur while a write was in progress
GRPC_ERROR_INT_OCCURRED_DURING_WRITE, GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
/// channel connectivity state associated with the error
GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE,
/// Must always be last /// Must always be last
GRPC_ERROR_INT_MAX, GRPC_ERROR_INT_MAX,

@ -46,6 +46,8 @@ void grpc_resolver_dns_native_init(void);
void grpc_resolver_dns_native_shutdown(void); void grpc_resolver_dns_native_shutdown(void);
void grpc_resolver_sockaddr_init(void); void grpc_resolver_sockaddr_init(void);
void grpc_resolver_sockaddr_shutdown(void); void grpc_resolver_sockaddr_shutdown(void);
void grpc_client_idle_filter_init(void);
void grpc_client_idle_filter_shutdown(void);
void grpc_max_age_filter_init(void); void grpc_max_age_filter_init(void);
void grpc_max_age_filter_shutdown(void); void grpc_max_age_filter_shutdown(void);
void grpc_message_size_filter_init(void); void grpc_message_size_filter_init(void);
@ -82,6 +84,8 @@ void grpc_register_built_in_plugins(void) {
grpc_resolver_dns_native_shutdown); grpc_resolver_dns_native_shutdown);
grpc_register_plugin(grpc_resolver_sockaddr_init, grpc_register_plugin(grpc_resolver_sockaddr_init,
grpc_resolver_sockaddr_shutdown); grpc_resolver_sockaddr_shutdown);
grpc_register_plugin(grpc_client_idle_filter_init,
grpc_client_idle_filter_shutdown);
grpc_register_plugin(grpc_max_age_filter_init, grpc_register_plugin(grpc_max_age_filter_init,
grpc_max_age_filter_shutdown); grpc_max_age_filter_shutdown);
grpc_register_plugin(grpc_message_size_filter_init, grpc_register_plugin(grpc_message_size_filter_init,

@ -46,6 +46,8 @@ void grpc_lb_policy_pick_first_init(void);
void grpc_lb_policy_pick_first_shutdown(void); void grpc_lb_policy_pick_first_shutdown(void);
void grpc_lb_policy_round_robin_init(void); void grpc_lb_policy_round_robin_init(void);
void grpc_lb_policy_round_robin_shutdown(void); void grpc_lb_policy_round_robin_shutdown(void);
void grpc_client_idle_filter_init(void);
void grpc_client_idle_filter_shutdown(void);
void grpc_max_age_filter_init(void); void grpc_max_age_filter_init(void);
void grpc_max_age_filter_shutdown(void); void grpc_max_age_filter_shutdown(void);
void grpc_message_size_filter_init(void); void grpc_message_size_filter_init(void);
@ -82,6 +84,8 @@ void grpc_register_built_in_plugins(void) {
grpc_lb_policy_pick_first_shutdown); grpc_lb_policy_pick_first_shutdown);
grpc_register_plugin(grpc_lb_policy_round_robin_init, grpc_register_plugin(grpc_lb_policy_round_robin_init,
grpc_lb_policy_round_robin_shutdown); grpc_lb_policy_round_robin_shutdown);
grpc_register_plugin(grpc_client_idle_filter_init,
grpc_client_idle_filter_shutdown);
grpc_register_plugin(grpc_max_age_filter_init, grpc_register_plugin(grpc_max_age_filter_init,
grpc_max_age_filter_shutdown); grpc_max_age_filter_shutdown);
grpc_register_plugin(grpc_message_size_filter_init, grpc_register_plugin(grpc_message_size_filter_init,

@ -391,6 +391,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc', 'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc', 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc',
'src/core/ext/filters/census/grpc_context.cc', 'src/core/ext/filters/census/grpc_context.cc',
'src/core/ext/filters/client_idle/client_idle_filter.cc',
'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/max_age/max_age_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/client_authority_filter.cc',

@ -209,7 +209,6 @@ class ClientLbEnd2endTest : public ::testing::Test {
// Explicitly destroy all the members so that we can make sure grpc_shutdown // Explicitly destroy all the members so that we can make sure grpc_shutdown
// has finished by the end of this function, and thus all the registered // has finished by the end of this function, and thus all the registered
// LB policy factories are removed. // LB policy factories are removed.
stub_.reset();
servers_.clear(); servers_.clear();
creds_.reset(); creds_.reset();
grpc_shutdown_blocking(); grpc_shutdown_blocking();
@ -421,7 +420,6 @@ class ClientLbEnd2endTest : public ::testing::Test {
} }
const grpc::string server_host_; const grpc::string server_host_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::vector<std::unique_ptr<ServerData>> servers_; std::vector<std::unique_ptr<ServerData>> servers_;
const grpc::string kRequestMessage_; const grpc::string kRequestMessage_;
std::shared_ptr<ChannelCredentials> creds_; std::shared_ptr<ChannelCredentials> creds_;
@ -1467,6 +1465,32 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) {
EnableDefaultHealthCheckService(false); EnableDefaultHealthCheckService(false);
} }
TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
// Start server.
const int kNumServers = 1;
StartServers(kNumServers);
// Set max idle time and build the channel.
ChannelArguments args;
args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 100);
auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("", response_generator, args);
auto stub = BuildStub(channel);
// The initial channel state should be IDLE.
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
// After sending RPC, channel state should be READY.
response_generator.SetNextResolution(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// After a period time not using the channel, the channel state should switch
// to IDLE.
gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(120));
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
// Sending a new RPC should awake the IDLE channel.
response_generator.SetNextResolution(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
}
class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest { class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
protected: protected:
void SetUp() override { void SetUp() override {

@ -978,6 +978,7 @@ src/core/ext/filters/client_channel/subchannel.h \
src/core/ext/filters/client_channel/subchannel_interface.h \ src/core/ext/filters/client_channel/subchannel_interface.h \
src/core/ext/filters/client_channel/subchannel_pool_interface.cc \ src/core/ext/filters/client_channel/subchannel_pool_interface.cc \
src/core/ext/filters/client_channel/subchannel_pool_interface.h \ src/core/ext/filters/client_channel/subchannel_pool_interface.h \
src/core/ext/filters/client_idle/client_idle_filter.cc \
src/core/ext/filters/deadline/deadline_filter.cc \ src/core/ext/filters/deadline/deadline_filter.cc \
src/core/ext/filters/deadline/deadline_filter.h \ src/core/ext/filters/deadline/deadline_filter.h \
src/core/ext/filters/http/client/http_client_filter.cc \ src/core/ext/filters/http/client/http_client_filter.cc \

@ -6680,6 +6680,7 @@
"gpr", "gpr",
"grpc_base", "grpc_base",
"grpc_client_authority_filter", "grpc_client_authority_filter",
"grpc_client_idle_filter",
"grpc_deadline_filter", "grpc_deadline_filter",
"grpc_lb_policy_grpclb_secure", "grpc_lb_policy_grpclb_secure",
"grpc_lb_policy_pick_first", "grpc_lb_policy_pick_first",
@ -6772,6 +6773,7 @@
"gpr", "gpr",
"grpc_base", "grpc_base",
"grpc_client_authority_filter", "grpc_client_authority_filter",
"grpc_client_idle_filter",
"grpc_deadline_filter", "grpc_deadline_filter",
"grpc_lb_policy_grpclb", "grpc_lb_policy_grpclb",
"grpc_lb_policy_pick_first", "grpc_lb_policy_pick_first",
@ -9132,6 +9134,21 @@
"third_party": false, "third_party": false,
"type": "filegroup" "type": "filegroup"
}, },
{
"deps": [
"gpr",
"grpc_base"
],
"headers": [],
"is_filegroup": true,
"language": "c",
"name": "grpc_client_idle_filter",
"src": [
"src/core/ext/filters/client_idle/client_idle_filter.cc"
],
"third_party": false,
"type": "filegroup"
},
{ {
"deps": [ "deps": [
"gpr_codegen" "gpr_codegen"

Loading…
Cancel
Save