[client channel] rename ClientChannel to ClientChannelFilter

pull/35783/head
Mark D. Roth 10 months ago
parent c05a4356d0
commit ea8b74a33a
  1. 20
      src/core/ext/filters/client_channel/channel_connectivity.cc
  2. 410
      src/core/ext/filters/client_channel/client_channel.cc
  3. 60
      src/core/ext/filters/client_channel/client_channel.h
  4. 2
      src/core/ext/filters/client_channel/client_channel_internal.h
  5. 4
      src/core/ext/filters/client_channel/client_channel_plugin.cc
  6. 8
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  7. 8
      src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
  8. 2
      src/core/ext/filters/client_channel/retry_filter.cc
  9. 4
      src/core/ext/filters/client_channel/retry_filter.h
  10. 5
      src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc
  11. 7
      src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h
  12. 8
      src/core/ext/xds/xds_transport_grpc.cc
  13. 12
      test/core/client_channel/client_channel_test.cc

@ -67,8 +67,8 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
(c_channel, try_to_connect)); (c_channel, try_to_connect));
grpc_core::Channel* channel = grpc_core::Channel::FromC(c_channel); grpc_core::Channel* channel = grpc_core::Channel::FromC(c_channel);
// Forward through to the underlying client channel. // Forward through to the underlying client channel.
grpc_core::ClientChannel* client_channel = grpc_core::ClientChannelFilter* client_channel =
grpc_core::ClientChannel::GetFromChannel(channel); grpc_core::ClientChannelFilter::GetFromChannel(channel);
if (GPR_UNLIKELY(client_channel == nullptr)) { if (GPR_UNLIKELY(client_channel == nullptr)) {
if (grpc_core::IsLameChannel(channel)) { if (grpc_core::IsLameChannel(channel)) {
return GRPC_CHANNEL_TRANSIENT_FAILURE; return GRPC_CHANNEL_TRANSIENT_FAILURE;
@ -83,8 +83,8 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
int grpc_channel_num_external_connectivity_watchers(grpc_channel* c_channel) { int grpc_channel_num_external_connectivity_watchers(grpc_channel* c_channel) {
grpc_core::Channel* channel = grpc_core::Channel::FromC(c_channel); grpc_core::Channel* channel = grpc_core::Channel::FromC(c_channel);
grpc_core::ClientChannel* client_channel = grpc_core::ClientChannelFilter* client_channel =
grpc_core::ClientChannel::GetFromChannel(channel); grpc_core::ClientChannelFilter::GetFromChannel(channel);
if (client_channel == nullptr) { if (client_channel == nullptr) {
if (!grpc_core::IsLameChannel(channel)) { if (!grpc_core::IsLameChannel(channel)) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
@ -97,7 +97,7 @@ int grpc_channel_num_external_connectivity_watchers(grpc_channel* c_channel) {
} }
int grpc_channel_support_connectivity_watcher(grpc_channel* channel) { int grpc_channel_support_connectivity_watcher(grpc_channel* channel) {
return grpc_core::ClientChannel::GetFromChannel( return grpc_core::ClientChannelFilter::GetFromChannel(
grpc_core::Channel::FromC(channel)) != nullptr; grpc_core::Channel::FromC(channel)) != nullptr;
} }
@ -115,8 +115,8 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
state_(last_observed_state) { state_(last_observed_state) {
GPR_ASSERT(grpc_cq_begin_op(cq, tag)); GPR_ASSERT(grpc_cq_begin_op(cq, tag));
GRPC_CLOSURE_INIT(&on_complete_, WatchComplete, this, nullptr); GRPC_CLOSURE_INIT(&on_complete_, WatchComplete, this, nullptr);
ClientChannel* client_channel = ClientChannelFilter* client_channel =
ClientChannel::GetFromChannel(channel_.get()); ClientChannelFilter::GetFromChannel(channel_.get());
if (client_channel == nullptr) { if (client_channel == nullptr) {
// If the target URI used to create the channel was invalid, channel // If the target URI used to create the channel was invalid, channel
// stack initialization failed, and that caused us to create a lame // stack initialization failed, and that caused us to create a lame
@ -145,7 +145,7 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
private: private:
// A fire-and-forget object used to delay starting the timer until the // A fire-and-forget object used to delay starting the timer until the
// ClientChannel actually starts the watch. // ClientChannelFilter actually starts the watch.
class WatcherTimerInitState { class WatcherTimerInitState {
public: public:
WatcherTimerInitState(StateWatcher* state_watcher, Timestamp deadline) WatcherTimerInitState(StateWatcher* state_watcher, Timestamp deadline)
@ -201,8 +201,8 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
void TimeoutComplete() { void TimeoutComplete() {
timer_fired_ = true; timer_fired_ = true;
// If this is a client channel (not a lame channel), cancel the watch. // If this is a client channel (not a lame channel), cancel the watch.
ClientChannel* client_channel = ClientChannelFilter* client_channel =
ClientChannel::GetFromChannel(channel_.get()); ClientChannelFilter::GetFromChannel(channel_.get());
if (client_channel != nullptr) { if (client_channel != nullptr) {
client_channel->CancelExternalConnectivityWatcher(&on_complete_); client_channel->CancelExternalConnectivityWatcher(&on_complete_);
} }

File diff suppressed because it is too large Load Diff

@ -87,8 +87,8 @@
// Channel arg key for server URI string. // Channel arg key for server URI string.
#define GRPC_ARG_SERVER_URI "grpc.server_uri" #define GRPC_ARG_SERVER_URI "grpc.server_uri"
// Channel arg containing a pointer to the ClientChannel object. // Channel arg containing a pointer to the ClientChannelFilter object.
#define GRPC_ARG_CLIENT_CHANNEL "grpc.internal.client_channel" #define GRPC_ARG_CLIENT_CHANNEL "grpc.internal.client_channel_filter"
// Max number of batches that can be pending on a call at any given // Max number of batches that can be pending on a call at any given
// time. This includes one batch for each of the following ops: // time. This includes one batch for each of the following ops:
@ -102,7 +102,7 @@
namespace grpc_core { namespace grpc_core {
class ClientChannel { class ClientChannelFilter {
public: public:
static const grpc_channel_filter kFilterVtableWithPromises; static const grpc_channel_filter kFilterVtableWithPromises;
static const grpc_channel_filter kFilterVtableWithoutPromises; static const grpc_channel_filter kFilterVtableWithoutPromises;
@ -115,9 +115,9 @@ class ClientChannel {
struct RawPointerChannelArgTag {}; struct RawPointerChannelArgTag {};
static absl::string_view ChannelArgName() { return GRPC_ARG_CLIENT_CHANNEL; } static absl::string_view ChannelArgName() { return GRPC_ARG_CLIENT_CHANNEL; }
// Returns the ClientChannel object from channel, or null if channel // Returns the ClientChannelFilter object from channel, or null if channel
// is not a client channel. // is not a client channel.
static ClientChannel* GetFromChannel(Channel* channel); static ClientChannelFilter* GetFromChannel(Channel* channel);
static ArenaPromise<ServerMetadataHandle> MakeCallPromise( static ArenaPromise<ServerMetadataHandle> MakeCallPromise(
grpc_channel_element* elem, CallArgs call_args, grpc_channel_element* elem, CallArgs call_args,
@ -196,7 +196,7 @@ class ClientChannel {
// via grpc_client_channel_watch_connectivity_state(). // via grpc_client_channel_watch_connectivity_state().
class ExternalConnectivityWatcher : public ConnectivityStateWatcherInterface { class ExternalConnectivityWatcher : public ConnectivityStateWatcherInterface {
public: public:
ExternalConnectivityWatcher(ClientChannel* chand, ExternalConnectivityWatcher(ClientChannelFilter* chand,
grpc_polling_entity pollent, grpc_polling_entity pollent,
grpc_connectivity_state* state, grpc_connectivity_state* state,
grpc_closure* on_complete, grpc_closure* on_complete,
@ -205,7 +205,7 @@ class ClientChannel {
~ExternalConnectivityWatcher() override; ~ExternalConnectivityWatcher() override;
// Removes the watcher from the external_watchers_ map. // Removes the watcher from the external_watchers_ map.
static void RemoveWatcherFromExternalWatchersMap(ClientChannel* chand, static void RemoveWatcherFromExternalWatchersMap(ClientChannelFilter* chand,
grpc_closure* on_complete, grpc_closure* on_complete,
bool cancel); bool cancel);
@ -222,7 +222,7 @@ class ClientChannel {
void RemoveWatcherLocked() void RemoveWatcherLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_);
ClientChannel* chand_; ClientChannelFilter* chand_;
grpc_polling_entity pollent_; grpc_polling_entity pollent_;
grpc_connectivity_state initial_state_; grpc_connectivity_state initial_state_;
grpc_connectivity_state* state_; grpc_connectivity_state* state_;
@ -231,8 +231,9 @@ class ClientChannel {
std::atomic<bool> done_{false}; std::atomic<bool> done_{false};
}; };
ClientChannel(grpc_channel_element_args* args, grpc_error_handle* error); ClientChannelFilter(grpc_channel_element_args* args,
~ClientChannel(); grpc_error_handle* error);
~ClientChannelFilter();
// Filter vtable functions. // Filter vtable functions.
static grpc_error_handle Init(grpc_channel_element* elem, static grpc_error_handle Init(grpc_channel_element* elem,
@ -378,15 +379,15 @@ class ClientChannel {
}; };
// //
// ClientChannel::LoadBalancedCall // ClientChannelFilter::LoadBalancedCall
// //
// TODO(roth): As part of simplifying cancellation in the filter stack, // TODO(roth): As part of simplifying cancellation in the filter stack,
// this should no longer need to be ref-counted. // this should no longer need to be ref-counted.
class ClientChannel::LoadBalancedCall class ClientChannelFilter::LoadBalancedCall
: public InternallyRefCounted<LoadBalancedCall, UnrefCallDtor> { : public InternallyRefCounted<LoadBalancedCall, UnrefCallDtor> {
public: public:
LoadBalancedCall(ClientChannel* chand, LoadBalancedCall(ClientChannelFilter* chand,
grpc_call_context_element* call_context, grpc_call_context_element* call_context,
absl::AnyInvocable<void()> on_commit, absl::AnyInvocable<void()> on_commit,
bool is_transparent_retry); bool is_transparent_retry);
@ -396,15 +397,15 @@ class ClientChannel::LoadBalancedCall
// Called by channel when removing a call from the list of queued calls. // Called by channel when removing a call from the list of queued calls.
void RemoveCallFromLbQueuedCallsLocked() void RemoveCallFromLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_);
// Called by the channel for each queued call when a new picker // Called by the channel for each queued call when a new picker
// becomes available. // becomes available.
virtual void RetryPickLocked() virtual void RetryPickLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_) = 0; ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_) = 0;
protected: protected:
ClientChannel* chand() const { return chand_; } ClientChannelFilter* chand() const { return chand_; }
ClientCallTracer::CallAttemptTracer* call_attempt_tracer() const { ClientCallTracer::CallAttemptTracer* call_attempt_tracer() const {
return static_cast<ClientCallTracer::CallAttemptTracer*>( return static_cast<ClientCallTracer::CallAttemptTracer*>(
call_context_[GRPC_CONTEXT_CALL_TRACER].value); call_context_[GRPC_CONTEXT_CALL_TRACER].value);
@ -458,13 +459,13 @@ class ClientChannel::LoadBalancedCall
grpc_error_handle* error); grpc_error_handle* error);
// Adds the call to the channel's list of queued picks if not already present. // Adds the call to the channel's list of queued picks if not already present.
void AddCallToLbQueuedCallsLocked() void AddCallToLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_);
// Called when adding the call to the LB queue. // Called when adding the call to the LB queue.
virtual void OnAddToQueueLocked() virtual void OnAddToQueueLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_) = 0; ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_) = 0;
ClientChannel* chand_; ClientChannelFilter* chand_;
absl::AnyInvocable<void()> on_commit_; absl::AnyInvocable<void()> on_commit_;
@ -477,8 +478,8 @@ class ClientChannel::LoadBalancedCall
grpc_call_context_element* const call_context_; grpc_call_context_element* const call_context_;
}; };
class ClientChannel::FilterBasedLoadBalancedCall class ClientChannelFilter::FilterBasedLoadBalancedCall
: public ClientChannel::LoadBalancedCall { : public ClientChannelFilter::LoadBalancedCall {
public: public:
// If on_call_destruction_complete is non-null, then it will be // If on_call_destruction_complete is non-null, then it will be
// invoked once the LoadBalancedCall is completely destroyed. // invoked once the LoadBalancedCall is completely destroyed.
@ -486,7 +487,7 @@ class ClientChannel::FilterBasedLoadBalancedCall
// the LB call has a subchannel call and ensuring that the // the LB call has a subchannel call and ensuring that the
// on_call_destruction_complete closure passed down from the surface // on_call_destruction_complete closure passed down from the surface
// is not invoked until after the subchannel call stack is destroyed. // is not invoked until after the subchannel call stack is destroyed.
FilterBasedLoadBalancedCall(ClientChannel* chand, FilterBasedLoadBalancedCall(ClientChannelFilter* chand,
const grpc_call_element_args& args, const grpc_call_element_args& args,
grpc_polling_entity* pollent, grpc_polling_entity* pollent,
grpc_closure* on_call_destruction_complete, grpc_closure* on_call_destruction_complete,
@ -555,10 +556,10 @@ class ClientChannel::FilterBasedLoadBalancedCall
void TryPick(bool was_queued); void TryPick(bool was_queued);
void OnAddToQueueLocked() override void OnAddToQueueLocked() override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_);
void RetryPickLocked() override void RetryPickLocked() override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_);
void CreateSubchannelCall(); void CreateSubchannelCall();
@ -579,9 +580,8 @@ class ClientChannel::FilterBasedLoadBalancedCall
// Set when we fail inside the LB call. // Set when we fail inside the LB call.
grpc_error_handle failure_error_; grpc_error_handle failure_error_;
// Accessed while holding ClientChannel::lb_mu_.
LbQueuedCallCanceller* lb_call_canceller_ LbQueuedCallCanceller* lb_call_canceller_
ABSL_GUARDED_BY(&ClientChannel::lb_mu_) = nullptr; ABSL_GUARDED_BY(&ClientChannelFilter::lb_mu_) = nullptr;
RefCountedPtr<SubchannelCall> subchannel_call_; RefCountedPtr<SubchannelCall> subchannel_call_;
@ -604,10 +604,10 @@ class ClientChannel::FilterBasedLoadBalancedCall
grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {}; grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};
}; };
class ClientChannel::PromiseBasedLoadBalancedCall class ClientChannelFilter::PromiseBasedLoadBalancedCall
: public ClientChannel::LoadBalancedCall { : public ClientChannelFilter::LoadBalancedCall {
public: public:
PromiseBasedLoadBalancedCall(ClientChannel* chand, PromiseBasedLoadBalancedCall(ClientChannelFilter* chand,
absl::AnyInvocable<void()> on_commit, absl::AnyInvocable<void()> on_commit,
bool is_transparent_retry); bool is_transparent_retry);
@ -622,7 +622,7 @@ class ClientChannel::PromiseBasedLoadBalancedCall
void RetryPickLocked() override; void RetryPickLocked() override;
void OnAddToQueueLocked() override void OnAddToQueueLocked() override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_);
grpc_polling_entity pollent_; grpc_polling_entity pollent_;
ClientMetadataHandle client_initial_metadata_; ClientMetadataHandle client_initial_metadata_;

@ -35,7 +35,7 @@
// //
// This file contains internal interfaces used to allow various plugins // This file contains internal interfaces used to allow various plugins
// (filters, LB policies, etc) to access internal data provided by the // (filters, LB policies, etc) to access internal data provided by the
// ClientChannel that is not normally accessible via external APIs. // ClientChannelFilter that is not normally accessible via external APIs.
// //
// Channel arg key for health check service name. // Channel arg key for health check service name.

@ -42,12 +42,12 @@ void BuildClientChannelConfiguration(CoreConfiguration::Builder* builder) {
internal::RetryServiceConfigParser::Register(builder); internal::RetryServiceConfigParser::Register(builder);
builder->channel_init() builder->channel_init()
->RegisterFilter(GRPC_CLIENT_CHANNEL, ->RegisterFilter(GRPC_CLIENT_CHANNEL,
&ClientChannel::kFilterVtableWithPromises) &ClientChannelFilter::kFilterVtableWithPromises)
.If(IsEverythingBelowClientChannelPromiseSafe) .If(IsEverythingBelowClientChannelPromiseSafe)
.Terminal(); .Terminal();
builder->channel_init() builder->channel_init()
->RegisterFilter(GRPC_CLIENT_CHANNEL, ->RegisterFilter(GRPC_CLIENT_CHANNEL,
&ClientChannel::kFilterVtableWithoutPromises) &ClientChannelFilter::kFilterVtableWithoutPromises)
.IfNot(IsEverythingBelowClientChannelPromiseSafe) .IfNot(IsEverythingBelowClientChannelPromiseSafe)
.Terminal(); .Terminal();
} }

@ -1592,8 +1592,8 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
// Start watching the channel's connectivity state. If the channel // Start watching the channel's connectivity state. If the channel
// goes into state TRANSIENT_FAILURE before the timer fires, we go into // goes into state TRANSIENT_FAILURE before the timer fires, we go into
// fallback mode even if the fallback timeout has not elapsed. // fallback mode even if the fallback timeout has not elapsed.
ClientChannel* client_channel = ClientChannelFilter* client_channel =
ClientChannel::GetFromChannel(Channel::FromC(lb_channel_)); ClientChannelFilter::GetFromChannel(Channel::FromC(lb_channel_));
GPR_ASSERT(client_channel != nullptr); GPR_ASSERT(client_channel != nullptr);
// Ref held by callback. // Ref held by callback.
watcher_ = watcher_ =
@ -1659,8 +1659,8 @@ absl::Status GrpcLb::UpdateBalancerChannelLocked() {
} }
void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() { void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
ClientChannel* client_channel = ClientChannelFilter* client_channel =
ClientChannel::GetFromChannel(Channel::FromC(lb_channel_)); ClientChannelFilter::GetFromChannel(Channel::FromC(lb_channel_));
GPR_ASSERT(client_channel != nullptr); GPR_ASSERT(client_channel != nullptr);
client_channel->RemoveConnectivityWatcher(watcher_); client_channel->RemoveConnectivityWatcher(watcher_);
} }

@ -1557,8 +1557,8 @@ RlsLb::RlsChannel::RlsChannel(RefCountedPtr<RlsLb> lb_policy)
parent_channelz_node_ = std::move(parent_channelz_node); parent_channelz_node_ = std::move(parent_channelz_node);
} }
// Start connectivity watch. // Start connectivity watch.
ClientChannel* client_channel = ClientChannelFilter* client_channel =
ClientChannel::GetFromChannel(Channel::FromC(channel_)); ClientChannelFilter::GetFromChannel(Channel::FromC(channel_));
GPR_ASSERT(client_channel != nullptr); GPR_ASSERT(client_channel != nullptr);
watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher")); watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
client_channel->AddConnectivityWatcher( client_channel->AddConnectivityWatcher(
@ -1583,8 +1583,8 @@ void RlsLb::RlsChannel::Orphan() {
} }
// Stop connectivity watch. // Stop connectivity watch.
if (watcher_ != nullptr) { if (watcher_ != nullptr) {
ClientChannel* client_channel = ClientChannelFilter* client_channel =
ClientChannel::GetFromChannel(Channel::FromC(channel_)); ClientChannelFilter::GetFromChannel(Channel::FromC(channel_));
GPR_ASSERT(client_channel != nullptr); GPR_ASSERT(client_channel != nullptr);
client_channel->RemoveConnectivityWatcher(watcher_); client_channel->RemoveConnectivityWatcher(watcher_);
watcher_ = nullptr; watcher_ = nullptr;

@ -98,7 +98,7 @@ namespace grpc_core {
// //
RetryFilter::RetryFilter(const ChannelArgs& args, grpc_error_handle* error) RetryFilter::RetryFilter(const ChannelArgs& args, grpc_error_handle* error)
: client_channel_(args.GetObject<ClientChannel>()), : client_channel_(args.GetObject<ClientChannelFilter>()),
event_engine_(args.GetObject<EventEngine>()), event_engine_(args.GetObject<EventEngine>()),
per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)), per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)),
service_config_parser_index_( service_config_parser_index_(

@ -72,7 +72,7 @@ class RetryFilter {
return retry_throttle_data_; return retry_throttle_data_;
} }
ClientChannel* client_channel() const { return client_channel_; } ClientChannelFilter* client_channel() const { return client_channel_; }
size_t per_rpc_retry_buffer_size() const { size_t per_rpc_retry_buffer_size() const {
return per_rpc_retry_buffer_size_; return per_rpc_retry_buffer_size_;
@ -110,7 +110,7 @@ class RetryFilter {
static void GetChannelInfo(grpc_channel_element* /*elem*/, static void GetChannelInfo(grpc_channel_element* /*elem*/,
const grpc_channel_info* /*info*/) {} const grpc_channel_info* /*info*/) {}
ClientChannel* client_channel_; ClientChannelFilter* client_channel_;
grpc_event_engine::experimental::EventEngine* const event_engine_; grpc_event_engine::experimental::EventEngine* const event_engine_;
size_t per_rpc_retry_buffer_size_; size_t per_rpc_retry_buffer_size_;
RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_; RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_;

@ -310,7 +310,8 @@ namespace {
void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) { void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
grpc_transport_stream_op_batch* batch = grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg); static_cast<grpc_transport_stream_op_batch*>(arg);
auto* lb_call = static_cast<ClientChannel::FilterBasedLoadBalancedCall*>( auto* lb_call =
static_cast<ClientChannelFilter::FilterBasedLoadBalancedCall*>(
batch->handler_private.extra_arg); batch->handler_private.extra_arg);
// Note: This will release the call combiner. // Note: This will release the call combiner.
lb_call->StartTransportStreamOpBatch(batch); lb_call->StartTransportStreamOpBatch(batch);
@ -1710,7 +1711,7 @@ void RetryFilter::LegacyCallData::StartTransportStreamOpBatch(
call_attempt_->StartRetriableBatches(); call_attempt_->StartRetriableBatches();
} }
OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall> OrphanablePtr<ClientChannelFilter::FilterBasedLoadBalancedCall>
RetryFilter::LegacyCallData::CreateLoadBalancedCall( RetryFilter::LegacyCallData::CreateLoadBalancedCall(
absl::AnyInvocable<void()> on_commit, bool is_transparent_retry) { absl::AnyInvocable<void()> on_commit, bool is_transparent_retry) {
grpc_call_element_args args = {owning_call_, nullptr, call_context_, grpc_call_element_args args = {owning_call_, nullptr, call_context_,

@ -256,7 +256,7 @@ class RetryFilter::LegacyCallData {
void MaybeCancelPerAttemptRecvTimer(); void MaybeCancelPerAttemptRecvTimer();
LegacyCallData* calld_; LegacyCallData* calld_;
OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall> lb_call_; OrphanablePtr<ClientChannelFilter::FilterBasedLoadBalancedCall> lb_call_;
bool lb_call_committed_ = false; bool lb_call_committed_ = false;
grpc_closure on_per_attempt_recv_timer_; grpc_closure on_per_attempt_recv_timer_;
@ -363,7 +363,7 @@ class RetryFilter::LegacyCallData {
void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures); void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures);
static void StartTransparentRetry(void* arg, grpc_error_handle error); static void StartTransparentRetry(void* arg, grpc_error_handle error);
OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall> OrphanablePtr<ClientChannelFilter::FilterBasedLoadBalancedCall>
CreateLoadBalancedCall(absl::AnyInvocable<void()> on_commit, CreateLoadBalancedCall(absl::AnyInvocable<void()> on_commit,
bool is_transparent_retry); bool is_transparent_retry);
@ -394,7 +394,8 @@ class RetryFilter::LegacyCallData {
// LB call used when we've committed to a call attempt and the retry // LB call used when we've committed to a call attempt and the retry
// state for that attempt is no longer needed. This provides a fast // state for that attempt is no longer needed. This provides a fast
// path for long-running streaming calls that minimizes overhead. // path for long-running streaming calls that minimizes overhead.
OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall> committed_call_; OrphanablePtr<ClientChannelFilter::FilterBasedLoadBalancedCall>
committed_call_;
// When are are not yet fully committed to a particular call (i.e., // When are are not yet fully committed to a particular call (i.e.,
// either we might still retry or we have committed to the call but // either we might still retry or we have committed to the call but

@ -281,8 +281,8 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport(
if (IsLameChannel(channel_)) { if (IsLameChannel(channel_)) {
*status = absl::UnavailableError("xds client has a lame channel"); *status = absl::UnavailableError("xds client has a lame channel");
} else { } else {
ClientChannel* client_channel = ClientChannelFilter* client_channel =
ClientChannel::GetFromChannel(Channel::FromC(channel_)); ClientChannelFilter::GetFromChannel(Channel::FromC(channel_));
GPR_ASSERT(client_channel != nullptr); GPR_ASSERT(client_channel != nullptr);
watcher_ = new StateWatcher(std::move(on_connectivity_failure)); watcher_ = new StateWatcher(std::move(on_connectivity_failure));
client_channel->AddConnectivityWatcher( client_channel->AddConnectivityWatcher(
@ -297,8 +297,8 @@ GrpcXdsTransportFactory::GrpcXdsTransport::~GrpcXdsTransport() {
void GrpcXdsTransportFactory::GrpcXdsTransport::Orphan() { void GrpcXdsTransportFactory::GrpcXdsTransport::Orphan() {
if (!IsLameChannel(channel_)) { if (!IsLameChannel(channel_)) {
ClientChannel* client_channel = ClientChannelFilter* client_channel =
ClientChannel::GetFromChannel(Channel::FromC(channel_)); ClientChannelFilter::GetFromChannel(Channel::FromC(channel_));
GPR_ASSERT(client_channel != nullptr); GPR_ASSERT(client_channel != nullptr);
client_channel->RemoveConnectivityWatcher(watcher_); client_channel->RemoveConnectivityWatcher(watcher_);
} }

@ -35,20 +35,20 @@ namespace testing {
namespace { namespace {
TEST(MakeSubchannelArgs, UsesChannelDefaultAuthorityByDefault) { TEST(MakeSubchannelArgs, UsesChannelDefaultAuthorityByDefault) {
ChannelArgs args = ClientChannel::MakeSubchannelArgs( ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs(
ChannelArgs(), ChannelArgs(), nullptr, "foo.example.com"); ChannelArgs(), ChannelArgs(), nullptr, "foo.example.com");
EXPECT_EQ(args.GetString(GRPC_ARG_DEFAULT_AUTHORITY), "foo.example.com"); EXPECT_EQ(args.GetString(GRPC_ARG_DEFAULT_AUTHORITY), "foo.example.com");
} }
TEST(MakeSubchannelArgs, DefaultAuthorityFromChannelArgs) { TEST(MakeSubchannelArgs, DefaultAuthorityFromChannelArgs) {
ChannelArgs args = ClientChannel::MakeSubchannelArgs( ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs(
ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "bar.example.com"), ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "bar.example.com"),
ChannelArgs(), nullptr, "foo.example.com"); ChannelArgs(), nullptr, "foo.example.com");
EXPECT_EQ(args.GetString(GRPC_ARG_DEFAULT_AUTHORITY), "bar.example.com"); EXPECT_EQ(args.GetString(GRPC_ARG_DEFAULT_AUTHORITY), "bar.example.com");
} }
TEST(MakeSubchannelArgs, DefaultAuthorityFromResolver) { TEST(MakeSubchannelArgs, DefaultAuthorityFromResolver) {
ChannelArgs args = ClientChannel::MakeSubchannelArgs( ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs(
ChannelArgs(), ChannelArgs(),
ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "bar.example.com"), nullptr, ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "bar.example.com"), nullptr,
"foo.example.com"); "foo.example.com");
@ -57,7 +57,7 @@ TEST(MakeSubchannelArgs, DefaultAuthorityFromResolver) {
TEST(MakeSubchannelArgs, TEST(MakeSubchannelArgs,
DefaultAuthorityFromChannelArgsOverridesValueFromResolver) { DefaultAuthorityFromChannelArgsOverridesValueFromResolver) {
ChannelArgs args = ClientChannel::MakeSubchannelArgs( ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs(
ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "bar.example.com"), ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "bar.example.com"),
ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "baz.example.com"), nullptr, ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "baz.example.com"), nullptr,
"foo.example.com"); "foo.example.com");
@ -65,14 +65,14 @@ TEST(MakeSubchannelArgs,
} }
TEST(MakeSubchannelArgs, ArgsFromChannelTrumpPerAddressArgs) { TEST(MakeSubchannelArgs, ArgsFromChannelTrumpPerAddressArgs) {
ChannelArgs args = ClientChannel::MakeSubchannelArgs( ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs(
ChannelArgs().Set("foo", 1), ChannelArgs().Set("foo", 2), nullptr, ChannelArgs().Set("foo", 1), ChannelArgs().Set("foo", 2), nullptr,
"foo.example.com"); "foo.example.com");
EXPECT_EQ(args.GetInt("foo"), 1); EXPECT_EQ(args.GetInt("foo"), 1);
} }
TEST(MakeSubchannelArgs, StripsOutNoSubchannelArgs) { TEST(MakeSubchannelArgs, StripsOutNoSubchannelArgs) {
ChannelArgs args = ClientChannel::MakeSubchannelArgs( ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs(
ChannelArgs().Set(GRPC_ARG_NO_SUBCHANNEL_PREFIX "foo", 1), ChannelArgs().Set(GRPC_ARG_NO_SUBCHANNEL_PREFIX "foo", 1),
ChannelArgs().Set(GRPC_ARG_NO_SUBCHANNEL_PREFIX "bar", 1), nullptr, ChannelArgs().Set(GRPC_ARG_NO_SUBCHANNEL_PREFIX "bar", 1), nullptr,
"foo.example.com"); "foo.example.com");

Loading…
Cancel
Save