From d3f50ace3953651c1f67a3c9da84821abae747fe Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 5 Sep 2019 08:13:59 -0700 Subject: [PATCH] Use mutex instead of combiner in client channel data plane. --- .../filters/client_channel/client_channel.cc | 450 +++++++++--------- src/core/lib/gprpp/ref_counted_ptr.h | 2 + test/core/gprpp/ref_counted_ptr_test.cc | 14 + 3 files changed, 229 insertions(+), 237 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 8a0fabfa6fd..40ac1f22059 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -130,7 +130,7 @@ class ChannelData { return disconnect_error_.Load(MemoryOrder::ACQUIRE); } - grpc_combiner* data_plane_combiner() const { return data_plane_combiner_; } + Mutex* data_plane_mu() const { return &data_plane_mu_; } LoadBalancingPolicy::SubchannelPicker* picker() const { return picker_.get(); @@ -166,8 +166,6 @@ class ChannelData { private: class SubchannelWrapper; - class ConnectivityStateAndPickerSetter; - class ServiceConfigSetter; class ClientChannelControlHelper; class ExternalConnectivityWatcher { @@ -214,6 +212,14 @@ class ChannelData { ChannelData(grpc_channel_element_args* args, grpc_error** error); ~ChannelData(); + void UpdateStateAndPickerLocked( + grpc_connectivity_state state, const char* reason, + UniquePtr picker); + + void UpdateServiceConfigLocked( + RefCountedPtr retry_throttle_data, + RefCountedPtr service_config); + void CreateResolvingLoadBalancingPolicyLocked(); void DestroyResolvingLoadBalancingPolicyLocked(); @@ -250,9 +256,9 @@ class ChannelData { channelz::ChannelNode* channelz_node_; // - // Fields used in the data plane. Guarded by data_plane_combiner. + // Fields used in the data plane. Guarded by data_plane_mu. // - grpc_combiner* data_plane_combiner_; + mutable Mutex data_plane_mu_; UniquePtr picker_; QueuedPick* queued_picks_ = nullptr; // Linked list of queued picks. // Data from service config. @@ -282,13 +288,13 @@ class ChannelData { Map subchannel_wrappers_; // Pending ConnectedSubchannel updates for each SubchannelWrapper. // Updates are queued here in the control plane combiner and then applied - // in the data plane combiner when the picker is updated. + // in the data plane mutex when the picker is updated. Map, RefCountedPtr, RefCountedPtrLess> pending_subchannel_updates_; // - // Fields accessed from both data plane and control plane combiners. + // Fields accessed from both data plane mutex and control plane combiner. // Atomic disconnect_error_; @@ -322,7 +328,16 @@ class CallData { void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem); // Invoked by channel for queued picks when the picker is updated. - static void StartPickLocked(void* arg, grpc_error* error); + static void PickSubchannel(void* arg, grpc_error* error); + + // Helper function for performing a pick while holding the data plane + // mutex. Returns true if the pick is complete, in which case the caller + // must invoke PickDone() or AsyncPickDone() with the returned error. + bool PickSubchannelLocked(grpc_call_element* elem, grpc_error** error); + + // Schedules a callback to process the completed pick. The callback + // will not run until after this method returns. + void AsyncPickDone(grpc_call_element* elem, grpc_error* error); private: class QueuedPickCanceller; @@ -931,7 +946,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { return connected_subchannel_.get(); } - // Caller must be holding the data-plane combiner. + // Caller must be holding the data-plane mutex. ConnectedSubchannel* connected_subchannel_in_data_plane() const { return connected_subchannel_in_data_plane_.get(); } @@ -1059,7 +1074,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { // Update the connected subchannel only if the channel is not shutting // down. This is because once the channel is shutting down, we // ignore picker updates from the LB policy, which means that - // ConnectivityStateAndPickerSetter will never process the entries + // UpdateStateAndPickerLocked() will never process the entries // in chand_->pending_subchannel_updates_. So we don't want to add // entries there that will never be processed, since that would // leave dangling refs to the channel and prevent its destruction. @@ -1069,7 +1084,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { if (connected_subchannel_ != connected_subchannel) { connected_subchannel_ = std::move(connected_subchannel); // Record the new connected subchannel so that it can be updated - // in the data plane combiner the next time the picker is updated. + // in the data plane mutex the next time the picker is updated. chand_->pending_subchannel_updates_[Ref( DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_; } @@ -1086,159 +1101,10 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { Map watcher_map_; // To be accessed only in the control plane combiner. RefCountedPtr connected_subchannel_; - // To be accessed only in the data plane combiner. + // To be accessed only in the data plane mutex. RefCountedPtr connected_subchannel_in_data_plane_; }; -// -// ChannelData::ConnectivityStateAndPickerSetter -// - -// A fire-and-forget class that sets the channel's connectivity state -// and then hops into the data plane combiner to update the picker. -// Must be instantiated while holding the control plane combiner. -// Deletes itself when done. -class ChannelData::ConnectivityStateAndPickerSetter { - public: - ConnectivityStateAndPickerSetter( - ChannelData* chand, grpc_connectivity_state state, const char* reason, - UniquePtr picker) - : chand_(chand), picker_(std::move(picker)) { - // Clean the control plane when entering IDLE, while holding control plane - // combiner. - if (picker_ == nullptr) { - chand->health_check_service_name_.reset(); - chand->saved_service_config_.reset(); - chand->received_first_resolver_result_ = false; - } - // Update connectivity state here, while holding control plane combiner. - grpc_connectivity_state_set(&chand->state_tracker_, state, reason); - if (chand->channelz_node_ != nullptr) { - chand->channelz_node_->SetConnectivityState(state); - chand->channelz_node_->AddTraceEvent( - channelz::ChannelTrace::Severity::Info, - grpc_slice_from_static_string( - channelz::ChannelNode::GetChannelConnectivityStateChangeString( - state))); - } - // Grab any pending subchannel updates. - pending_subchannel_updates_ = - std::move(chand_->pending_subchannel_updates_); - // Bounce into the data plane combiner to reset the picker. - GRPC_CHANNEL_STACK_REF(chand->owning_stack_, - "ConnectivityStateAndPickerSetter"); - GRPC_CLOSURE_INIT(&closure_, SetPickerInDataPlane, this, - grpc_combiner_scheduler(chand->data_plane_combiner_)); - GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); - } - - private: - static void SetPickerInDataPlane(void* arg, grpc_error* ignored) { - auto* self = static_cast(arg); - // Handle subchannel updates. - for (auto& p : self->pending_subchannel_updates_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, - "chand=%p: updating subchannel wrapper %p data plane " - "connected_subchannel to %p", - self->chand_, p.first.get(), p.second.get()); - } - p.first->set_connected_subchannel_in_data_plane(std::move(p.second)); - } - // Swap out the picker. We hang on to the old picker so that it can - // be deleted in the control-plane combiner, since that's where we need - // to unref the subchannel wrappers that are reffed by the picker. - self->picker_.swap(self->chand_->picker_); - // Clean the data plane if the updated picker is nullptr. - if (self->chand_->picker_ == nullptr) { - self->chand_->received_service_config_data_ = false; - self->chand_->retry_throttle_data_.reset(); - self->chand_->service_config_.reset(); - } - // Re-process queued picks. - for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr; - pick = pick->next) { - CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE); - } - // Pop back into the control plane combiner to delete ourself, so - // that we make sure to unref subchannel wrappers there. This - // includes both the ones reffed by the old picker (now stored in - // self->picker_) and the ones in self->pending_subchannel_updates_. - GRPC_CLOSURE_INIT(&self->closure_, CleanUpInControlPlane, self, - grpc_combiner_scheduler(self->chand_->combiner_)); - GRPC_CLOSURE_SCHED(&self->closure_, GRPC_ERROR_NONE); - } - - static void CleanUpInControlPlane(void* arg, grpc_error* ignored) { - auto* self = static_cast(arg); - GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_, - "ConnectivityStateAndPickerSetter"); - Delete(self); - } - - ChannelData* chand_; - UniquePtr picker_; - Map, RefCountedPtr, - RefCountedPtrLess> - pending_subchannel_updates_; - grpc_closure closure_; -}; - -// -// ChannelData::ServiceConfigSetter -// - -// A fire-and-forget class that sets the channel's service config data -// in the data plane combiner. Deletes itself when done. -class ChannelData::ServiceConfigSetter { - public: - ServiceConfigSetter( - ChannelData* chand, - Optional - retry_throttle_data, - RefCountedPtr service_config) - : chand_(chand), - retry_throttle_data_(retry_throttle_data), - service_config_(std::move(service_config)) { - GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ServiceConfigSetter"); - GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this, - grpc_combiner_scheduler(chand->data_plane_combiner_)); - GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); - } - - private: - static void SetServiceConfigData(void* arg, grpc_error* ignored) { - ServiceConfigSetter* self = static_cast(arg); - ChannelData* chand = self->chand_; - // Update channel state. - chand->received_service_config_data_ = true; - if (self->retry_throttle_data_.has_value()) { - chand->retry_throttle_data_ = - internal::ServerRetryThrottleMap::GetDataForServer( - chand->server_name_.get(), - self->retry_throttle_data_.value().max_milli_tokens, - self->retry_throttle_data_.value().milli_token_ratio); - } - chand->service_config_ = std::move(self->service_config_); - // Apply service config to queued picks. - for (QueuedPick* pick = chand->queued_picks_; pick != nullptr; - pick = pick->next) { - CallData* calld = static_cast(pick->elem->call_data); - calld->MaybeApplyServiceConfigToCallLocked(pick->elem); - } - // Clean up. - GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_, - "ServiceConfigSetter"); - Delete(self); - } - - ChannelData* chand_; - Optional - retry_throttle_data_; - RefCountedPtr service_config_; - grpc_closure closure_; -}; - // // ChannelData::ExternalConnectivityWatcher::WatcherList // @@ -1409,9 +1275,7 @@ class ChannelData::ClientChannelControlHelper } // Do update only if not shutting down. if (disconnect_error == GRPC_ERROR_NONE) { - // Will delete itself. - New(chand_, state, "helper", - std::move(picker)); + chand_->UpdateStateAndPickerLocked(state, "helper", std::move(picker)); } } @@ -1495,7 +1359,6 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error) client_channel_factory_( ClientChannelFactory::GetFromChannelArgs(args->channel_args)), channelz_node_(GetChannelzNode(args->channel_args)), - data_plane_combiner_(grpc_combiner_create()), combiner_(grpc_combiner_create()), interested_parties_(grpc_pollset_set_create()), subchannel_pool_(GetSubchannelPool(args->channel_args)), @@ -1568,13 +1431,108 @@ ChannelData::~ChannelData() { // Stop backup polling. grpc_client_channel_stop_backup_polling(interested_parties_); grpc_pollset_set_destroy(interested_parties_); - GRPC_COMBINER_UNREF(data_plane_combiner_, "client_channel"); GRPC_COMBINER_UNREF(combiner_, "client_channel"); GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED)); grpc_connectivity_state_destroy(&state_tracker_); gpr_mu_destroy(&info_mu_); } +void ChannelData::UpdateStateAndPickerLocked( + grpc_connectivity_state state, const char* reason, + UniquePtr picker) { + // Clean the control plane when entering IDLE. + if (picker_ == nullptr) { + health_check_service_name_.reset(); + saved_service_config_.reset(); + received_first_resolver_result_ = false; + } + // Update connectivity state. + grpc_connectivity_state_set(&state_tracker_, state, reason); + if (channelz_node_ != nullptr) { + channelz_node_->SetConnectivityState(state); + channelz_node_->AddTraceEvent( + channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string( + channelz::ChannelNode::GetChannelConnectivityStateChangeString( + state))); + } + // Grab data plane lock to do subchannel updates and update the picker. + // + // Note that we want to minimize the work done while holding the data + // plane lock, to keep the critical section small. So, for all of the + // objects that we might wind up unreffing here, we actually hold onto + // the refs until after we release the lock, and then unref them at + // that point. This includes the following: + // - refs to subchannel wrappers in the keys of pending_subchannel_updates_ + // - ref stored in retry_throttle_data_ + // - ref stored in service_config_ + // - ownership of the existing picker in picker_ + RefCountedPtr retry_throttle_data_to_unref; + RefCountedPtr service_config_to_unref; + { + MutexLock lock(&data_plane_mu_); + // Handle subchannel updates. + for (auto& p : pending_subchannel_updates_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p: updating subchannel wrapper %p data plane " + "connected_subchannel to %p", + this, p.first.get(), p.second.get()); + } + // Note: We do not remove the entry from pending_subchannel_updates_ + // here, since this would unref the subchannel wrapper; instead, + // we wait until we've released the lock to clear the map. + p.first->set_connected_subchannel_in_data_plane(std::move(p.second)); + } + // Swap out the picker. + // Note: Original value will be destroyed after the lock is released. + picker_.swap(picker); + // Clean the data plane if the updated picker is nullptr. + if (picker_ == nullptr) { + received_service_config_data_ = false; + // Note: We save the objects to unref until after the lock is released. + retry_throttle_data_to_unref = std::move(retry_throttle_data_); + service_config_to_unref = std::move(service_config_); + } + // Re-process queued picks. + for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) { + grpc_call_element* elem = pick->elem; + CallData* calld = static_cast(elem->call_data); + grpc_error* error = GRPC_ERROR_NONE; + if (calld->PickSubchannelLocked(elem, &error)) { + calld->AsyncPickDone(elem, error); + } + } + } + // Clear the pending update map after releasing the lock, to keep the + // critical section small. + pending_subchannel_updates_.clear(); +} + +void ChannelData::UpdateServiceConfigLocked( + RefCountedPtr retry_throttle_data, + RefCountedPtr service_config) { + // Grab data plane lock to update service config. + // + // We defer unreffing the old values (and deallocating memory) until + // after releasing the lock to keep the critical section small. + { + MutexLock lock(&data_plane_mu_); + // Update service config. + received_service_config_data_ = true; + // Old values will be unreffed after lock is released. + retry_throttle_data_.swap(retry_throttle_data); + service_config_.swap(service_config); + // Apply service config to queued picks. + for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) { + CallData* calld = static_cast(pick->elem->call_data); + calld->MaybeApplyServiceConfigToCallLocked(pick->elem); + } + } + // Old values will be unreffed after lock is released when they go out + // of scope. +} + void ChannelData::CreateResolvingLoadBalancingPolicyLocked() { // Instantiate resolving LB policy. LoadBalancingPolicy::Args lb_args; @@ -1746,15 +1704,20 @@ bool ChannelData::ProcessResolverResultLocked( // if we feel it is unnecessary. if (service_config_changed || !chand->received_first_resolver_result_) { chand->received_first_resolver_result_ = true; - Optional - retry_throttle_data; + RefCountedPtr retry_throttle_data; if (parsed_service_config != nullptr) { - retry_throttle_data = parsed_service_config->retry_throttling(); + Optional + retry_throttle_config = parsed_service_config->retry_throttling(); + if (retry_throttle_config.has_value()) { + retry_throttle_data = + internal::ServerRetryThrottleMap::GetDataForServer( + chand->server_name_.get(), + retry_throttle_config.value().max_milli_tokens, + retry_throttle_config.value().milli_token_ratio); + } } - // Create service config setter to update channel state in the data - // plane combiner. Destroys itself when done. - New(chand, retry_throttle_data, - chand->saved_service_config_); + chand->UpdateServiceConfigLocked(std::move(retry_throttle_data), + chand->saved_service_config_); } UniquePtr processed_lb_policy_name; chand->ProcessLbPolicy(result, parsed_service_config, @@ -1838,8 +1801,8 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) { static_cast(value) == GRPC_CHANNEL_IDLE) { if (chand->disconnect_error() == GRPC_ERROR_NONE) { // Enter IDLE state. - New(chand, GRPC_CHANNEL_IDLE, - "channel entering IDLE", nullptr); + chand->UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, + "channel entering IDLE", nullptr); } GRPC_ERROR_UNREF(op->disconnect_with_error); } else { @@ -1848,8 +1811,8 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) { GRPC_ERROR_NONE); chand->disconnect_error_.Store(op->disconnect_with_error, MemoryOrder::RELEASE); - New( - chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API", + chand->UpdateStateAndPickerLocked( + GRPC_CHANNEL_SHUTDOWN, "shutdown from API", UniquePtr( New( GRPC_ERROR_REF(op->disconnect_with_error)))); @@ -2092,8 +2055,8 @@ void CallData::StartTransportStreamOpBatch( // Add the batch to the pending list. calld->PendingBatchesAdd(elem, batch); // Check if we've already gotten a subchannel call. - // Note that once we have completed the pick, we do not need to enter - // the channel combiner, which is more efficient (especially for + // Note that once we have picked a subchannel, we do not need to acquire + // the channel's data plane mutex, which is more efficient (especially for // streaming calls). if (calld->subchannel_call_ != nullptr) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { @@ -2105,18 +2068,15 @@ void CallData::StartTransportStreamOpBatch( return; } // We do not yet have a subchannel call. - // For batches containing a send_initial_metadata op, enter the channel - // combiner to start a pick. + // For batches containing a send_initial_metadata op, acquire the + // channel's data plane mutex to pick a subchannel. if (GPR_LIKELY(batch->send_initial_metadata)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { - gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner", + gpr_log(GPR_INFO, + "chand=%p calld=%p: grabbing data plane mutex to perform pick", chand, calld); } - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT( - &batch->handler_private.closure, StartPickLocked, elem, - grpc_combiner_scheduler(chand->data_plane_combiner())), - GRPC_ERROR_NONE); + PickSubchannel(elem, GRPC_ERROR_NONE); } else { // For all other batches, release the call combiner. if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { @@ -2544,8 +2504,8 @@ void CallData::DoRetry(grpc_call_element* elem, this, next_attempt_time - ExecCtx::Get()->Now()); } // Schedule retry after computed delay. - GRPC_CLOSURE_INIT(&pick_closure_, StartPickLocked, elem, - grpc_combiner_scheduler(chand->data_plane_combiner())); + GRPC_CLOSURE_INIT(&pick_closure_, PickSubchannel, elem, + grpc_schedule_on_exec_ctx); grpc_timer_init(&retry_timer_, next_attempt_time, &pick_closure_); // Update bookkeeping. if (retry_state != nullptr) retry_state->retry_dispatched = true; @@ -3660,6 +3620,11 @@ void CallData::CreateSubchannelCall(grpc_call_element* elem) { } } +void CallData::AsyncPickDone(grpc_call_element* elem, grpc_error* error) { + GRPC_CLOSURE_INIT(&pick_closure_, PickDone, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_SCHED(&pick_closure_, error); +} + void CallData::PickDone(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); ChannelData* chand = static_cast(elem->channel_data); @@ -3682,10 +3647,9 @@ class CallData::QueuedPickCanceller { public: explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) { auto* calld = static_cast(elem->call_data); - auto* chand = static_cast(elem->channel_data); GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller"); GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, - grpc_combiner_scheduler(chand->data_plane_combiner())); + grpc_schedule_on_exec_ctx); calld->call_combiner_->SetNotifyOnCancel(&closure_); } @@ -3694,6 +3658,7 @@ class CallData::QueuedPickCanceller { auto* self = static_cast(arg); auto* chand = static_cast(self->elem_->channel_data); auto* calld = static_cast(self->elem_->call_data); + MutexLock lock(chand->data_plane_mu()); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling queued pick: " @@ -3818,23 +3783,38 @@ const char* PickResultTypeName( GPR_UNREACHABLE_CODE(return "UNKNOWN"); } -void CallData::StartPickLocked(void* arg, grpc_error* error) { +void CallData::PickSubchannel(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); CallData* calld = static_cast(elem->call_data); ChannelData* chand = static_cast(elem->channel_data); - GPR_ASSERT(calld->connected_subchannel_ == nullptr); - GPR_ASSERT(calld->subchannel_call_ == nullptr); - // picker's being null means the channel is currently in IDLE state. The - // incoming call will make the channel exit IDLE and queue itself. + bool pick_complete; + { + MutexLock lock(chand->data_plane_mu()); + pick_complete = calld->PickSubchannelLocked(elem, &error); + } + if (pick_complete) { + PickDone(elem, error); + GRPC_ERROR_UNREF(error); + } +} + +bool CallData::PickSubchannelLocked(grpc_call_element* elem, + grpc_error** error) { + ChannelData* chand = static_cast(elem->channel_data); + GPR_ASSERT(connected_subchannel_ == nullptr); + GPR_ASSERT(subchannel_call_ == nullptr); + // The picker being null means that the channel is currently in IDLE state. + // The incoming call will make the channel exit IDLE. if (chand->picker() == nullptr) { - // We are currently in the data plane. - // Bounce into the control plane to exit IDLE. - chand->CheckConnectivityState(true); - calld->AddCallToQueuedPicksLocked(elem); - return; + // Bounce into the control plane combiner to exit IDLE. + chand->CheckConnectivityState(/*try_to_connect=*/true); + // Queue the pick, so that it will be attempted once the channel + // becomes connected. + AddCallToQueuedPicksLocked(elem); + return false; } // Apply service config to call if needed. - calld->MaybeApplyServiceConfigToCallLocked(elem); + MaybeApplyServiceConfigToCallLocked(elem); // If this is a retry, use the send_initial_metadata payload that // we've cached; otherwise, use the pending batch. The // send_initial_metadata batch will be the first pending batch in the @@ -3846,31 +3826,27 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { // subchannel's copy of the metadata batch (which is copied for each // attempt) to the LB policy instead the one from the parent channel. LoadBalancingPolicy::PickArgs pick_args; - pick_args.call_state = &calld->lb_call_state_; + pick_args.call_state = &lb_call_state_; Metadata initial_metadata( - calld, - calld->seen_send_initial_metadata_ - ? &calld->send_initial_metadata_ - : calld->pending_batches_[0] + this, + seen_send_initial_metadata_ + ? &send_initial_metadata_ + : pending_batches_[0] .batch->payload->send_initial_metadata.send_initial_metadata); pick_args.initial_metadata = &initial_metadata; // Grab initial metadata flags so that we can check later if the call has // wait_for_ready enabled. const uint32_t send_initial_metadata_flags = - calld->seen_send_initial_metadata_ - ? calld->send_initial_metadata_flags_ - : calld->pending_batches_[0] - .batch->payload->send_initial_metadata - .send_initial_metadata_flags; - // When done, we schedule this closure to leave the data plane combiner. - GRPC_CLOSURE_INIT(&calld->pick_closure_, PickDone, elem, - grpc_schedule_on_exec_ctx); + seen_send_initial_metadata_ ? send_initial_metadata_flags_ + : pending_batches_[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; // Attempt pick. auto result = chand->picker()->Pick(pick_args); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: LB pick returned %s (subchannel=%p, error=%s)", - chand, calld, PickResultTypeName(result.type), + chand, this, PickResultTypeName(result.type), result.subchannel.get(), grpc_error_string(result.error)); } switch (result.type) { @@ -3879,10 +3855,9 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { grpc_error* disconnect_error = chand->disconnect_error(); if (disconnect_error != GRPC_ERROR_NONE) { GRPC_ERROR_UNREF(result.error); - GRPC_CLOSURE_SCHED(&calld->pick_closure_, - GRPC_ERROR_REF(disconnect_error)); - if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem); - break; + if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem); + *error = GRPC_ERROR_REF(disconnect_error); + return true; } // If wait_for_ready is false, then the error indicates the RPC // attempt's final status. @@ -3890,19 +3865,20 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { // Retry if appropriate; otherwise, fail. grpc_status_code status = GRPC_STATUS_OK; - grpc_error_get_status(result.error, calld->deadline_, &status, nullptr, + grpc_error_get_status(result.error, deadline_, &status, nullptr, nullptr, nullptr); - if (!calld->enable_retries_ || - !calld->MaybeRetry(elem, nullptr /* batch_data */, status, - nullptr /* server_pushback_md */)) { + const bool retried = enable_retries_ && + MaybeRetry(elem, nullptr /* batch_data */, status, + nullptr /* server_pushback_md */); + if (!retried) { grpc_error* new_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to pick subchannel", &result.error, 1); GRPC_ERROR_UNREF(result.error); - GRPC_CLOSURE_SCHED(&calld->pick_closure_, new_error); + *error = new_error; } - if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem); - break; + if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem); + return !retried; } // If wait_for_ready is true, then queue to retry when we get a new // picker. @@ -3910,26 +3886,26 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { } // Fallthrough case LoadBalancingPolicy::PickResult::PICK_QUEUE: - if (!calld->pick_queued_) calld->AddCallToQueuedPicksLocked(elem); - break; + if (!pick_queued_) AddCallToQueuedPicksLocked(elem); + return false; default: // PICK_COMPLETE + if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem); // Handle drops. if (GPR_UNLIKELY(result.subchannel == nullptr)) { result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Call dropped by load balancing policy"); } else { // Grab a ref to the connected subchannel while we're still - // holding the data plane combiner. - calld->connected_subchannel_ = + // holding the data plane mutex. + connected_subchannel_ = chand->GetConnectedSubchannelInDataPlane(result.subchannel.get()); - GPR_ASSERT(calld->connected_subchannel_ != nullptr); + GPR_ASSERT(connected_subchannel_ != nullptr); } - calld->lb_recv_trailing_metadata_ready_ = - result.recv_trailing_metadata_ready; - calld->lb_recv_trailing_metadata_ready_user_data_ = + lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready; + lb_recv_trailing_metadata_ready_user_data_ = result.recv_trailing_metadata_ready_user_data; - GRPC_CLOSURE_SCHED(&calld->pick_closure_, result.error); - if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem); + *error = result.error; + return true; } } diff --git a/src/core/lib/gprpp/ref_counted_ptr.h b/src/core/lib/gprpp/ref_counted_ptr.h index 19f38d7f013..2e3f4671581 100644 --- a/src/core/lib/gprpp/ref_counted_ptr.h +++ b/src/core/lib/gprpp/ref_counted_ptr.h @@ -103,6 +103,8 @@ class RefCountedPtr { if (value_ != nullptr) value_->Unref(); } + void swap(RefCountedPtr& other) { std::swap(value_, other.value_); } + // If value is non-null, we take ownership of a ref to it. void reset(T* value = nullptr) { if (value_ != nullptr) value_->Unref(); diff --git a/test/core/gprpp/ref_counted_ptr_test.cc b/test/core/gprpp/ref_counted_ptr_test.cc index 96dbdf884b0..38bd4b61364 100644 --- a/test/core/gprpp/ref_counted_ptr_test.cc +++ b/test/core/gprpp/ref_counted_ptr_test.cc @@ -151,6 +151,20 @@ TEST(RefCountedPtr, EqualityOperators) { EXPECT_NE(foo, nullptr); } +TEST(RefCountedPtr, Swap) { + Foo* foo = New(); + Foo* bar = New(); + RefCountedPtr ptr1(foo); + RefCountedPtr ptr2(bar); + ptr1.swap(ptr2); + EXPECT_EQ(foo, ptr2.get()); + EXPECT_EQ(bar, ptr1.get()); + RefCountedPtr ptr3; + ptr3.swap(ptr2); + EXPECT_EQ(nullptr, ptr2.get()); + EXPECT_EQ(foo, ptr3.get()); +} + TEST(MakeRefCounted, NoArgs) { RefCountedPtr foo = MakeRefCounted(); EXPECT_EQ(0, foo->value());