diff --git a/BUILD b/BUILD
index 04044db7231..1e86dfe1341 100644
--- a/BUILD
+++ b/BUILD
@@ -1096,6 +1096,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/client_channel_factory.cc",
"src/core/ext/filters/client_channel/client_channel_plugin.cc",
"src/core/ext/filters/client_channel/config_selector.cc",
+ "src/core/ext/filters/client_channel/dynamic_filters.cc",
"src/core/ext/filters/client_channel/global_subchannel_pool.cc",
"src/core/ext/filters/client_channel/health/health_check_client.cc",
"src/core/ext/filters/client_channel/http_connect_handshaker.cc",
@@ -1124,6 +1125,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/client_channel_factory.h",
"src/core/ext/filters/client_channel/config_selector.h",
"src/core/ext/filters/client_channel/connector.h",
+ "src/core/ext/filters/client_channel/dynamic_filters.h",
"src/core/ext/filters/client_channel/global_subchannel_pool.h",
"src/core/ext/filters/client_channel/health/health_check_client.h",
"src/core/ext/filters/client_channel/http_connect_handshaker.h",
diff --git a/BUILD.gn b/BUILD.gn
index 9dc473d92ab..3a315e32a28 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -224,6 +224,8 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/config_selector.cc",
"src/core/ext/filters/client_channel/config_selector.h",
"src/core/ext/filters/client_channel/connector.h",
+ "src/core/ext/filters/client_channel/dynamic_filters.cc",
+ "src/core/ext/filters/client_channel/dynamic_filters.h",
"src/core/ext/filters/client_channel/global_subchannel_pool.cc",
"src/core/ext/filters/client_channel/global_subchannel_pool.h",
"src/core/ext/filters/client_channel/health/health_check_client.cc",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 893f27ef8f0..e72ebcd3932 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1453,6 +1453,7 @@ add_library(grpc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/config_selector.cc
+ src/core/ext/filters/client_channel/dynamic_filters.cc
src/core/ext/filters/client_channel/global_subchannel_pool.cc
src/core/ext/filters/client_channel/health/health_check_client.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
@@ -2255,6 +2256,7 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/config_selector.cc
+ src/core/ext/filters/client_channel/dynamic_filters.cc
src/core/ext/filters/client_channel/global_subchannel_pool.cc
src/core/ext/filters/client_channel/health/health_check_client.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
diff --git a/Makefile b/Makefile
index aeaadfcd7ea..f4f61721c8d 100644
--- a/Makefile
+++ b/Makefile
@@ -1042,6 +1042,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/config_selector.cc \
+ src/core/ext/filters/client_channel/dynamic_filters.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
@@ -1697,6 +1698,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/config_selector.cc \
+ src/core/ext/filters/client_channel/dynamic_filters.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 493d062c251..1de7965efb7 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -391,6 +391,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel_factory.h
- src/core/ext/filters/client_channel/config_selector.h
- src/core/ext/filters/client_channel/connector.h
+ - src/core/ext/filters/client_channel/dynamic_filters.h
- src/core/ext/filters/client_channel/global_subchannel_pool.h
- src/core/ext/filters/client_channel/health/health_check_client.h
- src/core/ext/filters/client_channel/http_connect_handshaker.h
@@ -874,6 +875,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel_factory.cc
- src/core/ext/filters/client_channel/client_channel_plugin.cc
- src/core/ext/filters/client_channel/config_selector.cc
+ - src/core/ext/filters/client_channel/dynamic_filters.cc
- src/core/ext/filters/client_channel/global_subchannel_pool.cc
- src/core/ext/filters/client_channel/health/health_check_client.cc
- src/core/ext/filters/client_channel/http_connect_handshaker.cc
@@ -1562,6 +1564,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel_factory.h
- src/core/ext/filters/client_channel/config_selector.h
- src/core/ext/filters/client_channel/connector.h
+ - src/core/ext/filters/client_channel/dynamic_filters.h
- src/core/ext/filters/client_channel/global_subchannel_pool.h
- src/core/ext/filters/client_channel/health/health_check_client.h
- src/core/ext/filters/client_channel/http_connect_handshaker.h
@@ -1805,6 +1808,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel_factory.cc
- src/core/ext/filters/client_channel/client_channel_plugin.cc
- src/core/ext/filters/client_channel/config_selector.cc
+ - src/core/ext/filters/client_channel/dynamic_filters.cc
- src/core/ext/filters/client_channel/global_subchannel_pool.cc
- src/core/ext/filters/client_channel/health/health_check_client.cc
- src/core/ext/filters/client_channel/http_connect_handshaker.cc
diff --git a/config.m4 b/config.m4
index 444da9d5a34..8ef3efa2866 100644
--- a/config.m4
+++ b/config.m4
@@ -48,6 +48,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/config_selector.cc \
+ src/core/ext/filters/client_channel/dynamic_filters.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
diff --git a/config.w32 b/config.w32
index a91c88311e5..0bbe80a74ba 100644
--- a/config.w32
+++ b/config.w32
@@ -15,6 +15,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\client_channel_factory.cc " +
"src\\core\\ext\\filters\\client_channel\\client_channel_plugin.cc " +
"src\\core\\ext\\filters\\client_channel\\config_selector.cc " +
+ "src\\core\\ext\\filters\\client_channel\\dynamic_filters.cc " +
"src\\core\\ext\\filters\\client_channel\\global_subchannel_pool.cc " +
"src\\core\\ext\\filters\\client_channel\\health\\health_check_client.cc " +
"src\\core\\ext\\filters\\client_channel\\http_connect_handshaker.cc " +
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index a589a09380a..63688266d14 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -207,6 +207,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
+ 'src/core/ext/filters/client_channel/dynamic_filters.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.h',
'src/core/ext/filters/client_channel/http_connect_handshaker.h',
@@ -825,6 +826,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
+ 'src/core/ext/filters/client_channel/dynamic_filters.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.h',
'src/core/ext/filters/client_channel/http_connect_handshaker.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 3d6240dd505..ecb31dfdcf6 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -205,6 +205,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/config_selector.cc',
'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
+ 'src/core/ext/filters/client_channel/dynamic_filters.cc',
+ 'src/core/ext/filters/client_channel/dynamic_filters.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
@@ -1357,6 +1359,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
+ 'src/core/ext/filters/client_channel/dynamic_filters.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.h',
'src/core/ext/filters/client_channel/http_connect_handshaker.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 8a62afb1939..5051220e16a 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -120,6 +120,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/config_selector.cc )
s.files += %w( src/core/ext/filters/client_channel/config_selector.h )
s.files += %w( src/core/ext/filters/client_channel/connector.h )
+ s.files += %w( src/core/ext/filters/client_channel/dynamic_filters.cc )
+ s.files += %w( src/core/ext/filters/client_channel/dynamic_filters.h )
s.files += %w( src/core/ext/filters/client_channel/global_subchannel_pool.cc )
s.files += %w( src/core/ext/filters/client_channel/global_subchannel_pool.h )
s.files += %w( src/core/ext/filters/client_channel/health/health_check_client.cc )
diff --git a/grpc.gyp b/grpc.gyp
index 782f748f0ab..7839c8bfc27 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -461,6 +461,7 @@
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/config_selector.cc',
+ 'src/core/ext/filters/client_channel/dynamic_filters.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
@@ -1092,6 +1093,7 @@
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/config_selector.cc',
+ 'src/core/ext/filters/client_channel/dynamic_filters.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
diff --git a/package.xml b/package.xml
index 3bbf28d8e79..c3e1c87a6ac 100644
--- a/package.xml
+++ b/package.xml
@@ -100,6 +100,8 @@
+
+
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 1934db07ade..9da28b9535b 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -42,6 +42,7 @@
#include "src/core/ext/filters/client_channel/backend_metric.h"
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
+#include "src/core/ext/filters/client_channel/dynamic_filters.h"
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
@@ -99,9 +100,17 @@
// send_trailing_metadata
#define MAX_PENDING_BATCHES 6
+// Channel arg containing a pointer to the ChannelData object.
+#define GRPC_ARG_CLIENT_CHANNEL_DATA "grpc.internal.client_channel_data"
+
+// Channel arg containing a pointer to the RetryThrottleData object.
+#define GRPC_ARG_RETRY_THROTTLE_DATA "grpc.internal.retry_throttle_data"
+
namespace grpc_core {
+using internal::ClientChannelGlobalParsedConfig;
using internal::ClientChannelMethodParsedConfig;
+using internal::ClientChannelServiceConfigParser;
using internal::ServerRetryThrottleData;
TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
@@ -113,7 +122,6 @@ namespace {
// ChannelData definition
//
-class RetryingCall;
class LoadBalancedCall;
class ChannelData {
@@ -159,13 +167,13 @@ class ChannelData {
grpc_error* resolver_transient_failure_error() const {
return resolver_transient_failure_error_;
}
- RefCountedPtr retry_throttle_data() const {
- return retry_throttle_data_;
- }
RefCountedPtr service_config() const {
return service_config_;
}
ConfigSelector* config_selector() const { return config_selector_.get(); }
+ RefCountedPtr dynamic_filters() const {
+ return dynamic_filters_;
+ }
Mutex* data_plane_mu() const { return &data_plane_mu_; }
// These methods all require holding data_plane_mu_.
@@ -334,9 +342,9 @@ class ChannelData {
// Data from service config.
grpc_error* resolver_transient_failure_error_ = GRPC_ERROR_NONE;
bool received_service_config_data_ = false;
- RefCountedPtr retry_throttle_data_;
RefCountedPtr service_config_;
RefCountedPtr config_selector_;
+ RefCountedPtr dynamic_filters_;
//
// Fields used in the data plane. Guarded by data_plane_mu_.
@@ -479,7 +487,7 @@ class CallData {
void InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(
grpc_transport_stream_op_batch* batch);
- static void CreateLbCall(void* arg, grpc_error* error);
+ void CreateDynamicCall(grpc_call_element* elem);
// State for handling deadlines.
// The code in deadline_filter.c requires this to be the first field.
@@ -496,7 +504,6 @@ class CallData {
grpc_call_stack* owning_call_;
CallCombiner* call_combiner_;
grpc_call_context_element* call_context_;
- bool enable_retries_;
grpc_polling_entity* pollent_ = nullptr;
@@ -508,15 +515,13 @@ class CallData {
ChannelData::ResolverQueuedCall resolver_queued_call_;
ResolverQueuedCallCanceller* resolver_call_canceller_ = nullptr;
- RefCountedPtr retry_throttle_data_;
- const ClientChannelMethodParsedConfig::RetryPolicy* retry_policy_ = nullptr;
std::function on_call_committed_;
grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
grpc_closure recv_initial_metadata_ready_;
- RetryingCall* retrying_call_ = nullptr;
- RefCountedPtr lb_call_;
+ RefCountedPtr dynamic_filters_;
+ RefCountedPtr dynamic_call_;
// Batches are added to this list when received from above.
// They are removed when we are done handling the batch (i.e., when
@@ -1037,6 +1042,209 @@ class LoadBalancedCall {
grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};
};
+//
+// dynamic termination filter
+//
+
+// Channel arg pointer vtable for GRPC_ARG_CLIENT_CHANNEL_DATA.
+void* ChannelDataArgCopy(void* p) { return p; }
+void ChannelDataArgDestroy(void* p) {}
+int ChannelDataArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
+const grpc_arg_pointer_vtable kChannelDataArgPointerVtable = {
+ ChannelDataArgCopy, ChannelDataArgDestroy, ChannelDataArgCmp};
+
+// Channel arg pointer vtable for GRPC_ARG_RETRY_THROTTLE_DATA.
+void* RetryThrottleDataArgCopy(void* p) {
+ auto* retry_throttle_data = static_cast(p);
+ retry_throttle_data->Ref().release();
+ return p;
+}
+void RetryThrottleDataArgDestroy(void* p) {
+ auto* retry_throttle_data = static_cast(p);
+ retry_throttle_data->Unref();
+}
+int RetryThrottleDataArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
+const grpc_arg_pointer_vtable kRetryThrottleDataArgPointerVtable = {
+ RetryThrottleDataArgCopy, RetryThrottleDataArgDestroy,
+ RetryThrottleDataArgCmp};
+
+class DynamicTerminationFilterChannelData {
+ public:
+ static grpc_error* Init(grpc_channel_element* elem,
+ grpc_channel_element_args* args);
+
+ static void Destroy(grpc_channel_element* elem) {
+ auto* chand =
+ static_cast(elem->channel_data);
+ chand->~DynamicTerminationFilterChannelData();
+ }
+
+ // Will never be called.
+ static void StartTransportOp(grpc_channel_element* elem,
+ grpc_transport_op* op) {}
+ static void GetChannelInfo(grpc_channel_element* elem,
+ const grpc_channel_info* info) {}
+
+ ChannelData* chand() const { return chand_; }
+ RefCountedPtr retry_throttle_data() const {
+ return retry_throttle_data_;
+ }
+
+ private:
+ static RefCountedPtr GetRetryThrottleDataFromArgs(
+ const grpc_channel_args* args) {
+ auto* retry_throttle_data =
+ grpc_channel_args_find_pointer(
+ args, GRPC_ARG_RETRY_THROTTLE_DATA);
+ if (retry_throttle_data == nullptr) return nullptr;
+ return retry_throttle_data->Ref();
+ }
+
+ explicit DynamicTerminationFilterChannelData(const grpc_channel_args* args)
+ : chand_(grpc_channel_args_find_pointer(
+ args, GRPC_ARG_CLIENT_CHANNEL_DATA)),
+ retry_throttle_data_(GetRetryThrottleDataFromArgs(args)) {}
+
+ ChannelData* chand_;
+ RefCountedPtr retry_throttle_data_;
+};
+
+class DynamicTerminationFilterCallData {
+ public:
+ static grpc_error* Init(grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ new (elem->call_data) DynamicTerminationFilterCallData(*args);
+ return GRPC_ERROR_NONE;
+ }
+
+ static void Destroy(grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* then_schedule_closure) {
+ auto* calld =
+ static_cast(elem->call_data);
+ auto* chand =
+ static_cast(elem->channel_data);
+ RefCountedPtr subchannel_call;
+ if (chand->chand()->enable_retries()) {
+ if (GPR_LIKELY(calld->retrying_call_ != nullptr)) {
+ subchannel_call = calld->retrying_call_->subchannel_call();
+ calld->retrying_call_->~RetryingCall();
+ }
+ } else {
+ if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
+ subchannel_call = calld->lb_call_->subchannel_call();
+ }
+ }
+ calld->~DynamicTerminationFilterCallData();
+ if (GPR_LIKELY(subchannel_call != nullptr)) {
+ subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
+ } else {
+ // TODO(yashkt) : This can potentially be a Closure::Run
+ ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
+ }
+ }
+
+ static void StartTransportStreamOpBatch(
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
+ auto* calld =
+ static_cast(elem->call_data);
+ auto* chand =
+ static_cast(elem->channel_data);
+ if (chand->chand()->enable_retries()) {
+ calld->retrying_call_->StartTransportStreamOpBatch(batch);
+ } else {
+ calld->lb_call_->StartTransportStreamOpBatch(batch);
+ }
+ }
+
+ static void SetPollent(grpc_call_element* elem,
+ grpc_polling_entity* pollent) {
+ auto* calld =
+ static_cast(elem->call_data);
+ auto* chand =
+ static_cast(elem->channel_data);
+ ChannelData* client_channel = chand->chand();
+ grpc_call_element_args args = {
+ calld->owning_call_, nullptr,
+ calld->call_context_, calld->path_,
+ calld->call_start_time_, calld->deadline_,
+ calld->arena_, calld->call_combiner_};
+ if (client_channel->enable_retries()) {
+ // Get retry settings from service config.
+ auto* svc_cfg_call_data = static_cast(
+ calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
+ GPR_ASSERT(svc_cfg_call_data != nullptr);
+ auto* method_config = static_cast(
+ svc_cfg_call_data->GetMethodParsedConfig(
+ ClientChannelServiceConfigParser::ParserIndex()));
+ // Create retrying call.
+ calld->retrying_call_ = calld->arena_->New(
+ client_channel, args, pollent, chand->retry_throttle_data(),
+ method_config == nullptr ? nullptr : method_config->retry_policy());
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ gpr_log(
+ GPR_INFO,
+ "chand=%p dymamic_termination_calld=%p: create retrying_call=%p",
+ client_channel, calld, calld->retrying_call_);
+ }
+ } else {
+ calld->lb_call_ =
+ LoadBalancedCall::Create(client_channel, args, pollent, 0);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ gpr_log(GPR_INFO,
+ "chand=%p dynamic_termination_calld=%p: create lb_call=%p",
+ chand, client_channel, calld->lb_call_.get());
+ }
+ }
+ }
+
+ private:
+ explicit DynamicTerminationFilterCallData(const grpc_call_element_args& args)
+ : path_(grpc_slice_ref_internal(args.path)),
+ call_start_time_(args.start_time),
+ deadline_(args.deadline),
+ arena_(args.arena),
+ owning_call_(args.call_stack),
+ call_combiner_(args.call_combiner),
+ call_context_(args.context) {}
+
+ ~DynamicTerminationFilterCallData() { grpc_slice_unref_internal(path_); }
+
+ grpc_slice path_; // Request path.
+ gpr_cycle_counter call_start_time_;
+ grpc_millis deadline_;
+ Arena* arena_;
+ grpc_call_stack* owning_call_;
+ CallCombiner* call_combiner_;
+ grpc_call_context_element* call_context_;
+
+ RetryingCall* retrying_call_ = nullptr;
+ RefCountedPtr lb_call_;
+};
+
+const grpc_channel_filter kDynamicTerminationFilterVtable = {
+ DynamicTerminationFilterCallData::StartTransportStreamOpBatch,
+ DynamicTerminationFilterChannelData::StartTransportOp,
+ sizeof(DynamicTerminationFilterCallData),
+ DynamicTerminationFilterCallData::Init,
+ DynamicTerminationFilterCallData::SetPollent,
+ DynamicTerminationFilterCallData::Destroy,
+ sizeof(DynamicTerminationFilterChannelData),
+ DynamicTerminationFilterChannelData::Init,
+ DynamicTerminationFilterChannelData::Destroy,
+ DynamicTerminationFilterChannelData::GetChannelInfo,
+ "dynamic_filter_termination",
+};
+
+grpc_error* DynamicTerminationFilterChannelData::Init(
+ grpc_channel_element* elem, grpc_channel_element_args* args) {
+ GPR_ASSERT(args->is_last);
+ GPR_ASSERT(elem->filter == &kDynamicTerminationFilterVtable);
+ new (elem->channel_data)
+ DynamicTerminationFilterChannelData(args->channel_args);
+ return GRPC_ERROR_NONE;
+}
+
//
// ChannelData::SubchannelWrapper
//
@@ -2073,6 +2281,18 @@ void ChannelData::UpdateServiceConfigInControlPlaneLocked(
}
void ChannelData::UpdateServiceConfigInDataPlaneLocked() {
+ // Grab ref to service config.
+ RefCountedPtr service_config = saved_service_config_;
+ // Grab ref to config selector. Use default if resolver didn't supply one.
+ RefCountedPtr config_selector = saved_config_selector_;
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
+ saved_config_selector_.get());
+ }
+ if (config_selector == nullptr) {
+ config_selector =
+ MakeRefCounted(saved_service_config_);
+ }
// Get retry throttle data from service config.
const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
static_cast(
@@ -2086,18 +2306,25 @@ void ChannelData::UpdateServiceConfigInDataPlaneLocked() {
server_name_, retry_throttle_config.value().max_milli_tokens,
retry_throttle_config.value().milli_token_ratio);
}
- // Grab ref to service config.
- RefCountedPtr service_config = saved_service_config_;
- // Grab ref to config selector. Use default if resolver didn't supply one.
- RefCountedPtr config_selector = saved_config_selector_;
- if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
- gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
- saved_config_selector_.get());
- }
- if (config_selector == nullptr) {
- config_selector =
- MakeRefCounted(saved_service_config_);
- }
+ // Construct per-LB filter stack.
+ std::vector filters =
+ config_selector->GetFilters();
+ filters.push_back(&kDynamicTerminationFilterVtable);
+ absl::InlinedVector args_to_add;
+ args_to_add.push_back(grpc_channel_arg_pointer_create(
+ const_cast(GRPC_ARG_CLIENT_CHANNEL_DATA), this,
+ &kChannelDataArgPointerVtable));
+ if (retry_throttle_data != nullptr) {
+ args_to_add.push_back(grpc_channel_arg_pointer_create(
+ const_cast(GRPC_ARG_RETRY_THROTTLE_DATA),
+ retry_throttle_data.get(), &kRetryThrottleDataArgPointerVtable));
+ }
+ grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
+ channel_args_, args_to_add.data(), args_to_add.size());
+ RefCountedPtr dynamic_filters =
+ DynamicFilters::Create(new_args, std::move(filters));
+ GPR_ASSERT(dynamic_filters != nullptr);
+ grpc_channel_args_destroy(new_args);
// Grab data plane lock to update service config.
//
// We defer unreffing the old values (and deallocating memory) until
@@ -2110,9 +2337,9 @@ void ChannelData::UpdateServiceConfigInDataPlaneLocked() {
// Update service config.
received_service_config_data_ = true;
// Old values will be unreffed after lock is released.
- retry_throttle_data_.swap(retry_throttle_data);
service_config_.swap(service_config);
config_selector_.swap(config_selector);
+ dynamic_filters_.swap(dynamic_filters);
// Process calls that were queued waiting for the resolver result.
for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
call = call->next) {
@@ -2194,13 +2421,13 @@ void ChannelData::UpdateStateAndPickerLocked(
// the refs until after we release the lock, and then unref them at
// that point. This includes the following:
// - refs to subchannel wrappers in the keys of pending_subchannel_updates_
- // - ref stored in retry_throttle_data_
// - ref stored in service_config_
// - ref stored in config_selector_
+ // - ref stored in dynamic_filters_
// - ownership of the existing picker in picker_
- RefCountedPtr retry_throttle_data_to_unref;
RefCountedPtr service_config_to_unref;
RefCountedPtr config_selector_to_unref;
+ RefCountedPtr dynamic_filters_to_unref;
{
MutexLock lock(&data_plane_mu_);
// Handle subchannel updates.
@@ -2223,9 +2450,9 @@ void ChannelData::UpdateStateAndPickerLocked(
if (picker_ == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
received_service_config_data_ = false;
// Note: We save the objects to unref until after the lock is released.
- retry_throttle_data_to_unref = std::move(retry_throttle_data_);
service_config_to_unref = std::move(service_config_);
config_selector_to_unref = std::move(config_selector_);
+ dynamic_filters_to_unref = std::move(dynamic_filters_);
}
// Re-process queued picks.
for (LbQueuedCall* call = lb_queued_calls_; call != nullptr;
@@ -2431,8 +2658,7 @@ CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
arena_(args.arena),
owning_call_(args.call_stack),
call_combiner_(args.call_combiner),
- call_context_(args.context),
- enable_retries_(chand.enable_retries()) {}
+ call_context_(args.context) {}
CallData::~CallData() {
grpc_slice_unref_internal(path_);
@@ -2454,20 +2680,11 @@ void CallData::Destroy(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* then_schedule_closure) {
CallData* calld = static_cast(elem->call_data);
- RefCountedPtr subchannel_call;
- if (calld->enable_retries_) {
- if (GPR_LIKELY(calld->retrying_call_ != nullptr)) {
- subchannel_call = calld->retrying_call_->subchannel_call();
- calld->retrying_call_->~RetryingCall();
- }
- } else {
- if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
- subchannel_call = calld->lb_call_->subchannel_call();
- }
- }
+ RefCountedPtr dynamic_call =
+ std::move(calld->dynamic_call_);
calld->~CallData();
- if (GPR_LIKELY(subchannel_call != nullptr)) {
- subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
+ if (GPR_LIKELY(dynamic_call != nullptr)) {
+ dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
} else {
// TODO(yashkt) : This can potentially be a Closure::Run
ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
@@ -2511,10 +2728,10 @@ void CallData::StartTransportStreamOpBatch(
gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
calld, grpc_error_string(calld->cancel_error_));
}
- // If we do not have an LB call (i.e., a pick has not yet been started),
- // fail all pending batches. Otherwise, send the cancellation down to the
- // LB call.
- if (calld->lb_call_ == nullptr && calld->retrying_call_ == nullptr) {
+ // If we do not have a dynamic call (i.e., name resolution has not
+ // yet completed), fail all pending batches. Otherwise, send the
+ // cancellation down to the dynamic call.
+ if (calld->dynamic_call_ == nullptr) {
calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
NoYieldCallCombiner);
// Note: This will release the call combiner.
@@ -2522,32 +2739,27 @@ void CallData::StartTransportStreamOpBatch(
batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
} else {
// Note: This will release the call combiner.
- if (calld->lb_call_ != nullptr) {
- calld->lb_call_->StartTransportStreamOpBatch(batch);
- } else {
- GPR_ASSERT(calld->retrying_call_ != nullptr);
- calld->retrying_call_->StartTransportStreamOpBatch(batch);
- }
+ calld->dynamic_call_->StartTransportStreamOpBatch(batch);
}
return;
}
// Add the batch to the pending list.
calld->PendingBatchesAdd(elem, batch);
- // Check if we've already created an LB call.
- // Note that once we have created an LB call, we do not need to acquire
- // the channel's resolution mutex, which is more efficient (especially for
- // streaming calls).
- if (calld->lb_call_ != nullptr || calld->retrying_call_ != nullptr) {
+ // Check if we've already created a dynamic call.
+ // Note that once we have done so, we do not need to acquire the channel's
+ // resolution mutex, which is more efficient (especially for streaming calls).
+ if (calld->dynamic_call_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch", chand, calld);
+ gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on dynamic_call=%p",
+ chand, calld, calld->dynamic_call_.get());
}
calld->PendingBatchesResume(elem);
return;
}
- // We do not yet have an LB call.
+ // We do not yet have a dynamic call.
// For batches containing a send_initial_metadata op, acquire the
// channel's resolution mutex to apply the service config to the call,
- // after which we will create an LB call.
+ // after which we will create a dynamic call.
if (GPR_LIKELY(batch->send_initial_metadata)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
@@ -2659,11 +2871,7 @@ void CallData::ResumePendingBatchInCallCombiner(void* arg,
static_cast(batch->handler_private.extra_arg);
auto* calld = static_cast(elem->call_data);
// Note: This will release the call combiner.
- if (calld->enable_retries_) {
- calld->retrying_call_->StartTransportStreamOpBatch(batch);
- } else {
- calld->lb_call_->StartTransportStreamOpBatch(batch);
- }
+ calld->dynamic_call_->StartTransportStreamOpBatch(batch);
}
// This is called via the call combiner, so access to calld is synchronized.
@@ -2677,8 +2885,8 @@ void CallData::PendingBatchesResume(grpc_call_element* elem) {
}
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
- " pending batches: lb_call=%p retrying_call=%p",
- chand, this, num_batches, lb_call_.get(), retrying_call_);
+ " pending batches on dynamic_call=%p",
+ chand, this, num_batches, dynamic_call_.get());
}
CallCombinerClosureList closures;
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
@@ -2824,11 +3032,9 @@ grpc_error* CallData::ApplyServiceConfigToCallLocked(
*send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
}
}
- // Save retry policy.
- retry_policy_ = method_params->retry_policy();
}
- // Set retry throttle data for call.
- retry_throttle_data_ = chand->retry_throttle_data();
+ // Set the dynamic filter stack.
+ dynamic_filters_ = chand->dynamic_filters();
}
return GRPC_ERROR_NONE;
}
@@ -2876,7 +3082,7 @@ void CallData::ResolutionDone(void* arg, grpc_error* error) {
calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
return;
}
- calld->CreateLbCall(elem, GRPC_ERROR_NONE);
+ calld->CreateDynamicCall(elem);
}
void CallData::CheckResolution(void* arg, grpc_error* error) {
@@ -2955,31 +3161,29 @@ bool CallData::CheckResolutionLocked(grpc_call_element* elem,
return true;
}
-void CallData::CreateLbCall(void* arg, grpc_error* /*error*/) {
- auto* elem = static_cast(arg);
+void CallData::CreateDynamicCall(grpc_call_element* elem) {
auto* chand = static_cast(elem->channel_data);
- auto* calld = static_cast(elem->call_data);
- grpc_call_element_args args = {
- calld->owning_call_, nullptr,
- calld->call_context_, calld->path_,
- calld->call_start_time_, calld->deadline_,
- calld->arena_, calld->call_combiner_};
- if (calld->enable_retries_) {
- calld->retrying_call_ = calld->arena_->New(
- chand, args, calld->pollent_, std::move(calld->retry_throttle_data_),
- calld->retry_policy_);
- if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: create retrying_call=%p", chand,
- calld, calld->retrying_call_);
- }
- } else {
- calld->lb_call_ = LoadBalancedCall::Create(chand, args, calld->pollent_, 0);
+ DynamicFilters::Call::Args args = {std::move(dynamic_filters_),
+ pollent_,
+ path_,
+ call_start_time_,
+ deadline_,
+ arena_,
+ call_context_,
+ call_combiner_};
+ grpc_error* error = GRPC_ERROR_NONE;
+ DynamicFilters* channel_stack = args.channel_stack.get();
+ dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
+ if (error != GRPC_ERROR_NONE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: create lb_call=%p", chand, calld,
- calld->lb_call_.get());
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: failed to create dynamic call: error=%s",
+ chand, this, grpc_error_string(error));
}
+ PendingBatchesFail(elem, error, YieldCallCombiner);
+ return;
}
- calld->PendingBatchesResume(elem);
+ PendingBatchesResume(elem);
}
//
diff --git a/src/core/ext/filters/client_channel/config_selector.h b/src/core/ext/filters/client_channel/config_selector.h
index 54e6bd5d0c6..62e4ce5a38c 100644
--- a/src/core/ext/filters/client_channel/config_selector.h
+++ b/src/core/ext/filters/client_channel/config_selector.h
@@ -21,6 +21,7 @@
#include
#include