From d763099c874402721a9eaba08f6290a1bf955709 Mon Sep 17 00:00:00 2001
From: "Mark D. Roth"
Date: Tue, 23 Mar 2021 11:47:06 -0700
Subject: [PATCH] Add lock annotations in client_channel filter. (#25777)

---
 .../filters/client_channel/client_channel.cc | 603 +++++++++---------
 1 file changed, 312 insertions(+), 291 deletions(-)

diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 7b8a16a2b59..c86ff900577 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -121,18 +121,11 @@ namespace {
 // ChannelData definition
 //
 
-class LoadBalancedCall;
-
 class ChannelData {
  public:
-  struct ResolverQueuedCall {
-    grpc_call_element* elem;
-    ResolverQueuedCall* next = nullptr;
-  };
-  struct LbQueuedCall {
-    LoadBalancedCall* lb_call;
-    LbQueuedCall* next = nullptr;
-  };
+  class CallData;
+  class RetryingCall;
+  class LoadBalancedCall;
 
   static grpc_error* Init(grpc_channel_element* elem,
                           grpc_channel_element_args* args);
@@ -142,51 +135,6 @@ class ChannelData {
   static void GetChannelInfo(grpc_channel_element* elem,
                              const grpc_channel_info* info);
 
-  bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
-  bool enable_retries() const { return enable_retries_; }
-  size_t per_rpc_retry_buffer_size() const {
-    return per_rpc_retry_buffer_size_;
-  }
-  grpc_channel_stack* owning_stack() const { return owning_stack_; }
-
-  // Note: Does NOT return a new ref.
-  grpc_error* disconnect_error() const {
-    return disconnect_error_.Load(MemoryOrder::ACQUIRE);
-  }
-
-  Mutex* resolution_mu() const { return &resolution_mu_; }
-  // These methods all require holding resolution_mu_.
-  void AddResolverQueuedCall(ResolverQueuedCall* call,
-                             grpc_polling_entity* pollent);
-  void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
-                                grpc_polling_entity* pollent);
-  bool received_service_config_data() const {
-    return received_service_config_data_;
-  }
-  grpc_error* resolver_transient_failure_error() const {
-    return resolver_transient_failure_error_;
-  }
-  RefCountedPtr<ServiceConfig> service_config() const {
-    return service_config_;
-  }
-  ConfigSelector* config_selector() const { return config_selector_.get(); }
-  RefCountedPtr<DynamicFilters> dynamic_filters() const {
-    return dynamic_filters_;
-  }
-
-  Mutex* data_plane_mu() const { return &data_plane_mu_; }
-  // These methods all require holding data_plane_mu_.
-  LoadBalancingPolicy::SubchannelPicker* picker() const {
-    return picker_.get();
-  }
-  void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent);
-  void RemoveLbQueuedCall(LbQueuedCall* to_remove,
-                          grpc_polling_entity* pollent);
-  RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane(
-      SubchannelInterface* subchannel) const;
-
-  WorkSerializer* work_serializer() const { return work_serializer_.get(); }
-
   grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
 
   void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
@@ -215,6 +163,7 @@ class ChannelData {
       AsyncConnectivityStateWatcherInterface* watcher);
 
  private:
+  class DynamicTerminationFilterChannelData;
   class SubchannelWrapper;
   class ClientChannelControlHelper;
   class ConnectivityWatcherAdder;
@@ -281,9 +230,23 @@ class ChannelData {
     ChannelData* chand_;
   };
 
+  struct ResolverQueuedCall {
+    grpc_call_element* elem;
+    ResolverQueuedCall* next = nullptr;
+  };
+  struct LbQueuedCall {
+    LoadBalancedCall* lb_call;
+    LbQueuedCall* next = nullptr;
+  };
+
   ChannelData(grpc_channel_element_args* args, grpc_error** error);
   ~ChannelData();
 
+  // Note: Does NOT return a new ref.
+  grpc_error* disconnect_error() const {
+    return disconnect_error_.Load(MemoryOrder::ACQUIRE);
+  }
+
   // Note: All methods with "Locked" suffix must be invoked from within
   // work_serializer_.
 
@@ -318,6 +281,23 @@ class ChannelData {
 
   void TryToConnectLocked();
 
+  // These methods all require holding resolution_mu_.
+  void AddResolverQueuedCall(ResolverQueuedCall* call,
+                             grpc_polling_entity* pollent)
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);
+  void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
+                                grpc_polling_entity* pollent)
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);
+
+  // These methods all require holding data_plane_mu_.
+  void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent)
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
+  void RemoveLbQueuedCall(LbQueuedCall* to_remove, grpc_polling_entity* pollent)
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
+  RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane(
+      SubchannelInterface* subchannel) const
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
+
   //
   // Fields set at construction and never modified.
   //
@@ -337,21 +317,26 @@ class ChannelData {
   //
   mutable Mutex resolution_mu_;
   // Linked list of calls queued waiting for resolver result.
-  ResolverQueuedCall* resolver_queued_calls_ = nullptr;
+  ResolverQueuedCall* resolver_queued_calls_ ABSL_GUARDED_BY(resolution_mu_) =
+      nullptr;
   // Data from service config.
-  grpc_error* resolver_transient_failure_error_ = GRPC_ERROR_NONE;
-  bool received_service_config_data_ = false;
-  RefCountedPtr<ServiceConfig> service_config_;
-  RefCountedPtr<ConfigSelector> config_selector_;
-  RefCountedPtr<DynamicFilters> dynamic_filters_;
+  grpc_error* resolver_transient_failure_error_
+      ABSL_GUARDED_BY(resolution_mu_) = GRPC_ERROR_NONE;
+  bool received_service_config_data_ ABSL_GUARDED_BY(resolution_mu_) = false;
+  RefCountedPtr<ServiceConfig> service_config_ ABSL_GUARDED_BY(resolution_mu_);
+  RefCountedPtr<ConfigSelector> config_selector_
+      ABSL_GUARDED_BY(resolution_mu_);
+  RefCountedPtr<DynamicFilters> dynamic_filters_
+      ABSL_GUARDED_BY(resolution_mu_);
 
   //
   // Fields used in the data plane.  Guarded by data_plane_mu_.
   //
   mutable Mutex data_plane_mu_;
-  std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_;
+  std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_
+      ABSL_GUARDED_BY(data_plane_mu_);
   // Linked list of calls queued waiting for LB pick.
-  LbQueuedCall* lb_queued_calls_ = nullptr;
+  LbQueuedCall* lb_queued_calls_ ABSL_GUARDED_BY(data_plane_mu_) = nullptr;
 
   //
   // Fields used in the control plane.  Guarded by work_serializer.
@@ -390,8 +375,8 @@ class ChannelData {
   // synchronously via get_channel_info().
   //
   Mutex info_mu_;
-  UniquePtr<char> info_lb_policy_name_;
-  UniquePtr<char> info_service_config_json_;
+  UniquePtr<char> info_lb_policy_name_ ABSL_GUARDED_BY(info_mu_);
+  UniquePtr<char> info_service_config_json_ ABSL_GUARDED_BY(info_mu_);
 
   //
   // Fields guarded by a mutex, since they need to be accessed
@@ -399,14 +384,14 @@ class ChannelData {
   //
   mutable Mutex external_watchers_mu_;
   std::map<grpc_closure*, RefCountedPtr<ExternalConnectivityWatcher>>
-      external_watchers_;
+      external_watchers_ ABSL_GUARDED_BY(external_watchers_mu_);
 };
 
 //
-// CallData definition
+// ChannelData::CallData definition
 //
 
-class CallData {
+class ChannelData::CallData {
  public:
   static grpc_error* Init(grpc_call_element* elem,
                           const grpc_call_element_args* args);
@@ -424,7 +409,8 @@ class CallData {
   // Returns true if the service config has been applied to the call, in which
   // case the caller must invoke ResolutionDone() or AsyncResolutionDone()
   // with the returned error.
-  bool CheckResolutionLocked(grpc_call_element* elem, grpc_error** error);
+  bool CheckResolutionLocked(grpc_call_element* elem, grpc_error** error)
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ChannelData::resolution_mu_);
   // Schedules a callback to continue processing the call once
   // resolution is complete.  The callback will not run until after this
   // method returns.
@@ -470,16 +456,19 @@ class CallData {
   // If an error is returned, the error indicates the status with which
   // the call should be failed.
   grpc_error* ApplyServiceConfigToCallLocked(
-      grpc_call_element* elem, grpc_metadata_batch* initial_metadata);
+      grpc_call_element* elem, grpc_metadata_batch* initial_metadata)
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ChannelData::resolution_mu_);
   // Invoked when the resolver result is applied to the caller, on both
   // success or failure.
   static void ResolutionDone(void* arg, grpc_error* error);
   // Removes the call (if present) from the channel's list of calls queued
   // for name resolution.
-  void MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem);
+  void MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem)
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ChannelData::resolution_mu_);
   // Adds the call (if not already present) to the channel's list of
   // calls queued for name resolution.
-  void MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element* elem);
+  void MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element* elem)
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ChannelData::resolution_mu_);
 
   static void RecvInitialMetadataReadyForConfigSelectorCommitCallback(
       void* arg, grpc_error* error);
@@ -534,10 +523,10 @@ class CallData {
 };
 
 //
-// RetryingCall definition
+// ChannelData::RetryingCall definition
 //
 
-class RetryingCall {
+class ChannelData::RetryingCall {
  public:
   RetryingCall(
       ChannelData* chand, const grpc_call_element_args& args,
@@ -581,7 +570,7 @@ class RetryingCall {
     gpr_refcount refs;
     grpc_call_element* elem;
     RetryingCall* call;
-    RefCountedPtr<LoadBalancedCall> lb_call;
+    RefCountedPtr<ChannelData::LoadBalancedCall> lb_call;
     // The batch to use in the subchannel call.
     // Its payload field points to SubchannelCallRetryState::batch_payload.
     grpc_transport_stream_op_batch batch;
@@ -840,7 +829,7 @@ class RetryingCall {
 
   grpc_closure retry_closure_;
 
-  RefCountedPtr<LoadBalancedCall> lb_call_;
+  RefCountedPtr<ChannelData::LoadBalancedCall> lb_call_;
 
   // Batches are added to this list when received from above.
   // They are removed when we are done handling the batch (i.e., when
@@ -897,14 +886,14 @@ class RetryingCall {
 };
 
 //
-// LoadBalancedCall definition
+// ChannelData::LoadBalancedCall definition
 //
 
 // This object is ref-counted, but it cannot inherit from RefCounted<>,
 // because it is allocated on the arena and can't free its memory when
 // its refcount goes to zero.  So instead, it manually implements the
 // same API as RefCounted<>, so that it can be used with RefCountedPtr<>.
-class LoadBalancedCall {
+class ChannelData::LoadBalancedCall {
  public:
   static RefCountedPtr<LoadBalancedCall> Create(
       ChannelData* chand, const grpc_call_element_args& args,
@@ -932,7 +921,8 @@ class LoadBalancedCall {
   // Helper function for performing an LB pick while holding the data plane
   // mutex.  Returns true if the pick is complete, in which case the caller
   // must invoke PickDone() or AsyncPickDone() with the returned error.
-  bool PickSubchannelLocked(grpc_error** error);
+  bool PickSubchannelLocked(grpc_error** error)
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ChannelData::data_plane_mu_);
   // Schedules a callback to process the completed pick.  The callback
   // will not run until after this method returns.
   void AsyncPickDone(grpc_error* error);
@@ -990,9 +980,11 @@ class LoadBalancedCall {
   // Invoked when a pick is completed, on both success or failure.
   static void PickDone(void* arg, grpc_error* error);
   // Removes the call from the channel's list of queued picks if present.
-  void MaybeRemoveCallFromLbQueuedCallsLocked();
+  void MaybeRemoveCallFromLbQueuedCallsLocked()
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ChannelData::data_plane_mu_);
   // Adds the call to the channel's list of queued picks if not already present.
-  void MaybeAddCallToLbQueuedCallsLocked();
+  void MaybeAddCallToLbQueuedCallsLocked()
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ChannelData::data_plane_mu_);
 
   RefCount refs_;
 
@@ -1067,10 +1059,20 @@ const grpc_arg_pointer_vtable kRetryThrottleDataArgPointerVtable = {
     RetryThrottleDataArgCopy, RetryThrottleDataArgDestroy,
     RetryThrottleDataArgCmp};
 
-class DynamicTerminationFilterChannelData {
+class ChannelData::DynamicTerminationFilterChannelData {
  public:
+  class DynamicTerminationFilterCallData;
+
+  static const grpc_channel_filter kDynamicTerminationFilterVtable;
+
   static grpc_error* Init(grpc_channel_element* elem,
-                          grpc_channel_element_args* args);
+                          grpc_channel_element_args* args) {
+    GPR_ASSERT(args->is_last);
+    GPR_ASSERT(elem->filter == &kDynamicTerminationFilterVtable);
+    new (elem->channel_data)
+        DynamicTerminationFilterChannelData(args->channel_args);
+    return GRPC_ERROR_NONE;
+  }
 
   static void Destroy(grpc_channel_element* elem) {
     auto* chand =
@@ -1084,11 +1086,6 @@ class DynamicTerminationFilterChannelData {
   static void GetChannelInfo(grpc_channel_element* /*elem*/,
                              const grpc_channel_info* /*info*/) {}
 
-  ChannelData* chand() const { return chand_; }
-  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
-    return retry_throttle_data_;
-  }
-
  private:
   static RefCountedPtr<ServerRetryThrottleData> GetRetryThrottleDataFromArgs(
       const grpc_channel_args* args) {
@@ -1108,7 +1105,8 @@ class DynamicTerminationFilterChannelData {
   ChannelData* chand_;
   RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
 };
 
-class DynamicTerminationFilterCallData {
+class ChannelData::DynamicTerminationFilterChannelData::
+    DynamicTerminationFilterCallData {
  public:
   static grpc_error* Init(grpc_call_element* elem,
                           const grpc_call_element_args* args) {
@@ -1124,7 +1122,7 @@ class DynamicTerminationFilterCallData {
     auto* chand =
         static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
     RefCountedPtr<SubchannelCall> subchannel_call;
-    if (chand->chand()->enable_retries()) {
+    if (chand->chand_->enable_retries_) {
       if (GPR_LIKELY(calld->retrying_call_ != nullptr)) {
         subchannel_call = calld->retrying_call_->subchannel_call();
         calld->retrying_call_->~RetryingCall();
       }
     } else {
@@ -1149,7 +1147,7 @@ class DynamicTerminationFilterCallData {
         static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
     auto* chand =
         static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
-    if (chand->chand()->enable_retries()) {
+    if (chand->chand_->enable_retries_) {
      calld->retrying_call_->StartTransportStreamOpBatch(batch);
     } else {
       calld->lb_call_->StartTransportStreamOpBatch(batch);
@@ -1162,13 +1160,13 @@ class DynamicTerminationFilterCallData {
         static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
     auto* chand =
         static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
-    ChannelData* client_channel = chand->chand();
+    ChannelData* client_channel = chand->chand_;
     grpc_call_element_args args = {
         calld->owning_call_,     nullptr,
         calld->call_context_,    calld->path_,
         calld->call_start_time_, calld->deadline_,
         calld->arena_,           calld->call_combiner_};
-    if (client_channel->enable_retries()) {
+    if (client_channel->enable_retries_) {
       // Get retry settings from service config.
       auto* svc_cfg_call_data = static_cast<ServiceConfigCallData*>(
          calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
@@ -1177,8 +1175,8 @@ class DynamicTerminationFilterCallData {
           svc_cfg_call_data->GetMethodParsedConfig(
               ClientChannelServiceConfigParser::ParserIndex()));
       // Create retrying call.
-      calld->retrying_call_ = calld->arena_->New<RetryingCall>(
-          client_channel, args, pollent, chand->retry_throttle_data(),
+      calld->retrying_call_ = calld->arena_->New<ChannelData::RetryingCall>(
+          client_channel, args, pollent, chand->retry_throttle_data_,
           method_config == nullptr ? nullptr : method_config->retry_policy());
       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
         gpr_log(
@@ -1187,8 +1185,8 @@ class DynamicTerminationFilterCallData {
            GPR_INFO,
            "chand=%p dynamic_termination_calld=%p: create retrying_call=%p",
            client_channel, calld, calld->retrying_call_);
       }
     } else {
-      calld->lb_call_ =
-          LoadBalancedCall::Create(client_channel, args, pollent, 0);
+      calld->lb_call_ = ChannelData::LoadBalancedCall::Create(client_channel,
+                                                              args, pollent, 0);
       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
         gpr_log(GPR_INFO,
                 "chand=%p dynamic_termination_calld=%p: create lb_call=%p",
@@ -1217,33 +1215,30 @@ class DynamicTerminationFilterCallData {
   CallCombiner* call_combiner_;
   grpc_call_context_element* call_context_;
 
-  RetryingCall* retrying_call_ = nullptr;
+  ChannelData::RetryingCall* retrying_call_ = nullptr;
   RefCountedPtr<LoadBalancedCall> lb_call_;
 };
 
-const grpc_channel_filter kDynamicTerminationFilterVtable = {
-    DynamicTerminationFilterCallData::StartTransportStreamOpBatch,
-    DynamicTerminationFilterChannelData::StartTransportOp,
-    sizeof(DynamicTerminationFilterCallData),
-    DynamicTerminationFilterCallData::Init,
-    DynamicTerminationFilterCallData::SetPollent,
-    DynamicTerminationFilterCallData::Destroy,
-    sizeof(DynamicTerminationFilterChannelData),
-    DynamicTerminationFilterChannelData::Init,
-    DynamicTerminationFilterChannelData::Destroy,
-    DynamicTerminationFilterChannelData::GetChannelInfo,
-    "dynamic_filter_termination",
+const grpc_channel_filter ChannelData::DynamicTerminationFilterChannelData::
+    kDynamicTerminationFilterVtable = {
+        ChannelData::DynamicTerminationFilterChannelData::
+            DynamicTerminationFilterCallData::StartTransportStreamOpBatch,
+        ChannelData::DynamicTerminationFilterChannelData::StartTransportOp,
+        sizeof(ChannelData::DynamicTerminationFilterChannelData::
+                   DynamicTerminationFilterCallData),
+        ChannelData::DynamicTerminationFilterChannelData::
+            DynamicTerminationFilterCallData::Init,
+ ChannelData::DynamicTerminationFilterChannelData:: + DynamicTerminationFilterCallData::SetPollent, + ChannelData::DynamicTerminationFilterChannelData:: + DynamicTerminationFilterCallData::Destroy, + sizeof(ChannelData::DynamicTerminationFilterChannelData), + ChannelData::DynamicTerminationFilterChannelData::Init, + ChannelData::DynamicTerminationFilterChannelData::Destroy, + ChannelData::DynamicTerminationFilterChannelData::GetChannelInfo, + "dynamic_filter_termination", }; -grpc_error* DynamicTerminationFilterChannelData::Init( - grpc_channel_element* elem, grpc_channel_element_args* args) { - GPR_ASSERT(args->is_last); - GPR_ASSERT(elem->filter == &kDynamicTerminationFilterVtable); - new (elem->channel_data) - DynamicTerminationFilterChannelData(args->channel_args); - return GRPC_ERROR_NONE; -} - // // ChannelData::SubchannelWrapper // @@ -2300,7 +2295,8 @@ void ChannelData::UpdateServiceConfigInDataPlaneLocked() { // Construct dynamic filter stack. std::vector filters = config_selector->GetFilters(); - filters.push_back(&kDynamicTerminationFilterVtable); + filters.push_back( + &DynamicTerminationFilterChannelData::kDynamicTerminationFilterVtable); absl::InlinedVector args_to_add; args_to_add.push_back(grpc_channel_arg_pointer_create( const_cast(GRPC_ARG_CLIENT_CHANNEL_DATA), this, @@ -2462,8 +2458,11 @@ grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) { if (state_tracker_.state() != GRPC_CHANNEL_READY) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected"); } - LoadBalancingPolicy::PickResult result = - picker_->Pick(LoadBalancingPolicy::PickArgs()); + LoadBalancingPolicy::PickResult result; + { + MutexLock lock(&data_plane_mu_); + result = picker_->Pick(LoadBalancingPolicy::PickArgs()); + } ConnectedSubchannel* connected_subchannel = nullptr; if (result.subchannel != nullptr) { SubchannelWrapper* subchannel = @@ -2636,10 +2635,11 @@ void ChannelData::RemoveConnectivityWatcher( // CallData implementation // -CallData::CallData(grpc_call_element* elem, const ChannelData& chand, - const grpc_call_element_args& args) +ChannelData::CallData::CallData(grpc_call_element* elem, + const ChannelData& chand, + const grpc_call_element_args& args) : deadline_state_(elem, args, - GPR_LIKELY(chand.deadline_checking_enabled()) + GPR_LIKELY(chand.deadline_checking_enabled_) ? args.deadline : GRPC_MILLIS_INF_FUTURE), path_(grpc_slice_ref_internal(args.path)), @@ -2654,7 +2654,7 @@ CallData::CallData(grpc_call_element* elem, const ChannelData& chand, } } -CallData::~CallData() { +ChannelData::CallData::~CallData() { grpc_slice_unref_internal(path_); GRPC_ERROR_UNREF(cancel_error_); // Make sure there are no remaining pending batches. 
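The DoPingLocked() hunk above is the shape of most behavioral fixes in this patch: once picker_ carries ABSL_GUARDED_BY(data_plane_mu_), Clang's -Wthread-safety analysis rejects any access made without the mutex held, so the pick is wrapped in a MutexLock scope. A minimal standalone sketch of that contract, using absl::Mutex directly rather than gRPC's Mutex/MutexLock wrappers; all names below are illustrative, not from this patch:

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

class PingPath {
 public:
  int Ping() {
    int result;
    {
      absl::MutexLock lock(&data_plane_mu_);  // the analysis sees the lock
      result = picker_state_;                 // OK: guarded field, mutex held
    }
    return result;  // guarded data is not touched once the scope ends
  }

 private:
  absl::Mutex data_plane_mu_;
  // Reading or writing this field without data_plane_mu_ held is a
  // compile-time error when building with -Wthread-safety.
  int picker_state_ ABSL_GUARDED_BY(data_plane_mu_) = 0;
};
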
@@ -2663,16 +2663,16 @@ CallData::~CallData() { } } -grpc_error* CallData::Init(grpc_call_element* elem, - const grpc_call_element_args* args) { +grpc_error* ChannelData::CallData::Init(grpc_call_element* elem, + const grpc_call_element_args* args) { ChannelData* chand = static_cast(elem->channel_data); new (elem->call_data) CallData(elem, *chand, *args); return GRPC_ERROR_NONE; } -void CallData::Destroy(grpc_call_element* elem, - const grpc_call_final_info* /*final_info*/, - grpc_closure* then_schedule_closure) { +void ChannelData::CallData::Destroy(grpc_call_element* elem, + const grpc_call_final_info* /*final_info*/, + grpc_closure* then_schedule_closure) { CallData* calld = static_cast(elem->call_data); RefCountedPtr dynamic_call = std::move(calld->dynamic_call_); @@ -2685,12 +2685,12 @@ void CallData::Destroy(grpc_call_element* elem, } } -void CallData::StartTransportStreamOpBatch( +void ChannelData::CallData::StartTransportStreamOpBatch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0); CallData* calld = static_cast(elem->call_data); ChannelData* chand = static_cast(elem->channel_data); - if (GPR_LIKELY(chand->deadline_checking_enabled())) { + if (GPR_LIKELY(chand->deadline_checking_enabled_)) { grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch); } // Intercept recv_initial_metadata for config selector on-committed callback. @@ -2774,8 +2774,8 @@ void CallData::StartTransportStreamOpBatch( } } -void CallData::SetPollent(grpc_call_element* elem, - grpc_polling_entity* pollent) { +void ChannelData::CallData::SetPollent(grpc_call_element* elem, + grpc_polling_entity* pollent) { CallData* calld = static_cast(elem->call_data); calld->pollent_ = pollent; } @@ -2784,7 +2784,8 @@ void CallData::SetPollent(grpc_call_element* elem, // pending_batches management // -size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) { +size_t ChannelData::CallData::GetBatchIndex( + grpc_transport_stream_op_batch* batch) { // Note: It is important the send_initial_metadata be the first entry // here, since the code in pick_subchannel_locked() assumes it will be. if (batch->send_initial_metadata) return 0; @@ -2797,8 +2798,8 @@ size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) { } // This is called via the call combiner, so access to calld is synchronized. -void CallData::PendingBatchesAdd(grpc_call_element* elem, - grpc_transport_stream_op_batch* batch) { +void ChannelData::CallData::PendingBatchesAdd( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { ChannelData* chand = static_cast(elem->channel_data); const size_t idx = GetBatchIndex(batch); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { @@ -2812,7 +2813,8 @@ void CallData::PendingBatchesAdd(grpc_call_element* elem, } // This is called via the call combiner, so access to calld is synchronized. -void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) { +void ChannelData::CallData::FailPendingBatchInCallCombiner(void* arg, + grpc_error* error) { grpc_transport_stream_op_batch* batch = static_cast(arg); CallData* calld = static_cast(batch->handler_private.extra_arg); @@ -2822,7 +2824,7 @@ void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) { } // This is called via the call combiner, so access to calld is synchronized. 
-void CallData::PendingBatchesFail( +void ChannelData::CallData::PendingBatchesFail( grpc_call_element* elem, grpc_error* error, YieldCallCombinerPredicate yield_call_combiner_predicate) { GPR_ASSERT(error != GRPC_ERROR_NONE); @@ -2857,8 +2859,8 @@ void CallData::PendingBatchesFail( } // This is called via the call combiner, so access to calld is synchronized. -void CallData::ResumePendingBatchInCallCombiner(void* arg, - grpc_error* /*ignored*/) { +void ChannelData::CallData::ResumePendingBatchInCallCombiner( + void* arg, grpc_error* /*ignored*/) { grpc_transport_stream_op_batch* batch = static_cast(arg); auto* elem = @@ -2869,7 +2871,7 @@ void CallData::ResumePendingBatchInCallCombiner(void* arg, } // This is called via the call combiner, so access to calld is synchronized. -void CallData::PendingBatchesResume(grpc_call_element* elem) { +void ChannelData::CallData::PendingBatchesResume(grpc_call_element* elem) { ChannelData* chand = static_cast(elem->channel_data); // Retries not enabled; send down batches as-is. if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { @@ -2904,7 +2906,7 @@ void CallData::PendingBatchesResume(grpc_call_element* elem) { // A class to handle the call combiner cancellation callback for a // queued pick. -class CallData::ResolverQueuedCallCanceller { +class ChannelData::CallData::ResolverQueuedCallCanceller { public: explicit ResolverQueuedCallCanceller(grpc_call_element* elem) : elem_(elem) { auto* calld = static_cast(elem->call_data); @@ -2920,7 +2922,7 @@ class CallData::ResolverQueuedCallCanceller { auto* chand = static_cast(self->elem_->channel_data); auto* calld = static_cast(self->elem_->call_data); { - MutexLock lock(chand->resolution_mu()); + MutexLock lock(&chand->resolution_mu_); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling resolver queued pick: " @@ -2944,7 +2946,7 @@ class CallData::ResolverQueuedCallCanceller { grpc_closure closure_; }; -void CallData::MaybeRemoveCallFromResolverQueuedCallsLocked( +void ChannelData::CallData::MaybeRemoveCallFromResolverQueuedCallsLocked( grpc_call_element* elem) { if (!queued_pending_resolver_result_) return; auto* chand = static_cast(elem->channel_data); @@ -2959,7 +2961,7 @@ void CallData::MaybeRemoveCallFromResolverQueuedCallsLocked( resolver_call_canceller_ = nullptr; } -void CallData::MaybeAddCallToResolverQueuedCallsLocked( +void ChannelData::CallData::MaybeAddCallToResolverQueuedCallsLocked( grpc_call_element* elem) { if (queued_pending_resolver_result_) return; auto* chand = static_cast(elem->channel_data); @@ -2974,14 +2976,14 @@ void CallData::MaybeAddCallToResolverQueuedCallsLocked( resolver_call_canceller_ = new ResolverQueuedCallCanceller(elem); } -grpc_error* CallData::ApplyServiceConfigToCallLocked( +grpc_error* ChannelData::CallData::ApplyServiceConfigToCallLocked( grpc_call_element* elem, grpc_metadata_batch* initial_metadata) { ChannelData* chand = static_cast(elem->channel_data); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call", chand, this); } - ConfigSelector* config_selector = chand->config_selector(); + ConfigSelector* config_selector = chand->config_selector_.get(); if (config_selector != nullptr) { // Use the ConfigSelector to determine the config for the call. 
ConfigSelector::CallConfig call_config = @@ -3003,7 +3005,7 @@ grpc_error* CallData::ApplyServiceConfigToCallLocked( if (method_params != nullptr) { // If the deadline from the service config is shorter than the one // from the client API, reset the deadline timer. - if (chand->deadline_checking_enabled() && method_params->timeout() != 0) { + if (chand->deadline_checking_enabled_ && method_params->timeout() != 0) { const grpc_millis per_method_deadline = grpc_cycle_counter_to_millis_round_up(call_start_time_) + method_params->timeout(); @@ -3028,13 +3030,14 @@ grpc_error* CallData::ApplyServiceConfigToCallLocked( } } // Set the dynamic filter stack. - dynamic_filters_ = chand->dynamic_filters(); + dynamic_filters_ = chand->dynamic_filters_; } return GRPC_ERROR_NONE; } -void CallData::RecvInitialMetadataReadyForConfigSelectorCommitCallback( - void* arg, grpc_error* error) { +void ChannelData::CallData:: + RecvInitialMetadataReadyForConfigSelectorCommitCallback(void* arg, + grpc_error* error) { auto* self = static_cast(arg); if (self->on_call_committed_ != nullptr) { self->on_call_committed_(); @@ -3047,8 +3050,9 @@ void CallData::RecvInitialMetadataReadyForConfigSelectorCommitCallback( // TODO(roth): Consider not intercepting this callback unless we // actually need to, if this causes a performance problem. -void CallData::InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback( - grpc_transport_stream_op_batch* batch) { +void ChannelData::CallData:: + InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback( + grpc_transport_stream_op_batch* batch) { original_recv_initial_metadata_ready_ = batch->payload->recv_initial_metadata.recv_initial_metadata_ready; GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, @@ -3058,12 +3062,13 @@ void CallData::InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback( &recv_initial_metadata_ready_; } -void CallData::AsyncResolutionDone(grpc_call_element* elem, grpc_error* error) { +void ChannelData::CallData::AsyncResolutionDone(grpc_call_element* elem, + grpc_error* error) { GRPC_CLOSURE_INIT(&pick_closure_, ResolutionDone, elem, nullptr); ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error); } -void CallData::ResolutionDone(void* arg, grpc_error* error) { +void ChannelData::CallData::ResolutionDone(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); ChannelData* chand = static_cast(elem->channel_data); CallData* calld = static_cast(elem->call_data); @@ -3079,13 +3084,13 @@ void CallData::ResolutionDone(void* arg, grpc_error* error) { calld->CreateDynamicCall(elem); } -void CallData::CheckResolution(void* arg, grpc_error* error) { +void ChannelData::CallData::CheckResolution(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); CallData* calld = static_cast(elem->call_data); ChannelData* chand = static_cast(elem->channel_data); bool resolution_complete; { - MutexLock lock(chand->resolution_mu()); + MutexLock lock(&chand->resolution_mu_); resolution_complete = calld->CheckResolutionLocked(elem, &error); } if (resolution_complete) { @@ -3094,8 +3099,8 @@ void CallData::CheckResolution(void* arg, grpc_error* error) { } } -bool CallData::CheckResolutionLocked(grpc_call_element* elem, - grpc_error** error) { +bool ChannelData::CallData::CheckResolutionLocked(grpc_call_element* elem, + grpc_error** error) { ChannelData* chand = static_cast(elem->channel_data); // If we're still in IDLE, we need to start resolving. 
if (GPR_UNLIKELY(chand->CheckConnectivityState(false) == GRPC_CHANNEL_IDLE)) { @@ -3103,16 +3108,16 @@ bool CallData::CheckResolutionLocked(grpc_call_element* elem, // in case we are still in IDLE state. Since we are holding on to the // resolution mutex here, we offload it on the ExecCtx so that we don't // deadlock with ourselves. - GRPC_CHANNEL_STACK_REF(chand->owning_stack(), "CheckResolutionLocked"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "CheckResolutionLocked"); ExecCtx::Run( DEBUG_LOCATION, GRPC_CLOSURE_CREATE( [](void* arg, grpc_error* /*error*/) { auto* chand = static_cast(arg); - chand->work_serializer()->Run( + chand->work_serializer_->Run( [chand]() { chand->CheckConnectivityState(/*try_to_connect=*/true); - GRPC_CHANNEL_STACK_UNREF(chand->owning_stack(), + GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "CheckResolutionLocked"); }, DEBUG_LOCATION); @@ -3129,10 +3134,10 @@ bool CallData::CheckResolutionLocked(grpc_call_element* elem, send_initial_metadata.send_initial_metadata_flags; // If we don't yet have a resolver result, we need to queue the call // until we get one. - if (GPR_UNLIKELY(!chand->received_service_config_data())) { + if (GPR_UNLIKELY(!chand->received_service_config_data_)) { // If the resolver returned transient failure before returning the // first service config, fail any non-wait_for_ready calls. - grpc_error* resolver_error = chand->resolver_transient_failure_error(); + grpc_error* resolver_error = chand->resolver_transient_failure_error_; if (resolver_error != GRPC_ERROR_NONE && (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { @@ -3155,7 +3160,7 @@ bool CallData::CheckResolutionLocked(grpc_call_element* elem, return true; } -void CallData::CreateDynamicCall(grpc_call_element* elem) { +void ChannelData::CallData::CreateDynamicCall(grpc_call_element* elem) { auto* chand = static_cast(elem->channel_data); DynamicFilters::Call::Args args = {std::move(dynamic_filters_), pollent_, @@ -3225,7 +3230,7 @@ void CallData::CreateDynamicCall(grpc_call_element* elem) { // (census filter is on top of this one) // - add census stats for retries -RetryingCall::RetryingCall( +ChannelData::RetryingCall::RetryingCall( ChannelData* chand, const grpc_call_element_args& args, grpc_polling_entity* pollent, RefCountedPtr retry_throttle_data, @@ -3258,7 +3263,7 @@ RetryingCall::RetryingCall( retry_committed_(false), last_attempt_got_server_pushback_(false) {} -RetryingCall::~RetryingCall() { +ChannelData::RetryingCall::~RetryingCall() { grpc_slice_unref_internal(path_); GRPC_ERROR_UNREF(cancel_error_); // Make sure there are no remaining pending batches. @@ -3267,7 +3272,7 @@ RetryingCall::~RetryingCall() { } } -void RetryingCall::StartTransportStreamOpBatch( +void ChannelData::RetryingCall::StartTransportStreamOpBatch( grpc_transport_stream_op_batch* batch) { // If we've previously been cancelled, immediately fail any new batches. 
if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) { @@ -3333,7 +3338,8 @@ void RetryingCall::StartTransportStreamOpBatch( PendingBatchesResume(); } -RefCountedPtr RetryingCall::subchannel_call() const { +RefCountedPtr ChannelData::RetryingCall::subchannel_call() + const { if (lb_call_ == nullptr) return nullptr; return lb_call_->subchannel_call(); } @@ -3342,7 +3348,8 @@ RefCountedPtr RetryingCall::subchannel_call() const { // send op data caching // -void RetryingCall::MaybeCacheSendOpsForBatch(PendingBatch* pending) { +void ChannelData::RetryingCall::MaybeCacheSendOpsForBatch( + PendingBatch* pending) { if (pending->send_ops_cached) return; pending->send_ops_cached = true; grpc_transport_stream_op_batch* batch = pending->batch; @@ -3381,7 +3388,7 @@ void RetryingCall::MaybeCacheSendOpsForBatch(PendingBatch* pending) { } } -void RetryingCall::FreeCachedSendInitialMetadata() { +void ChannelData::RetryingCall::FreeCachedSendInitialMetadata() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p retrying_call=%p: destroying send_initial_metadata", @@ -3390,7 +3397,7 @@ void RetryingCall::FreeCachedSendInitialMetadata() { grpc_metadata_batch_destroy(&send_initial_metadata_); } -void RetryingCall::FreeCachedSendMessage(size_t idx) { +void ChannelData::RetryingCall::FreeCachedSendMessage(size_t idx) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p retrying_call=%p: destroying send_messages[%" PRIuPTR "]", @@ -3399,7 +3406,7 @@ void RetryingCall::FreeCachedSendMessage(size_t idx) { send_messages_[idx]->Destroy(); } -void RetryingCall::FreeCachedSendTrailingMetadata() { +void ChannelData::RetryingCall::FreeCachedSendTrailingMetadata() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand_=%p retrying_call=%p: destroying send_trailing_metadata", @@ -3408,7 +3415,7 @@ void RetryingCall::FreeCachedSendTrailingMetadata() { grpc_metadata_batch_destroy(&send_trailing_metadata_); } -void RetryingCall::FreeCachedSendOpDataAfterCommit( +void ChannelData::RetryingCall::FreeCachedSendOpDataAfterCommit( SubchannelCallRetryState* retry_state) { if (retry_state->completed_send_initial_metadata) { FreeCachedSendInitialMetadata(); @@ -3421,7 +3428,7 @@ void RetryingCall::FreeCachedSendOpDataAfterCommit( } } -void RetryingCall::FreeCachedSendOpDataForCompletedBatch( +void ChannelData::RetryingCall::FreeCachedSendOpDataForCompletedBatch( SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state) { if (batch_data->batch.send_initial_metadata) { @@ -3439,7 +3446,8 @@ void RetryingCall::FreeCachedSendOpDataForCompletedBatch( // pending_batches management // -size_t RetryingCall::GetBatchIndex(grpc_transport_stream_op_batch* batch) { +size_t ChannelData::RetryingCall::GetBatchIndex( + grpc_transport_stream_op_batch* batch) { // Note: It is important the send_initial_metadata be the first entry // here, since the code in pick_subchannel_locked() assumes it will be. if (batch->send_initial_metadata) return 0; @@ -3452,7 +3460,8 @@ size_t RetryingCall::GetBatchIndex(grpc_transport_stream_op_batch* batch) { } // This is called via the call combiner, so access to calld is synchronized. 
-void RetryingCall::PendingBatchesAdd(grpc_transport_stream_op_batch* batch) { +void ChannelData::RetryingCall::PendingBatchesAdd( + grpc_transport_stream_op_batch* batch) { const size_t idx = GetBatchIndex(batch); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log( @@ -3483,7 +3492,7 @@ void RetryingCall::PendingBatchesAdd(grpc_transport_stream_op_batch* batch) { pending_send_trailing_metadata_ = true; } if (GPR_UNLIKELY(bytes_buffered_for_retry_ > - chand_->per_rpc_retry_buffer_size())) { + chand_->per_rpc_retry_buffer_size_)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p retrying_call=%p: exceeded retry buffer size, " @@ -3511,7 +3520,7 @@ void RetryingCall::PendingBatchesAdd(grpc_transport_stream_op_batch* batch) { } } -void RetryingCall::PendingBatchClear(PendingBatch* pending) { +void ChannelData::RetryingCall::PendingBatchClear(PendingBatch* pending) { if (enable_retries_) { if (pending->batch->send_initial_metadata) { pending_send_initial_metadata_ = false; @@ -3526,7 +3535,7 @@ void RetryingCall::PendingBatchClear(PendingBatch* pending) { pending->batch = nullptr; } -void RetryingCall::MaybeClearPendingBatch(PendingBatch* pending) { +void ChannelData::RetryingCall::MaybeClearPendingBatch(PendingBatch* pending) { grpc_transport_stream_op_batch* batch = pending->batch; // We clear the pending batch if all of its callbacks have been // scheduled and reset to nullptr. @@ -3548,8 +3557,8 @@ void RetryingCall::MaybeClearPendingBatch(PendingBatch* pending) { } // This is called via the call combiner, so access to calld is synchronized. -void RetryingCall::FailPendingBatchInCallCombiner(void* arg, - grpc_error* error) { +void ChannelData::RetryingCall::FailPendingBatchInCallCombiner( + void* arg, grpc_error* error) { grpc_transport_stream_op_batch* batch = static_cast(arg); RetryingCall* call = @@ -3560,7 +3569,7 @@ void RetryingCall::FailPendingBatchInCallCombiner(void* arg, } // This is called via the call combiner, so access to calld is synchronized. -void RetryingCall::PendingBatchesFail( +void ChannelData::RetryingCall::PendingBatchesFail( grpc_error* error, YieldCallCombinerPredicate yield_call_combiner_predicate) { GPR_ASSERT(error != GRPC_ERROR_NONE); @@ -3597,18 +3606,18 @@ void RetryingCall::PendingBatchesFail( } // This is called via the call combiner, so access to calld is synchronized. -void RetryingCall::ResumePendingBatchInCallCombiner(void* arg, - grpc_error* /*ignored*/) { +void ChannelData::RetryingCall::ResumePendingBatchInCallCombiner( + void* arg, grpc_error* /*ignored*/) { grpc_transport_stream_op_batch* batch = static_cast(arg); - auto* lb_call = - static_cast(batch->handler_private.extra_arg); + auto* lb_call = static_cast( + batch->handler_private.extra_arg); // Note: This will release the call combiner. lb_call->StartTransportStreamOpBatch(batch); } // This is called via the call combiner, so access to calld is synchronized. 
-void RetryingCall::PendingBatchesResume() { +void ChannelData::RetryingCall::PendingBatchesResume() { if (enable_retries_) { StartRetriableSubchannelBatches(this, GRPC_ERROR_NONE); return; @@ -3642,8 +3651,9 @@ void RetryingCall::PendingBatchesResume() { } template -RetryingCall::PendingBatch* RetryingCall::PendingBatchFind( - const char* log_message, Predicate predicate) { +ChannelData::RetryingCall::PendingBatch* +ChannelData::RetryingCall::PendingBatchFind(const char* log_message, + Predicate predicate) { for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { PendingBatch* pending = &pending_batches_[i]; grpc_transport_stream_op_batch* batch = pending->batch; @@ -3664,7 +3674,8 @@ RetryingCall::PendingBatch* RetryingCall::PendingBatchFind( // retry code // -void RetryingCall::RetryCommit(SubchannelCallRetryState* retry_state) { +void ChannelData::RetryingCall::RetryCommit( + SubchannelCallRetryState* retry_state) { if (retry_committed_) return; retry_committed_ = true; if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { @@ -3676,8 +3687,8 @@ void RetryingCall::RetryCommit(SubchannelCallRetryState* retry_state) { } } -void RetryingCall::DoRetry(SubchannelCallRetryState* retry_state, - grpc_millis server_pushback_ms) { +void ChannelData::RetryingCall::DoRetry(SubchannelCallRetryState* retry_state, + grpc_millis server_pushback_ms) { GPR_ASSERT(retry_policy_ != nullptr); // Reset LB call. lb_call_.reset(); @@ -3704,9 +3715,9 @@ void RetryingCall::DoRetry(SubchannelCallRetryState* retry_state, if (retry_state != nullptr) retry_state->retry_dispatched = true; } -bool RetryingCall::MaybeRetry(SubchannelCallBatchData* batch_data, - grpc_status_code status, - grpc_mdelem* server_pushback_md) { +bool ChannelData::RetryingCall::MaybeRetry(SubchannelCallBatchData* batch_data, + grpc_status_code status, + grpc_mdelem* server_pushback_md) { // Get retry policy. if (retry_policy_ == nullptr) return false; // If we've already dispatched a retry from this call, return true. 
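The long runs of RetryingCall -> ChannelData::RetryingCall renames above and below are not cosmetic: an attribute argument such as ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ChannelData::data_plane_mu_) is subject to normal access control, so only a member (including a nested class) may name ChannelData's private mutexes. A hedged sketch of why the call classes had to move inside ChannelData, with illustrative names:

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

class Channel {
 public:
  class Call;  // nested, so it may refer to Channel's private members

 private:
  absl::Mutex mu_;
  int queued_calls_ ABSL_GUARDED_BY(mu_) = 0;
};

class Channel::Call {
 public:
  // Compiles only because Call is a member of Channel; a top-level class
  // naming the mutex here would get "mu_ is a private member of Channel".
  void EnqueueLocked(Channel* chand)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&Channel::mu_) {
    ++chand->queued_calls_;
  }
};
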
@@ -3814,17 +3825,17 @@ bool RetryingCall::MaybeRetry(SubchannelCallBatchData* batch_data, } // -// RetryingCall::SubchannelCallBatchData +// ChannelData::RetryingCall::SubchannelCallBatchData // -RetryingCall::SubchannelCallBatchData* -RetryingCall::SubchannelCallBatchData::Create(RetryingCall* call, int refcount, - bool set_on_complete) { +ChannelData::RetryingCall::SubchannelCallBatchData* +ChannelData::RetryingCall::SubchannelCallBatchData::Create( + RetryingCall* call, int refcount, bool set_on_complete) { return call->arena_->New(call, refcount, set_on_complete); } -RetryingCall::SubchannelCallBatchData::SubchannelCallBatchData( +ChannelData::RetryingCall::SubchannelCallBatchData::SubchannelCallBatchData( RetryingCall* call, int refcount, bool set_on_complete) : call(call), lb_call(call->lb_call_) { SubchannelCallRetryState* retry_state = @@ -3832,14 +3843,14 @@ RetryingCall::SubchannelCallBatchData::SubchannelCallBatchData( batch.payload = &retry_state->batch_payload; gpr_ref_init(&refs, refcount); if (set_on_complete) { - GRPC_CLOSURE_INIT(&on_complete, RetryingCall::OnComplete, this, + GRPC_CLOSURE_INIT(&on_complete, ChannelData::RetryingCall::OnComplete, this, grpc_schedule_on_exec_ctx); batch.on_complete = &on_complete; } GRPC_CALL_STACK_REF(call->owning_call_, "batch_data"); } -void RetryingCall::SubchannelCallBatchData::Destroy() { +void ChannelData::RetryingCall::SubchannelCallBatchData::Destroy() { SubchannelCallRetryState* retry_state = static_cast(lb_call->GetParentData()); if (batch.send_initial_metadata) { @@ -3862,8 +3873,8 @@ void RetryingCall::SubchannelCallBatchData::Destroy() { // recv_initial_metadata callback handling // -void RetryingCall::InvokeRecvInitialMetadataCallback(void* arg, - grpc_error* error) { +void ChannelData::RetryingCall::InvokeRecvInitialMetadataCallback( + void* arg, grpc_error* error) { SubchannelCallBatchData* batch_data = static_cast(arg); // Find pending batch. 
@@ -3897,7 +3908,8 @@ void RetryingCall::InvokeRecvInitialMetadataCallback(void* arg, GRPC_ERROR_REF(error)); } -void RetryingCall::RecvInitialMetadataReady(void* arg, grpc_error* error) { +void ChannelData::RetryingCall::RecvInitialMetadataReady(void* arg, + grpc_error* error) { SubchannelCallBatchData* batch_data = static_cast(arg); RetryingCall* call = batch_data->call; @@ -3957,7 +3969,8 @@ void RetryingCall::RecvInitialMetadataReady(void* arg, grpc_error* error) { // recv_message callback handling // -void RetryingCall::InvokeRecvMessageCallback(void* arg, grpc_error* error) { +void ChannelData::RetryingCall::InvokeRecvMessageCallback(void* arg, + grpc_error* error) { SubchannelCallBatchData* batch_data = static_cast(arg); RetryingCall* call = batch_data->call; @@ -3987,7 +4000,7 @@ void RetryingCall::InvokeRecvMessageCallback(void* arg, grpc_error* error) { Closure::Run(DEBUG_LOCATION, recv_message_ready, GRPC_ERROR_REF(error)); } -void RetryingCall::RecvMessageReady(void* arg, grpc_error* error) { +void ChannelData::RetryingCall::RecvMessageReady(void* arg, grpc_error* error) { SubchannelCallBatchData* batch_data = static_cast(arg); RetryingCall* call = batch_data->call; @@ -4043,9 +4056,9 @@ void RetryingCall::RecvMessageReady(void* arg, grpc_error* error) { // recv_trailing_metadata handling // -void RetryingCall::GetCallStatus(grpc_metadata_batch* md_batch, - grpc_error* error, grpc_status_code* status, - grpc_mdelem** server_pushback_md) { +void ChannelData::RetryingCall::GetCallStatus( + grpc_metadata_batch* md_batch, grpc_error* error, grpc_status_code* status, + grpc_mdelem** server_pushback_md) { if (error != GRPC_ERROR_NONE) { grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr); } else { @@ -4060,7 +4073,7 @@ void RetryingCall::GetCallStatus(grpc_metadata_batch* md_batch, GRPC_ERROR_UNREF(error); } -void RetryingCall::AddClosureForRecvTrailingMetadataReady( +void ChannelData::RetryingCall::AddClosureForRecvTrailingMetadataReady( SubchannelCallBatchData* batch_data, grpc_error* error, CallCombinerClosureList* closures) { // Find pending batch. 
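Alongside the recv-path bookkeeping above, the patch's recurring synchronization idiom is visible in CheckResolution()/CheckResolutionLocked() earlier and PickSubchannel()/PickSubchannelLocked() later: an unannotated entry point takes the mutex, then calls a *Locked method whose ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation lets the compiler verify the pairing. A minimal sketch under those assumptions (names are illustrative):

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

class Resolution {
 public:
  bool Check() {
    absl::MutexLock lock(&mu_);
    return CheckLocked();  // OK: mu_ is provably held at the call site
  }

 private:
  // Calling this without mu_ held fails the -Wthread-safety analysis.
  bool CheckLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return complete_; }

  absl::Mutex mu_;
  bool complete_ ABSL_GUARDED_BY(mu_) = false;
};
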
@@ -4094,7 +4107,7 @@ void RetryingCall::AddClosureForRecvTrailingMetadataReady( MaybeClearPendingBatch(pending); } -void RetryingCall::AddClosuresForDeferredRecvCallbacks( +void ChannelData::RetryingCall::AddClosuresForDeferredRecvCallbacks( SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) { if (batch_data->batch.recv_trailing_metadata) { @@ -4125,7 +4138,7 @@ void RetryingCall::AddClosuresForDeferredRecvCallbacks( } } -bool RetryingCall::PendingBatchIsUnstarted( +bool ChannelData::RetryingCall::PendingBatchIsUnstarted( PendingBatch* pending, SubchannelCallRetryState* retry_state) { if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { return false; @@ -4145,7 +4158,7 @@ bool RetryingCall::PendingBatchIsUnstarted( return false; } -void RetryingCall::AddClosuresToFailUnstartedPendingBatches( +void ChannelData::RetryingCall::AddClosuresToFailUnstartedPendingBatches( SubchannelCallRetryState* retry_state, grpc_error* error, CallCombinerClosureList* closures) { for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { @@ -4167,7 +4180,7 @@ void RetryingCall::AddClosuresToFailUnstartedPendingBatches( GRPC_ERROR_UNREF(error); } -void RetryingCall::RunClosuresForCompletedCall( +void ChannelData::RetryingCall::RunClosuresForCompletedCall( SubchannelCallBatchData* batch_data, grpc_error* error) { SubchannelCallRetryState* retry_state = static_cast( @@ -4191,7 +4204,8 @@ void RetryingCall::RunClosuresForCompletedCall( GRPC_ERROR_UNREF(error); } -void RetryingCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) { +void ChannelData::RetryingCall::RecvTrailingMetadataReady(void* arg, + grpc_error* error) { SubchannelCallBatchData* batch_data = static_cast(arg); RetryingCall* call = batch_data->call; @@ -4241,7 +4255,7 @@ void RetryingCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) { // on_complete callback handling // -void RetryingCall::AddClosuresForCompletedPendingBatch( +void ChannelData::RetryingCall::AddClosuresForCompletedPendingBatch( SubchannelCallBatchData* batch_data, grpc_error* error, CallCombinerClosureList* closures) { PendingBatch* pending = PendingBatchFind( @@ -4268,7 +4282,7 @@ void RetryingCall::AddClosuresForCompletedPendingBatch( MaybeClearPendingBatch(pending); } -void RetryingCall::AddClosuresForReplayOrPendingSendOps( +void ChannelData::RetryingCall::AddClosuresForReplayOrPendingSendOps( SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) { bool have_pending_send_message_ops = @@ -4303,7 +4317,7 @@ void RetryingCall::AddClosuresForReplayOrPendingSendOps( } } -void RetryingCall::OnComplete(void* arg, grpc_error* error) { +void ChannelData::RetryingCall::OnComplete(void* arg, grpc_error* error) { SubchannelCallBatchData* batch_data = static_cast(arg); RetryingCall* call = batch_data->call; @@ -4367,17 +4381,17 @@ void RetryingCall::OnComplete(void* arg, grpc_error* error) { // subchannel batch construction // -void RetryingCall::StartBatchInCallCombiner(void* arg, - grpc_error* /*ignored*/) { +void ChannelData::RetryingCall::StartBatchInCallCombiner( + void* arg, grpc_error* /*ignored*/) { grpc_transport_stream_op_batch* batch = static_cast(arg); - auto* lb_call = - static_cast(batch->handler_private.extra_arg); + auto* lb_call = static_cast( + batch->handler_private.extra_arg); // Note: This will release the call combiner. 
lb_call->StartTransportStreamOpBatch(batch); } -void RetryingCall::AddClosureForSubchannelBatch( +void ChannelData::RetryingCall::AddClosureForSubchannelBatch( grpc_transport_stream_op_batch* batch, CallCombinerClosureList* closures) { batch->handler_private.extra_arg = lb_call_.get(); GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner, @@ -4391,7 +4405,7 @@ void RetryingCall::AddClosureForSubchannelBatch( "start_subchannel_batch"); } -void RetryingCall::AddRetriableSendInitialMetadataOp( +void ChannelData::RetryingCall::AddRetriableSendInitialMetadataOp( SubchannelCallRetryState* retry_state, SubchannelCallBatchData* batch_data) { // Maps the number of retries to the corresponding metadata value slice. @@ -4439,7 +4453,7 @@ void RetryingCall::AddRetriableSendInitialMetadataOp( batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_; } -void RetryingCall::AddRetriableSendMessageOp( +void ChannelData::RetryingCall::AddRetriableSendMessageOp( SubchannelCallRetryState* retry_state, SubchannelCallBatchData* batch_data) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { @@ -4457,7 +4471,7 @@ void RetryingCall::AddRetriableSendMessageOp( retry_state->send_message.get()); } -void RetryingCall::AddRetriableSendTrailingMetadataOp( +void ChannelData::RetryingCall::AddRetriableSendTrailingMetadataOp( SubchannelCallRetryState* retry_state, SubchannelCallBatchData* batch_data) { // We need to make a copy of the metadata batch for each attempt, since @@ -4475,7 +4489,7 @@ void RetryingCall::AddRetriableSendTrailingMetadataOp( &retry_state->send_trailing_metadata; } -void RetryingCall::AddRetriableRecvInitialMetadataOp( +void ChannelData::RetryingCall::AddRetriableRecvInitialMetadataOp( SubchannelCallRetryState* retry_state, SubchannelCallBatchData* batch_data) { retry_state->started_recv_initial_metadata = true; @@ -4492,7 +4506,7 @@ void RetryingCall::AddRetriableRecvInitialMetadataOp( &retry_state->recv_initial_metadata_ready; } -void RetryingCall::AddRetriableRecvMessageOp( +void ChannelData::RetryingCall::AddRetriableRecvMessageOp( SubchannelCallRetryState* retry_state, SubchannelCallBatchData* batch_data) { ++retry_state->started_recv_message_count; @@ -4505,7 +4519,7 @@ void RetryingCall::AddRetriableRecvMessageOp( &retry_state->recv_message_ready; } -void RetryingCall::AddRetriableRecvTrailingMetadataOp( +void ChannelData::RetryingCall::AddRetriableRecvTrailingMetadataOp( SubchannelCallRetryState* retry_state, SubchannelCallBatchData* batch_data) { retry_state->started_recv_trailing_metadata = true; @@ -4523,7 +4537,7 @@ void RetryingCall::AddRetriableRecvTrailingMetadataOp( &retry_state->recv_trailing_metadata_ready; } -void RetryingCall::StartInternalRecvTrailingMetadata() { +void ChannelData::RetryingCall::StartInternalRecvTrailingMetadata() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log( GPR_INFO, @@ -4548,8 +4562,8 @@ void RetryingCall::StartInternalRecvTrailingMetadata() { // If there are any cached send ops that need to be replayed on the // current subchannel call, creates and returns a new subchannel batch // to replay those ops. Otherwise, returns nullptr. -RetryingCall::SubchannelCallBatchData* -RetryingCall::MaybeCreateSubchannelBatchForReplay( +ChannelData::RetryingCall::SubchannelCallBatchData* +ChannelData::RetryingCall::MaybeCreateSubchannelBatchForReplay( SubchannelCallRetryState* retry_state) { SubchannelCallBatchData* replay_batch_data = nullptr; // send_initial_metadata. 
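For a sense of what the annotations in these retry-path hunks buy, here is an illustrative misuse and the approximate diagnostic Clang emits for it; nothing below is from the patch itself:

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

class QueuedPicks {
 public:
  void Add(void* call) {
    // Uncommenting the next line no longer races silently; it fails to
    // build with roughly: "writing variable 'head_' requires holding
    // mutex 'mu_' exclusively [-Wthread-safety-analysis]".
    // head_ = call;
    absl::MutexLock lock(&mu_);
    head_ = call;  // accepted: mu_ is held for the write
  }

 private:
  absl::Mutex mu_;
  void* head_ ABSL_GUARDED_BY(mu_) = nullptr;
};
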
@@ -4607,7 +4621,7 @@ RetryingCall::MaybeCreateSubchannelBatchForReplay( return replay_batch_data; } -void RetryingCall::AddSubchannelBatchesForPendingBatches( +void ChannelData::RetryingCall::AddSubchannelBatchesForPendingBatches( SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) { for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { PendingBatch* pending = &pending_batches_[i]; @@ -4731,8 +4745,8 @@ void RetryingCall::AddSubchannelBatchesForPendingBatches( } } -void RetryingCall::StartRetriableSubchannelBatches(void* arg, - grpc_error* /*ignored*/) { +void ChannelData::RetryingCall::StartRetriableSubchannelBatches( + void* arg, grpc_error* /*ignored*/) { RetryingCall* call = static_cast(arg); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, @@ -4768,7 +4782,7 @@ void RetryingCall::StartRetriableSubchannelBatches(void* arg, closures.RunClosures(call->call_combiner_); } -void RetryingCall::CreateLbCall(void* arg, grpc_error* /*error*/) { +void ChannelData::RetryingCall::CreateLbCall(void* arg, grpc_error* /*error*/) { auto* call = static_cast(arg); const size_t parent_data_size = call->enable_retries_ ? sizeof(SubchannelCallRetryState) : 0; @@ -4776,8 +4790,8 @@ void RetryingCall::CreateLbCall(void* arg, grpc_error* /*error*/) { call->call_context_, call->path_, call->call_start_time_, call->deadline_, call->arena_, call->call_combiner_}; - call->lb_call_ = LoadBalancedCall::Create(call->chand_, args, call->pollent_, - parent_data_size); + call->lb_call_ = ChannelData::LoadBalancedCall::Create( + call->chand_, args, call->pollent_, parent_data_size); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p retrying_call=%p: create lb_call=%p", call->chand_, call, call->lb_call_.get()); @@ -4790,10 +4804,10 @@ void RetryingCall::CreateLbCall(void* arg, grpc_error* /*error*/) { } // -// LoadBalancedCall::Metadata +// ChannelData::LoadBalancedCall::Metadata // -class LoadBalancedCall::Metadata +class ChannelData::LoadBalancedCall::Metadata : public LoadBalancingPolicy::MetadataInterface { public: Metadata(LoadBalancedCall* lb_call, grpc_metadata_batch* batch) @@ -4856,10 +4870,11 @@ class LoadBalancedCall::Metadata }; // -// LoadBalancedCall::LbCallState +// ChannelData::LoadBalancedCall::LbCallState // -class LoadBalancedCall::LbCallState : public LoadBalancingPolicy::CallState { +class ChannelData::LoadBalancedCall::LbCallState + : public LoadBalancingPolicy::CallState { public: explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {} @@ -4895,9 +4910,11 @@ class LoadBalancedCall::LbCallState : public LoadBalancingPolicy::CallState { // LoadBalancedCall // -RefCountedPtr LoadBalancedCall::Create( - ChannelData* chand, const grpc_call_element_args& args, - grpc_polling_entity* pollent, size_t parent_data_size) { +RefCountedPtr +ChannelData::LoadBalancedCall::Create(ChannelData* chand, + const grpc_call_element_args& args, + grpc_polling_entity* pollent, + size_t parent_data_size) { const size_t alloc_size = parent_data_size > 0 ? 
(GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(LoadBalancedCall)) + @@ -4908,9 +4925,9 @@ RefCountedPtr LoadBalancedCall::Create( return lb_call; } -LoadBalancedCall::LoadBalancedCall(ChannelData* chand, - const grpc_call_element_args& args, - grpc_polling_entity* pollent) +ChannelData::LoadBalancedCall::LoadBalancedCall( + ChannelData* chand, const grpc_call_element_args& args, + grpc_polling_entity* pollent) : refs_(1, GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace) ? "LoadBalancedCall" : nullptr), @@ -4924,7 +4941,7 @@ LoadBalancedCall::LoadBalancedCall(ChannelData* chand, call_context_(args.context), pollent_(pollent) {} -LoadBalancedCall::~LoadBalancedCall() { +ChannelData::LoadBalancedCall::~LoadBalancedCall() { grpc_slice_unref_internal(path_); GRPC_ERROR_UNREF(cancel_error_); if (backend_metric_data_ != nullptr) { @@ -4937,43 +4954,45 @@ LoadBalancedCall::~LoadBalancedCall() { } } -RefCountedPtr LoadBalancedCall::Ref() { +RefCountedPtr +ChannelData::LoadBalancedCall::Ref() { IncrementRefCount(); return RefCountedPtr(this); } -RefCountedPtr LoadBalancedCall::Ref( +RefCountedPtr ChannelData::LoadBalancedCall::Ref( const DebugLocation& location, const char* reason) { IncrementRefCount(location, reason); return RefCountedPtr(this); } -void LoadBalancedCall::Unref() { +void ChannelData::LoadBalancedCall::Unref() { if (GPR_UNLIKELY(refs_.Unref())) { this->~LoadBalancedCall(); } } -void LoadBalancedCall::Unref(const DebugLocation& location, - const char* reason) { +void ChannelData::LoadBalancedCall::Unref(const DebugLocation& location, + const char* reason) { if (GPR_UNLIKELY(refs_.Unref(location, reason))) { this->~LoadBalancedCall(); } } -void LoadBalancedCall::IncrementRefCount() { refs_.Ref(); } +void ChannelData::LoadBalancedCall::IncrementRefCount() { refs_.Ref(); } -void LoadBalancedCall::IncrementRefCount(const DebugLocation& location, - const char* reason) { +void ChannelData::LoadBalancedCall::IncrementRefCount( + const DebugLocation& location, const char* reason) { refs_.Ref(location, reason); } -void* LoadBalancedCall::GetParentData() { +void* ChannelData::LoadBalancedCall::GetParentData() { return reinterpret_cast(this) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(LoadBalancedCall)); } -size_t LoadBalancedCall::GetBatchIndex(grpc_transport_stream_op_batch* batch) { +size_t ChannelData::LoadBalancedCall::GetBatchIndex( + grpc_transport_stream_op_batch* batch) { // Note: It is important the send_initial_metadata be the first entry // here, since the code in pick_subchannel_locked() assumes it will be. if (batch->send_initial_metadata) return 0; @@ -4986,7 +5005,7 @@ size_t LoadBalancedCall::GetBatchIndex(grpc_transport_stream_op_batch* batch) { } // This is called via the call combiner, so access to calld is synchronized. -void LoadBalancedCall::PendingBatchesAdd( +void ChannelData::LoadBalancedCall::PendingBatchesAdd( grpc_transport_stream_op_batch* batch) { const size_t idx = GetBatchIndex(batch); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { @@ -4999,8 +5018,8 @@ void LoadBalancedCall::PendingBatchesAdd( } // This is called via the call combiner, so access to calld is synchronized. 
-void LoadBalancedCall::FailPendingBatchInCallCombiner(void* arg,
-                                                      grpc_error* error) {
+void ChannelData::LoadBalancedCall::FailPendingBatchInCallCombiner(
+    void* arg, grpc_error* error) {
   grpc_transport_stream_op_batch* batch =
       static_cast<grpc_transport_stream_op_batch*>(arg);
   auto* self = static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
@@ -5010,7 +5029,7 @@ void LoadBalancedCall::FailPendingBatchInCallCombiner(void* arg,
 }
 
 // This is called via the call combiner, so access to calld is synchronized.
-void LoadBalancedCall::PendingBatchesFail(
+void ChannelData::LoadBalancedCall::PendingBatchesFail(
     grpc_error* error,
     YieldCallCombinerPredicate yield_call_combiner_predicate) {
   GPR_ASSERT(error != GRPC_ERROR_NONE);
@@ -5045,7 +5064,7 @@ void LoadBalancedCall::PendingBatchesFail(
 }
 
 // This is called via the call combiner, so access to calld is synchronized.
-void LoadBalancedCall::ResumePendingBatchInCallCombiner(
+void ChannelData::LoadBalancedCall::ResumePendingBatchInCallCombiner(
     void* arg, grpc_error* /*ignored*/) {
   grpc_transport_stream_op_batch* batch =
       static_cast<grpc_transport_stream_op_batch*>(arg);
@@ -5056,7 +5075,7 @@ void LoadBalancedCall::ResumePendingBatchInCallCombiner(
 }
 
 // This is called via the call combiner, so access to calld is synchronized.
-void LoadBalancedCall::PendingBatchesResume() {
+void ChannelData::LoadBalancedCall::PendingBatchesResume() {
   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
     size_t num_batches = 0;
     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
@@ -5084,7 +5103,7 @@ void LoadBalancedCall::PendingBatchesResume() {
   closures.RunClosures(call_combiner_);
 }
 
-void LoadBalancedCall::StartTransportStreamOpBatch(
+void ChannelData::LoadBalancedCall::StartTransportStreamOpBatch(
     grpc_transport_stream_op_batch* batch) {
   // Intercept recv_trailing_metadata_ready for LB callback.
   if (batch->recv_trailing_metadata) {
@@ -5165,8 +5184,9 @@ void LoadBalancedCall::StartTransportStreamOpBatch(
   }
 }
 
-void LoadBalancedCall::RecvTrailingMetadataReadyForLoadBalancingPolicy(
-    void* arg, grpc_error* error) {
+void ChannelData::LoadBalancedCall::
+    RecvTrailingMetadataReadyForLoadBalancingPolicy(void* arg,
+                                                    grpc_error* error) {
   auto* self = static_cast<LoadBalancedCall*>(arg);
   if (self->lb_recv_trailing_metadata_ready_ != nullptr) {
     // Set error if call did not succeed.
@@ -5204,8 +5224,9 @@ void LoadBalancedCall::RecvTrailingMetadataReadyForLoadBalancingPolicy(
 
 // TODO(roth): Consider not intercepting this callback unless we
 // actually need to, if this causes a performance problem.
-void LoadBalancedCall::InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
-    grpc_transport_stream_op_batch* batch) {
+void ChannelData::LoadBalancedCall::
+    InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
+        grpc_transport_stream_op_batch* batch) {
   recv_trailing_metadata_ =
       batch->payload->recv_trailing_metadata.recv_trailing_metadata;
   original_recv_trailing_metadata_ready_ =
@@ -5217,7 +5238,7 @@ void LoadBalancedCall::InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
       &recv_trailing_metadata_ready_;
 }
 
-void LoadBalancedCall::CreateSubchannelCall() {
+void ChannelData::LoadBalancedCall::CreateSubchannelCall() {
   SubchannelCall::Args call_args = {
       std::move(connected_subchannel_), pollent_, path_, call_start_time_,
       deadline_, arena_,
@@ -5245,7 +5266,7 @@ void LoadBalancedCall::CreateSubchannelCall() {
 // because there may be multiple LB picks happening in parallel.
 // Instead, we will probably need to maintain a list in the CallData
 // object of pending LB picks to be cancelled when the closure runs.
-class LoadBalancedCall::LbQueuedCallCanceller {
+class ChannelData::LoadBalancedCall::LbQueuedCallCanceller {
  public:
   explicit LbQueuedCallCanceller(RefCountedPtr<LoadBalancedCall> lb_call)
       : lb_call_(std::move(lb_call)) {
@@ -5260,7 +5281,7 @@ class LoadBalancedCall::LbQueuedCallCanceller {
     auto* lb_call = self->lb_call_.get();
     auto* chand = lb_call->chand_;
     {
-      MutexLock lock(chand->data_plane_mu());
+      MutexLock lock(&chand->data_plane_mu_);
       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
         gpr_log(GPR_INFO,
                 "chand=%p lb_call=%p: cancelling queued pick: "
@@ -5284,7 +5305,7 @@ class LoadBalancedCall::LbQueuedCallCanceller {
   grpc_closure closure_;
 };
 
-void LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() {
+void ChannelData::LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() {
   if (!queued_pending_lb_pick_) return;
   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
     gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
@@ -5296,7 +5317,7 @@ void LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() {
   lb_call_canceller_ = nullptr;
 }
 
-void LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
+void ChannelData::LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
   if (queued_pending_lb_pick_) return;
   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
     gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
@@ -5309,12 +5330,12 @@ void LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
   lb_call_canceller_ = new LbQueuedCallCanceller(Ref());
 }
 
-void LoadBalancedCall::AsyncPickDone(grpc_error* error) {
+void ChannelData::LoadBalancedCall::AsyncPickDone(grpc_error* error) {
   GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx);
   ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
 }
 
-void LoadBalancedCall::PickDone(void* arg, grpc_error* error) {
+void ChannelData::LoadBalancedCall::PickDone(void* arg, grpc_error* error) {
   auto* self = static_cast<LoadBalancedCall*>(arg);
   if (error != GRPC_ERROR_NONE) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
@@ -5341,11 +5362,12 @@ const char* PickResultTypeName(
   GPR_UNREACHABLE_CODE(return "UNKNOWN");
 }
 
-void LoadBalancedCall::PickSubchannel(void* arg, grpc_error* error) {
+void ChannelData::LoadBalancedCall::PickSubchannel(void* arg,
+                                                   grpc_error* error) {
   auto* self = static_cast<LoadBalancedCall*>(arg);
   bool pick_complete;
   {
-    MutexLock lock(self->chand_->data_plane_mu());
+    MutexLock lock(&self->chand_->data_plane_mu_);
     pick_complete = self->PickSubchannelLocked(&error);
   }
   if (pick_complete) {
@@ -5354,7 +5376,7 @@ void LoadBalancedCall::PickSubchannel(void* arg, grpc_error* error) {
   }
 }
 
-bool LoadBalancedCall::PickSubchannelLocked(grpc_error** error) {
+bool ChannelData::LoadBalancedCall::PickSubchannelLocked(grpc_error** error) {
   GPR_ASSERT(connected_subchannel_ == nullptr);
   GPR_ASSERT(subchannel_call_ == nullptr);
   // Grab initial metadata.
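The two MutexLock changes above (taking `&chand->data_plane_mu_` directly instead of the old `data_plane_mu()` accessor) are what make the annotations checkable: Clang's thread-safety analysis matches a lock acquisition to an ABSL_GUARDED_BY or ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation only when it can see the mutex member itself. A minimal self-contained sketch of the pattern, using the stock absl types rather than the grpc_core wrappers, with hypothetical names (Channel, Pick, picks_):

    #include "absl/base/thread_annotations.h"
    #include "absl/synchronization/mutex.h"

    class Channel {
     public:
      void Pick() {
        absl::MutexLock lock(&mu_);  // analysis records that mu_ is held...
        PickLocked();                // ...so this call passes the check
      }

     private:
      // Compile-time contract (with -Wthread-safety): callers must hold mu_.
      void PickLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { ++picks_; }

      absl::Mutex mu_;
      int picks_ ABSL_GUARDED_BY(mu_) = 0;  // touching picks_ without mu_ warns
    };

An accessor that merely returns the mutex hides it from the analysis unless the accessor is itself annotated (e.g. with ABSL_LOCK_RETURNED), which is presumably why this patch drops the getters in favor of direct member access.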
@@ -5371,7 +5393,7 @@ bool LoadBalancedCall::PickSubchannelLocked(grpc_error** error) {
   pick_args.call_state = &lb_call_state;
   Metadata initial_metadata(this, initial_metadata_batch);
   pick_args.initial_metadata = &initial_metadata;
-  auto result = chand_->picker()->Pick(pick_args);
+  auto result = chand_->picker_->Pick(pick_args);
   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
     gpr_log(
         GPR_INFO,
@@ -5437,16 +5459,15 @@ bool LoadBalancedCall::PickSubchannelLocked(grpc_error** error) {
  * EXPORTED SYMBOLS
  */
 
-using grpc_core::CallData;
 using grpc_core::ChannelData;
 
 const grpc_channel_filter grpc_client_channel_filter = {
-    CallData::StartTransportStreamOpBatch,
+    ChannelData::CallData::StartTransportStreamOpBatch,
     ChannelData::StartTransportOp,
-    sizeof(CallData),
-    CallData::Init,
-    CallData::SetPollent,
-    CallData::Destroy,
+    sizeof(ChannelData::CallData),
+    ChannelData::CallData::Init,
+    ChannelData::CallData::SetPollent,
+    ChannelData::CallData::Destroy,
     sizeof(ChannelData),
     ChannelData::Init,
    ChannelData::Destroy,
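The exported-symbols hunk shows the other half of the restructuring: CallData (and, earlier, RetryingCall and LoadBalancedCall) are now nested classes of ChannelData, so their out-of-line definitions can name private annotated members such as picker_ and data_plane_mu_ directly. A sketch of why nesting alone is enough, again with hypothetical names; a nested class is a member of its enclosing class and therefore has access to its private members:

    #include "absl/base/thread_annotations.h"
    #include "absl/synchronization/mutex.h"

    class Channel {
     public:
      class Call;  // declared here, defined out of line below

     private:
      absl::Mutex mu_;
      int picks_ ABSL_GUARDED_BY(mu_) = 0;
    };

    class Channel::Call {
     public:
      explicit Call(Channel* chan) : chan_(chan) {}

      void Pick() {
        // Direct access to the enclosing class's private members: the
        // analysis checks picks_ against the real mu_, no getter needed.
        absl::MutexLock lock(&chan_->mu_);
        ++chan_->picks_;
      }

     private:
      Channel* chan_;
    };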