Merge pull request #24920 from markdroth/client_channel_split4

Add dynamic filters between name resolution and load balancing.
pull/25031/head
Mark D. Roth 4 years ago committed by GitHub
commit 8439ed152e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 2
      BUILD.gn
  3. 2
      CMakeLists.txt
  4. 2
      Makefile
  5. 4
      build_autogenerated.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 2
      gRPC-C++.podspec
  9. 3
      gRPC-Core.podspec
  10. 2
      grpc.gemspec
  11. 2
      grpc.gyp
  12. 2
      package.xml
  13. 390
      src/core/ext/filters/client_channel/client_channel.cc
  14. 4
      src/core/ext/filters/client_channel/config_selector.h
  15. 186
      src/core/ext/filters/client_channel/dynamic_filters.cc
  16. 99
      src/core/ext/filters/client_channel/dynamic_filters.h
  17. 22
      src/core/lib/surface/lame_client.cc
  18. 4
      src/core/lib/surface/lame_client.h
  19. 1
      src/python/grpcio/grpc_core_dependencies.py
  20. 2
      tools/doxygen/Doxyfile.c++.internal
  21. 2
      tools/doxygen/Doxyfile.core.internal

@ -1096,6 +1096,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/client_channel_factory.cc",
"src/core/ext/filters/client_channel/client_channel_plugin.cc",
"src/core/ext/filters/client_channel/config_selector.cc",
"src/core/ext/filters/client_channel/dynamic_filters.cc",
"src/core/ext/filters/client_channel/global_subchannel_pool.cc",
"src/core/ext/filters/client_channel/health/health_check_client.cc",
"src/core/ext/filters/client_channel/http_connect_handshaker.cc",
@ -1124,6 +1125,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/client_channel_factory.h",
"src/core/ext/filters/client_channel/config_selector.h",
"src/core/ext/filters/client_channel/connector.h",
"src/core/ext/filters/client_channel/dynamic_filters.h",
"src/core/ext/filters/client_channel/global_subchannel_pool.h",
"src/core/ext/filters/client_channel/health/health_check_client.h",
"src/core/ext/filters/client_channel/http_connect_handshaker.h",

@ -224,6 +224,8 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/config_selector.cc",
"src/core/ext/filters/client_channel/config_selector.h",
"src/core/ext/filters/client_channel/connector.h",
"src/core/ext/filters/client_channel/dynamic_filters.cc",
"src/core/ext/filters/client_channel/dynamic_filters.h",
"src/core/ext/filters/client_channel/global_subchannel_pool.cc",
"src/core/ext/filters/client_channel/global_subchannel_pool.h",
"src/core/ext/filters/client_channel/health/health_check_client.cc",

@ -1453,6 +1453,7 @@ add_library(grpc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/config_selector.cc
src/core/ext/filters/client_channel/dynamic_filters.cc
src/core/ext/filters/client_channel/global_subchannel_pool.cc
src/core/ext/filters/client_channel/health/health_check_client.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
@ -2255,6 +2256,7 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/config_selector.cc
src/core/ext/filters/client_channel/dynamic_filters.cc
src/core/ext/filters/client_channel/global_subchannel_pool.cc
src/core/ext/filters/client_channel/health/health_check_client.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc

@ -1042,6 +1042,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/config_selector.cc \
src/core/ext/filters/client_channel/dynamic_filters.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
@ -1697,6 +1698,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/config_selector.cc \
src/core/ext/filters/client_channel/dynamic_filters.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \

@ -391,6 +391,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel_factory.h
- src/core/ext/filters/client_channel/config_selector.h
- src/core/ext/filters/client_channel/connector.h
- src/core/ext/filters/client_channel/dynamic_filters.h
- src/core/ext/filters/client_channel/global_subchannel_pool.h
- src/core/ext/filters/client_channel/health/health_check_client.h
- src/core/ext/filters/client_channel/http_connect_handshaker.h
@ -874,6 +875,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel_factory.cc
- src/core/ext/filters/client_channel/client_channel_plugin.cc
- src/core/ext/filters/client_channel/config_selector.cc
- src/core/ext/filters/client_channel/dynamic_filters.cc
- src/core/ext/filters/client_channel/global_subchannel_pool.cc
- src/core/ext/filters/client_channel/health/health_check_client.cc
- src/core/ext/filters/client_channel/http_connect_handshaker.cc
@ -1562,6 +1564,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel_factory.h
- src/core/ext/filters/client_channel/config_selector.h
- src/core/ext/filters/client_channel/connector.h
- src/core/ext/filters/client_channel/dynamic_filters.h
- src/core/ext/filters/client_channel/global_subchannel_pool.h
- src/core/ext/filters/client_channel/health/health_check_client.h
- src/core/ext/filters/client_channel/http_connect_handshaker.h
@ -1805,6 +1808,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel_factory.cc
- src/core/ext/filters/client_channel/client_channel_plugin.cc
- src/core/ext/filters/client_channel/config_selector.cc
- src/core/ext/filters/client_channel/dynamic_filters.cc
- src/core/ext/filters/client_channel/global_subchannel_pool.cc
- src/core/ext/filters/client_channel/health/health_check_client.cc
- src/core/ext/filters/client_channel/http_connect_handshaker.cc

@ -48,6 +48,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/config_selector.cc \
src/core/ext/filters/client_channel/dynamic_filters.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \

@ -15,6 +15,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\client_channel_factory.cc " +
"src\\core\\ext\\filters\\client_channel\\client_channel_plugin.cc " +
"src\\core\\ext\\filters\\client_channel\\config_selector.cc " +
"src\\core\\ext\\filters\\client_channel\\dynamic_filters.cc " +
"src\\core\\ext\\filters\\client_channel\\global_subchannel_pool.cc " +
"src\\core\\ext\\filters\\client_channel\\health\\health_check_client.cc " +
"src\\core\\ext\\filters\\client_channel\\http_connect_handshaker.cc " +

@ -207,6 +207,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
'src/core/ext/filters/client_channel/dynamic_filters.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.h',
'src/core/ext/filters/client_channel/http_connect_handshaker.h',
@ -825,6 +826,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
'src/core/ext/filters/client_channel/dynamic_filters.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.h',
'src/core/ext/filters/client_channel/http_connect_handshaker.h',

@ -205,6 +205,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/config_selector.cc',
'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
'src/core/ext/filters/client_channel/dynamic_filters.cc',
'src/core/ext/filters/client_channel/dynamic_filters.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
@ -1357,6 +1359,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
'src/core/ext/filters/client_channel/dynamic_filters.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.h',
'src/core/ext/filters/client_channel/http_connect_handshaker.h',

@ -120,6 +120,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/config_selector.cc )
s.files += %w( src/core/ext/filters/client_channel/config_selector.h )
s.files += %w( src/core/ext/filters/client_channel/connector.h )
s.files += %w( src/core/ext/filters/client_channel/dynamic_filters.cc )
s.files += %w( src/core/ext/filters/client_channel/dynamic_filters.h )
s.files += %w( src/core/ext/filters/client_channel/global_subchannel_pool.cc )
s.files += %w( src/core/ext/filters/client_channel/global_subchannel_pool.h )
s.files += %w( src/core/ext/filters/client_channel/health/health_check_client.cc )

@ -461,6 +461,7 @@
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/config_selector.cc',
'src/core/ext/filters/client_channel/dynamic_filters.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
@ -1092,6 +1093,7 @@
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/config_selector.cc',
'src/core/ext/filters/client_channel/dynamic_filters.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',

@ -100,6 +100,8 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/config_selector.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/config_selector.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/connector.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/dynamic_filters.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/dynamic_filters.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/global_subchannel_pool.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/global_subchannel_pool.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/health/health_check_client.cc" role="src" />

@ -42,6 +42,7 @@
#include "src/core/ext/filters/client_channel/backend_metric.h"
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/dynamic_filters.h"
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
@ -99,9 +100,17 @@
// send_trailing_metadata
#define MAX_PENDING_BATCHES 6
// Channel arg containing a pointer to the ChannelData object.
#define GRPC_ARG_CLIENT_CHANNEL_DATA "grpc.internal.client_channel_data"
// Channel arg containing a pointer to the RetryThrottleData object.
#define GRPC_ARG_RETRY_THROTTLE_DATA "grpc.internal.retry_throttle_data"
namespace grpc_core {
using internal::ClientChannelGlobalParsedConfig;
using internal::ClientChannelMethodParsedConfig;
using internal::ClientChannelServiceConfigParser;
using internal::ServerRetryThrottleData;
TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
@ -113,7 +122,6 @@ namespace {
// ChannelData definition
//
class RetryingCall;
class LoadBalancedCall;
class ChannelData {
@ -159,13 +167,13 @@ class ChannelData {
grpc_error* resolver_transient_failure_error() const {
return resolver_transient_failure_error_;
}
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
return retry_throttle_data_;
}
RefCountedPtr<ServiceConfig> service_config() const {
return service_config_;
}
ConfigSelector* config_selector() const { return config_selector_.get(); }
RefCountedPtr<DynamicFilters> dynamic_filters() const {
return dynamic_filters_;
}
Mutex* data_plane_mu() const { return &data_plane_mu_; }
// These methods all require holding data_plane_mu_.
@ -334,9 +342,9 @@ class ChannelData {
// Data from service config.
grpc_error* resolver_transient_failure_error_ = GRPC_ERROR_NONE;
bool received_service_config_data_ = false;
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
RefCountedPtr<ServiceConfig> service_config_;
RefCountedPtr<ConfigSelector> config_selector_;
RefCountedPtr<DynamicFilters> dynamic_filters_;
//
// Fields used in the data plane. Guarded by data_plane_mu_.
@ -479,7 +487,7 @@ class CallData {
void InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(
grpc_transport_stream_op_batch* batch);
static void CreateLbCall(void* arg, grpc_error* error);
void CreateDynamicCall(grpc_call_element* elem);
// State for handling deadlines.
// The code in deadline_filter.c requires this to be the first field.
@ -496,7 +504,6 @@ class CallData {
grpc_call_stack* owning_call_;
CallCombiner* call_combiner_;
grpc_call_context_element* call_context_;
bool enable_retries_;
grpc_polling_entity* pollent_ = nullptr;
@ -508,15 +515,13 @@ class CallData {
ChannelData::ResolverQueuedCall resolver_queued_call_;
ResolverQueuedCallCanceller* resolver_call_canceller_ = nullptr;
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
const ClientChannelMethodParsedConfig::RetryPolicy* retry_policy_ = nullptr;
std::function<void()> on_call_committed_;
grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
grpc_closure recv_initial_metadata_ready_;
RetryingCall* retrying_call_ = nullptr;
RefCountedPtr<LoadBalancedCall> lb_call_;
RefCountedPtr<DynamicFilters> dynamic_filters_;
RefCountedPtr<DynamicFilters::Call> dynamic_call_;
// Batches are added to this list when received from above.
// They are removed when we are done handling the batch (i.e., when
@ -1037,6 +1042,209 @@ class LoadBalancedCall {
grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};
};
//
// dynamic termination filter
//
// Channel arg pointer vtable for GRPC_ARG_CLIENT_CHANNEL_DATA.
void* ChannelDataArgCopy(void* p) { return p; }
void ChannelDataArgDestroy(void* p) {}
int ChannelDataArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
const grpc_arg_pointer_vtable kChannelDataArgPointerVtable = {
ChannelDataArgCopy, ChannelDataArgDestroy, ChannelDataArgCmp};
// Channel arg pointer vtable for GRPC_ARG_RETRY_THROTTLE_DATA.
void* RetryThrottleDataArgCopy(void* p) {
auto* retry_throttle_data = static_cast<ServerRetryThrottleData*>(p);
retry_throttle_data->Ref().release();
return p;
}
void RetryThrottleDataArgDestroy(void* p) {
auto* retry_throttle_data = static_cast<ServerRetryThrottleData*>(p);
retry_throttle_data->Unref();
}
int RetryThrottleDataArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
const grpc_arg_pointer_vtable kRetryThrottleDataArgPointerVtable = {
RetryThrottleDataArgCopy, RetryThrottleDataArgDestroy,
RetryThrottleDataArgCmp};
class DynamicTerminationFilterChannelData {
public:
static grpc_error* Init(grpc_channel_element* elem,
grpc_channel_element_args* args);
static void Destroy(grpc_channel_element* elem) {
auto* chand =
static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
chand->~DynamicTerminationFilterChannelData();
}
// Will never be called.
static void StartTransportOp(grpc_channel_element* elem,
grpc_transport_op* op) {}
static void GetChannelInfo(grpc_channel_element* elem,
const grpc_channel_info* info) {}
ChannelData* chand() const { return chand_; }
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
return retry_throttle_data_;
}
private:
static RefCountedPtr<ServerRetryThrottleData> GetRetryThrottleDataFromArgs(
const grpc_channel_args* args) {
auto* retry_throttle_data =
grpc_channel_args_find_pointer<ServerRetryThrottleData>(
args, GRPC_ARG_RETRY_THROTTLE_DATA);
if (retry_throttle_data == nullptr) return nullptr;
return retry_throttle_data->Ref();
}
explicit DynamicTerminationFilterChannelData(const grpc_channel_args* args)
: chand_(grpc_channel_args_find_pointer<ChannelData>(
args, GRPC_ARG_CLIENT_CHANNEL_DATA)),
retry_throttle_data_(GetRetryThrottleDataFromArgs(args)) {}
ChannelData* chand_;
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
};
class DynamicTerminationFilterCallData {
public:
static grpc_error* Init(grpc_call_element* elem,
const grpc_call_element_args* args) {
new (elem->call_data) DynamicTerminationFilterCallData(*args);
return GRPC_ERROR_NONE;
}
static void Destroy(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* then_schedule_closure) {
auto* calld =
static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
auto* chand =
static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
RefCountedPtr<SubchannelCall> subchannel_call;
if (chand->chand()->enable_retries()) {
if (GPR_LIKELY(calld->retrying_call_ != nullptr)) {
subchannel_call = calld->retrying_call_->subchannel_call();
calld->retrying_call_->~RetryingCall();
}
} else {
if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
subchannel_call = calld->lb_call_->subchannel_call();
}
}
calld->~DynamicTerminationFilterCallData();
if (GPR_LIKELY(subchannel_call != nullptr)) {
subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
} else {
// TODO(yashkt) : This can potentially be a Closure::Run
ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
}
}
static void StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
auto* calld =
static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
auto* chand =
static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
if (chand->chand()->enable_retries()) {
calld->retrying_call_->StartTransportStreamOpBatch(batch);
} else {
calld->lb_call_->StartTransportStreamOpBatch(batch);
}
}
static void SetPollent(grpc_call_element* elem,
grpc_polling_entity* pollent) {
auto* calld =
static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
auto* chand =
static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
ChannelData* client_channel = chand->chand();
grpc_call_element_args args = {
calld->owning_call_, nullptr,
calld->call_context_, calld->path_,
calld->call_start_time_, calld->deadline_,
calld->arena_, calld->call_combiner_};
if (client_channel->enable_retries()) {
// Get retry settings from service config.
auto* svc_cfg_call_data = static_cast<ServiceConfigCallData*>(
calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
GPR_ASSERT(svc_cfg_call_data != nullptr);
auto* method_config = static_cast<const ClientChannelMethodParsedConfig*>(
svc_cfg_call_data->GetMethodParsedConfig(
ClientChannelServiceConfigParser::ParserIndex()));
// Create retrying call.
calld->retrying_call_ = calld->arena_->New<RetryingCall>(
client_channel, args, pollent, chand->retry_throttle_data(),
method_config == nullptr ? nullptr : method_config->retry_policy());
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(
GPR_INFO,
"chand=%p dymamic_termination_calld=%p: create retrying_call=%p",
client_channel, calld, calld->retrying_call_);
}
} else {
calld->lb_call_ =
LoadBalancedCall::Create(client_channel, args, pollent, 0);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p dynamic_termination_calld=%p: create lb_call=%p",
chand, client_channel, calld->lb_call_.get());
}
}
}
private:
explicit DynamicTerminationFilterCallData(const grpc_call_element_args& args)
: path_(grpc_slice_ref_internal(args.path)),
call_start_time_(args.start_time),
deadline_(args.deadline),
arena_(args.arena),
owning_call_(args.call_stack),
call_combiner_(args.call_combiner),
call_context_(args.context) {}
~DynamicTerminationFilterCallData() { grpc_slice_unref_internal(path_); }
grpc_slice path_; // Request path.
gpr_cycle_counter call_start_time_;
grpc_millis deadline_;
Arena* arena_;
grpc_call_stack* owning_call_;
CallCombiner* call_combiner_;
grpc_call_context_element* call_context_;
RetryingCall* retrying_call_ = nullptr;
RefCountedPtr<LoadBalancedCall> lb_call_;
};
const grpc_channel_filter kDynamicTerminationFilterVtable = {
DynamicTerminationFilterCallData::StartTransportStreamOpBatch,
DynamicTerminationFilterChannelData::StartTransportOp,
sizeof(DynamicTerminationFilterCallData),
DynamicTerminationFilterCallData::Init,
DynamicTerminationFilterCallData::SetPollent,
DynamicTerminationFilterCallData::Destroy,
sizeof(DynamicTerminationFilterChannelData),
DynamicTerminationFilterChannelData::Init,
DynamicTerminationFilterChannelData::Destroy,
DynamicTerminationFilterChannelData::GetChannelInfo,
"dynamic_filter_termination",
};
grpc_error* DynamicTerminationFilterChannelData::Init(
grpc_channel_element* elem, grpc_channel_element_args* args) {
GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &kDynamicTerminationFilterVtable);
new (elem->channel_data)
DynamicTerminationFilterChannelData(args->channel_args);
return GRPC_ERROR_NONE;
}
//
// ChannelData::SubchannelWrapper
//
@ -2073,6 +2281,18 @@ void ChannelData::UpdateServiceConfigInControlPlaneLocked(
}
void ChannelData::UpdateServiceConfigInDataPlaneLocked() {
// Grab ref to service config.
RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
// Grab ref to config selector. Use default if resolver didn't supply one.
RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
saved_config_selector_.get());
}
if (config_selector == nullptr) {
config_selector =
MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
}
// Get retry throttle data from service config.
const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
static_cast<const internal::ClientChannelGlobalParsedConfig*>(
@ -2086,18 +2306,25 @@ void ChannelData::UpdateServiceConfigInDataPlaneLocked() {
server_name_, retry_throttle_config.value().max_milli_tokens,
retry_throttle_config.value().milli_token_ratio);
}
// Grab ref to service config.
RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
// Grab ref to config selector. Use default if resolver didn't supply one.
RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
saved_config_selector_.get());
}
if (config_selector == nullptr) {
config_selector =
MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
}
// Construct per-LB filter stack.
std::vector<const grpc_channel_filter*> filters =
config_selector->GetFilters();
filters.push_back(&kDynamicTerminationFilterVtable);
absl::InlinedVector<grpc_arg, 2> args_to_add;
args_to_add.push_back(grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_CLIENT_CHANNEL_DATA), this,
&kChannelDataArgPointerVtable));
if (retry_throttle_data != nullptr) {
args_to_add.push_back(grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_RETRY_THROTTLE_DATA),
retry_throttle_data.get(), &kRetryThrottleDataArgPointerVtable));
}
grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
channel_args_, args_to_add.data(), args_to_add.size());
RefCountedPtr<DynamicFilters> dynamic_filters =
DynamicFilters::Create(new_args, std::move(filters));
GPR_ASSERT(dynamic_filters != nullptr);
grpc_channel_args_destroy(new_args);
// Grab data plane lock to update service config.
//
// We defer unreffing the old values (and deallocating memory) until
@ -2110,9 +2337,9 @@ void ChannelData::UpdateServiceConfigInDataPlaneLocked() {
// Update service config.
received_service_config_data_ = true;
// Old values will be unreffed after lock is released.
retry_throttle_data_.swap(retry_throttle_data);
service_config_.swap(service_config);
config_selector_.swap(config_selector);
dynamic_filters_.swap(dynamic_filters);
// Process calls that were queued waiting for the resolver result.
for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
call = call->next) {
@ -2194,13 +2421,13 @@ void ChannelData::UpdateStateAndPickerLocked(
// the refs until after we release the lock, and then unref them at
// that point. This includes the following:
// - refs to subchannel wrappers in the keys of pending_subchannel_updates_
// - ref stored in retry_throttle_data_
// - ref stored in service_config_
// - ref stored in config_selector_
// - ref stored in dynamic_filters_
// - ownership of the existing picker in picker_
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_to_unref;
RefCountedPtr<ServiceConfig> service_config_to_unref;
RefCountedPtr<ConfigSelector> config_selector_to_unref;
RefCountedPtr<DynamicFilters> dynamic_filters_to_unref;
{
MutexLock lock(&data_plane_mu_);
// Handle subchannel updates.
@ -2223,9 +2450,9 @@ void ChannelData::UpdateStateAndPickerLocked(
if (picker_ == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
received_service_config_data_ = false;
// Note: We save the objects to unref until after the lock is released.
retry_throttle_data_to_unref = std::move(retry_throttle_data_);
service_config_to_unref = std::move(service_config_);
config_selector_to_unref = std::move(config_selector_);
dynamic_filters_to_unref = std::move(dynamic_filters_);
}
// Re-process queued picks.
for (LbQueuedCall* call = lb_queued_calls_; call != nullptr;
@ -2431,8 +2658,7 @@ CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
arena_(args.arena),
owning_call_(args.call_stack),
call_combiner_(args.call_combiner),
call_context_(args.context),
enable_retries_(chand.enable_retries()) {}
call_context_(args.context) {}
CallData::~CallData() {
grpc_slice_unref_internal(path_);
@ -2454,20 +2680,11 @@ void CallData::Destroy(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* then_schedule_closure) {
CallData* calld = static_cast<CallData*>(elem->call_data);
RefCountedPtr<SubchannelCall> subchannel_call;
if (calld->enable_retries_) {
if (GPR_LIKELY(calld->retrying_call_ != nullptr)) {
subchannel_call = calld->retrying_call_->subchannel_call();
calld->retrying_call_->~RetryingCall();
}
} else {
if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
subchannel_call = calld->lb_call_->subchannel_call();
}
}
RefCountedPtr<DynamicFilters::Call> dynamic_call =
std::move(calld->dynamic_call_);
calld->~CallData();
if (GPR_LIKELY(subchannel_call != nullptr)) {
subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
if (GPR_LIKELY(dynamic_call != nullptr)) {
dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
} else {
// TODO(yashkt) : This can potentially be a Closure::Run
ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
@ -2511,10 +2728,10 @@ void CallData::StartTransportStreamOpBatch(
gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
calld, grpc_error_string(calld->cancel_error_));
}
// If we do not have an LB call (i.e., a pick has not yet been started),
// fail all pending batches. Otherwise, send the cancellation down to the
// LB call.
if (calld->lb_call_ == nullptr && calld->retrying_call_ == nullptr) {
// If we do not have a dynamic call (i.e., name resolution has not
// yet completed), fail all pending batches. Otherwise, send the
// cancellation down to the dynamic call.
if (calld->dynamic_call_ == nullptr) {
calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
NoYieldCallCombiner);
// Note: This will release the call combiner.
@ -2522,32 +2739,27 @@ void CallData::StartTransportStreamOpBatch(
batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
} else {
// Note: This will release the call combiner.
if (calld->lb_call_ != nullptr) {
calld->lb_call_->StartTransportStreamOpBatch(batch);
} else {
GPR_ASSERT(calld->retrying_call_ != nullptr);
calld->retrying_call_->StartTransportStreamOpBatch(batch);
}
calld->dynamic_call_->StartTransportStreamOpBatch(batch);
}
return;
}
// Add the batch to the pending list.
calld->PendingBatchesAdd(elem, batch);
// Check if we've already created an LB call.
// Note that once we have created an LB call, we do not need to acquire
// the channel's resolution mutex, which is more efficient (especially for
// streaming calls).
if (calld->lb_call_ != nullptr || calld->retrying_call_ != nullptr) {
// Check if we've already created a dynamic call.
// Note that once we have done so, we do not need to acquire the channel's
// resolution mutex, which is more efficient (especially for streaming calls).
if (calld->dynamic_call_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch", chand, calld);
gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on dynamic_call=%p",
chand, calld, calld->dynamic_call_.get());
}
calld->PendingBatchesResume(elem);
return;
}
// We do not yet have an LB call.
// We do not yet have a dynamic call.
// For batches containing a send_initial_metadata op, acquire the
// channel's resolution mutex to apply the service config to the call,
// after which we will create an LB call.
// after which we will create a dynamic call.
if (GPR_LIKELY(batch->send_initial_metadata)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
@ -2659,11 +2871,7 @@ void CallData::ResumePendingBatchInCallCombiner(void* arg,
static_cast<grpc_call_element*>(batch->handler_private.extra_arg);
auto* calld = static_cast<CallData*>(elem->call_data);
// Note: This will release the call combiner.
if (calld->enable_retries_) {
calld->retrying_call_->StartTransportStreamOpBatch(batch);
} else {
calld->lb_call_->StartTransportStreamOpBatch(batch);
}
calld->dynamic_call_->StartTransportStreamOpBatch(batch);
}
// This is called via the call combiner, so access to calld is synchronized.
@ -2677,8 +2885,8 @@ void CallData::PendingBatchesResume(grpc_call_element* elem) {
}
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
" pending batches: lb_call=%p retrying_call=%p",
chand, this, num_batches, lb_call_.get(), retrying_call_);
" pending batches on dynamic_call=%p",
chand, this, num_batches, dynamic_call_.get());
}
CallCombinerClosureList closures;
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
@ -2824,11 +3032,9 @@ grpc_error* CallData::ApplyServiceConfigToCallLocked(
*send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
}
}
// Save retry policy.
retry_policy_ = method_params->retry_policy();
}
// Set retry throttle data for call.
retry_throttle_data_ = chand->retry_throttle_data();
// Set the dynamic filter stack.
dynamic_filters_ = chand->dynamic_filters();
}
return GRPC_ERROR_NONE;
}
@ -2876,7 +3082,7 @@ void CallData::ResolutionDone(void* arg, grpc_error* error) {
calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
return;
}
calld->CreateLbCall(elem, GRPC_ERROR_NONE);
calld->CreateDynamicCall(elem);
}
void CallData::CheckResolution(void* arg, grpc_error* error) {
@ -2955,31 +3161,29 @@ bool CallData::CheckResolutionLocked(grpc_call_element* elem,
return true;
}
void CallData::CreateLbCall(void* arg, grpc_error* /*error*/) {
auto* elem = static_cast<grpc_call_element*>(arg);
void CallData::CreateDynamicCall(grpc_call_element* elem) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
auto* calld = static_cast<CallData*>(elem->call_data);
grpc_call_element_args args = {
calld->owning_call_, nullptr,
calld->call_context_, calld->path_,
calld->call_start_time_, calld->deadline_,
calld->arena_, calld->call_combiner_};
if (calld->enable_retries_) {
calld->retrying_call_ = calld->arena_->New<RetryingCall>(
chand, args, calld->pollent_, std::move(calld->retry_throttle_data_),
calld->retry_policy_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: create retrying_call=%p", chand,
calld, calld->retrying_call_);
}
} else {
calld->lb_call_ = LoadBalancedCall::Create(chand, args, calld->pollent_, 0);
DynamicFilters::Call::Args args = {std::move(dynamic_filters_),
pollent_,
path_,
call_start_time_,
deadline_,
arena_,
call_context_,
call_combiner_};
grpc_error* error = GRPC_ERROR_NONE;
DynamicFilters* channel_stack = args.channel_stack.get();
dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
if (error != GRPC_ERROR_NONE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: create lb_call=%p", chand, calld,
calld->lb_call_.get());
gpr_log(GPR_INFO,
"chand=%p calld=%p: failed to create dynamic call: error=%s",
chand, this, grpc_error_string(error));
}
PendingBatchesFail(elem, error, YieldCallCombiner);
return;
}
calld->PendingBatchesResume(elem);
PendingBatchesResume(elem);
}
//

@ -21,6 +21,7 @@
#include <functional>
#include <map>
#include <vector>
#include "absl/strings/string_view.h"
@ -28,6 +29,7 @@
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/service_config_parser.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gprpp/arena.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -80,6 +82,8 @@ class ConfigSelector : public RefCounted<ConfigSelector> {
return cs1->Equals(cs2);
}
virtual std::vector<const grpc_channel_filter*> GetFilters() { return {}; }
virtual CallConfig GetCallConfig(GetCallConfigArgs args) = 0;
grpc_arg MakeChannelArg() const;

@ -0,0 +1,186 @@
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/dynamic_filters.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/surface/lame_client.h"
// Conversion between call and call stack.
#define CALL_TO_CALL_STACK(call) \
(grpc_call_stack*)((char*)(call) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
sizeof(DynamicFilters::Call)))
#define CALL_STACK_TO_CALL(callstack) \
(DynamicFilters::Call*)(((char*)(call_stack)) - \
GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
sizeof(DynamicFilters::Call)))
namespace grpc_core {
//
// DynamicFilters::Call
//
DynamicFilters::Call::Call(Args args, grpc_error** error)
: channel_stack_(std::move(args.channel_stack)) {
grpc_call_stack* call_stack = CALL_TO_CALL_STACK(this);
const grpc_call_element_args call_args = {
call_stack, /* call_stack */
nullptr, /* server_transport_data */
args.context, /* context */
args.path, /* path */
args.start_time, /* start_time */
args.deadline, /* deadline */
args.arena, /* arena */
args.call_combiner /* call_combiner */
};
*error = grpc_call_stack_init(channel_stack_->channel_stack_, 1, Destroy,
this, &call_args);
if (GPR_UNLIKELY(*error != GRPC_ERROR_NONE)) {
const char* error_string = grpc_error_string(*error);
gpr_log(GPR_ERROR, "error: %s", error_string);
return;
}
grpc_call_stack_set_pollset_or_pollset_set(call_stack, args.pollent);
}
void DynamicFilters::Call::StartTransportStreamOpBatch(
grpc_transport_stream_op_batch* batch) {
grpc_call_stack* call_stack = CALL_TO_CALL_STACK(this);
grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
}
void DynamicFilters::Call::SetAfterCallStackDestroy(grpc_closure* closure) {
GPR_ASSERT(after_call_stack_destroy_ == nullptr);
GPR_ASSERT(closure != nullptr);
after_call_stack_destroy_ = closure;
}
RefCountedPtr<DynamicFilters::Call> DynamicFilters::Call::Ref() {
IncrementRefCount();
return RefCountedPtr<DynamicFilters::Call>(this);
}
RefCountedPtr<DynamicFilters::Call> DynamicFilters::Call::Ref(
const grpc_core::DebugLocation& location, const char* reason) {
IncrementRefCount(location, reason);
return RefCountedPtr<DynamicFilters::Call>(this);
}
void DynamicFilters::Call::Unref() {
GRPC_CALL_STACK_UNREF(CALL_TO_CALL_STACK(this), "");
}
void DynamicFilters::Call::Unref(const DebugLocation& /*location*/,
const char* reason) {
GRPC_CALL_STACK_UNREF(CALL_TO_CALL_STACK(this), reason);
}
void DynamicFilters::Call::Destroy(void* arg, grpc_error* /*error*/) {
DynamicFilters::Call* self = static_cast<DynamicFilters::Call*>(arg);
// Keep some members before destroying the subchannel call.
grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_;
RefCountedPtr<DynamicFilters> channel_stack = std::move(self->channel_stack_);
// Destroy the subchannel call.
self->~Call();
// Destroy the call stack. This should be after destroying the call, because
// call->after_call_stack_destroy(), if not null, will free the call arena.
grpc_call_stack_destroy(CALL_TO_CALL_STACK(self), nullptr,
after_call_stack_destroy);
// Automatically reset channel_stack. This should be after destroying the call
// stack, because destroying call stack needs access to the channel stack.
}
void DynamicFilters::Call::IncrementRefCount() {
GRPC_CALL_STACK_REF(CALL_TO_CALL_STACK(this), "");
}
void DynamicFilters::Call::IncrementRefCount(
const grpc_core::DebugLocation& /*location*/, const char* reason) {
GRPC_CALL_STACK_REF(CALL_TO_CALL_STACK(this), reason);
}
//
// DynamicFilters
//
namespace {
void DestroyChannelStack(void* arg, grpc_error* /*error*/) {
grpc_channel_stack* channel_stack = static_cast<grpc_channel_stack*>(arg);
grpc_channel_stack_destroy(channel_stack);
gpr_free(channel_stack);
}
std::pair<grpc_channel_stack*, grpc_error*> CreateChannelStack(
const grpc_channel_args* args,
std::vector<const grpc_channel_filter*> filters) {
// Allocate memory for channel stack.
const size_t channel_stack_size =
grpc_channel_stack_size(filters.data(), filters.size());
grpc_channel_stack* channel_stack =
reinterpret_cast<grpc_channel_stack*>(gpr_zalloc(channel_stack_size));
// Initialize stack.
grpc_error* error = grpc_channel_stack_init(
/*initial_refs=*/1, DestroyChannelStack, channel_stack, filters.data(),
filters.size(), args, /*optional_transport=*/nullptr, "DynamicFilters",
channel_stack);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "error initializing client internal stack: %s",
grpc_error_string(error));
grpc_channel_stack_destroy(channel_stack);
gpr_free(channel_stack);
return {nullptr, error};
}
return {channel_stack, GRPC_ERROR_NONE};
}
} // namespace
RefCountedPtr<DynamicFilters> DynamicFilters::Create(
const grpc_channel_args* args,
std::vector<const grpc_channel_filter*> filters) {
// Attempt to create channel stack from requested filters.
auto p = CreateChannelStack(args, std::move(filters));
if (p.second != GRPC_ERROR_NONE) {
// Initial pass failed. Create with lame filter.
grpc_error* error = p.second;
p = CreateChannelStack(args, {&grpc_lame_filter});
GPR_ASSERT(p.second == GRPC_ERROR_NONE);
grpc_channel_element* elem = grpc_channel_stack_element(p.first, 0);
SetLameFilterError(elem, error);
}
return MakeRefCounted<DynamicFilters>(p.first);
}
DynamicFilters::~DynamicFilters() {
GRPC_CHANNEL_STACK_UNREF(channel_stack_, "~DynamicFilters");
}
RefCountedPtr<DynamicFilters::Call> DynamicFilters::CreateCall(
DynamicFilters::Call::Args args, grpc_error** error) {
size_t allocation_size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(Call)) +
channel_stack_->call_stack_size;
Call* call = static_cast<Call*>(args.arena->Alloc(allocation_size));
new (call) Call(std::move(args), error);
return call;
}
} // namespace grpc_core

@ -0,0 +1,99 @@
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_DYNAMIC_FILTERS_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_DYNAMIC_FILTERS_H
#include <grpc/support/port_platform.h>
#include <vector>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/arena.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
namespace grpc_core {
class DynamicFilters : public RefCounted<DynamicFilters> {
public:
// Implements the interface of RefCounted<>.
class Call {
public:
struct Args {
RefCountedPtr<DynamicFilters> channel_stack;
grpc_polling_entity* pollent;
grpc_slice path;
gpr_cycle_counter start_time;
grpc_millis deadline;
Arena* arena;
grpc_call_context_element* context;
CallCombiner* call_combiner;
};
Call(Args args, grpc_error** error);
// Continues processing a transport stream op batch.
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
// Sets the 'then_schedule_closure' argument for call stack destruction.
// Must be called once per call.
void SetAfterCallStackDestroy(grpc_closure* closure);
// Interface of RefCounted<>.
RefCountedPtr<Call> Ref() GRPC_MUST_USE_RESULT;
RefCountedPtr<Call> Ref(const DebugLocation& location,
const char* reason) GRPC_MUST_USE_RESULT;
// When refcount drops to 0, destroys itself and the associated call stack,
// but does NOT free the memory because it's in the call arena.
void Unref();
void Unref(const DebugLocation& location, const char* reason);
private:
// Allow RefCountedPtr<> to access IncrementRefCount().
template <typename T>
friend class RefCountedPtr;
// Interface of RefCounted<>.
void IncrementRefCount();
void IncrementRefCount(const DebugLocation& location, const char* reason);
static void Destroy(void* arg, grpc_error* error);
RefCountedPtr<DynamicFilters> channel_stack_;
grpc_closure* after_call_stack_destroy_ = nullptr;
};
static RefCountedPtr<DynamicFilters> Create(
const grpc_channel_args* args,
std::vector<const grpc_channel_filter*> filters);
explicit DynamicFilters(grpc_channel_stack* channel_stack)
: channel_stack_(channel_stack) {}
~DynamicFilters() override;
RefCountedPtr<Call> CreateCall(Call::Args args, grpc_error** error);
private:
grpc_channel_stack* channel_stack_;
};
} // namespace grpc_core
#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_DYNAMIC_FILTERS_H

@ -118,6 +118,12 @@ static void lame_destroy_channel_elem(grpc_channel_element* elem) {
} // namespace
void SetLameFilterError(grpc_channel_element* elem, grpc_error* error) {
GPR_ASSERT(elem->filter == &grpc_lame_filter);
auto chand = static_cast<grpc_core::ChannelData*>(elem->channel_data);
chand->error = error;
}
} // namespace grpc_core
const grpc_channel_filter grpc_lame_filter = {
@ -148,14 +154,12 @@ grpc_channel* grpc_lame_client_channel_create(const char* target,
"grpc_lame_client_channel_create(target=%s, error_code=%d, "
"error_message=%s)",
3, (target, (int)error_code, error_message));
GPR_ASSERT(elem->filter == &grpc_lame_filter);
auto chand = static_cast<grpc_core::ChannelData*>(elem->channel_data);
chand->error = grpc_error_set_str(
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"),
GRPC_ERROR_INT_GRPC_STATUS, error_code),
GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_slice_from_static_string(error_message));
grpc_core::SetLameFilterError(
elem, grpc_error_set_str(
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"),
GRPC_ERROR_INT_GRPC_STATUS, error_code),
GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_slice_from_static_string(error_message)));
return channel;
}

@ -25,4 +25,8 @@
extern const grpc_channel_filter grpc_lame_filter;
namespace grpc_core {
void SetLameFilterError(grpc_channel_element* elem, grpc_error* error);
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_SURFACE_LAME_CLIENT_H */

@ -24,6 +24,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/config_selector.cc',
'src/core/ext/filters/client_channel/dynamic_filters.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',

@ -1055,6 +1055,8 @@ src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/config_selector.cc \
src/core/ext/filters/client_channel/config_selector.h \
src/core/ext/filters/client_channel/connector.h \
src/core/ext/filters/client_channel/dynamic_filters.cc \
src/core/ext/filters/client_channel/dynamic_filters.h \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.h \
src/core/ext/filters/client_channel/health/health_check_client.cc \

@ -881,6 +881,8 @@ src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/config_selector.cc \
src/core/ext/filters/client_channel/config_selector.h \
src/core/ext/filters/client_channel/connector.h \
src/core/ext/filters/client_channel/dynamic_filters.cc \
src/core/ext/filters/client_channel/dynamic_filters.h \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.h \
src/core/ext/filters/client_channel/health/health_check_client.cc \

Loading…
Cancel
Save