diff --git a/BUILD b/BUILD index 71781794242..9f96fc622f9 100644 --- a/BUILD +++ b/BUILD @@ -2785,6 +2785,7 @@ grpc_cc_library( ], external_deps = [ "absl/base:core_headers", + "absl/container:flat_hash_set", "absl/container:inlined_vector", "absl/status", "absl/status:statusor", diff --git a/src/core/BUILD b/src/core/BUILD index c87277c9048..d14bc3008f1 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -2495,6 +2495,7 @@ grpc_cc_library( srcs = ["lib/load_balancing/lb_policy.cc"], hdrs = ["lib/load_balancing/lb_policy.h"], external_deps = [ + "absl/base:core_headers", "absl/status", "absl/status:statusor", "absl/strings", @@ -2514,6 +2515,7 @@ grpc_cc_library( "//:debug_location", "//:event_engine_base_hdrs", "//:exec_ctx", + "//:gpr", "//:gpr_platform", "//:grpc_trace", "//:orphanable", @@ -3849,6 +3851,7 @@ grpc_cc_library( "absl/base:core_headers", "absl/functional:bind_front", "absl/memory", + "absl/random", "absl/status", "absl/status:statusor", "absl/strings", @@ -4421,6 +4424,7 @@ grpc_cc_library( "json_object_loader", "lb_policy", "lb_policy_factory", + "ref_counted", "subchannel_interface", "unique_type_name", "validation_errors", @@ -4445,6 +4449,7 @@ grpc_cc_library( "ext/filters/client_channel/lb_policy/round_robin/round_robin.cc", ], external_deps = [ + "absl/random", "absl/status", "absl/status:statusor", "absl/strings", @@ -4631,6 +4636,7 @@ grpc_cc_library( "ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc", ], external_deps = [ + "absl/base:core_headers", "absl/random", "absl/status", "absl/status:statusor", diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 4390e2223da..dfd529d0ebc 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -24,7 +24,7 @@ #include #include #include -#include +#include #include #include "absl/status/status.h" @@ -115,19 +115,12 @@ class 
ClientChannel::CallData { grpc_call_element* elem, grpc_transport_stream_op_batch* batch); static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent); - // Invoked by channel for queued calls when name resolution is completed. - static void CheckResolution(void* arg, grpc_error_handle error); - // Helper function for applying the service config to a call while - // holding ClientChannel::resolution_mu_. - // Returns true if the service config has been applied to the call, in which - // case the caller must invoke ResolutionDone() or AsyncResolutionDone() - // with the returned error. - bool CheckResolutionLocked(grpc_call_element* elem, grpc_error_handle* error) + void CheckResolution(grpc_call_element* elem, bool was_queued); + + // Removes the call from the channel's list of calls queued + // for name resolution. + void RemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_); - // Schedules a callback to continue processing the call once - // resolution is complete. The callback will not run until after this - // method returns. - void AsyncResolutionDone(grpc_call_element* elem, grpc_error_handle error); private: class ResolverQueuedCallCanceller; @@ -166,23 +159,22 @@ class ClientChannel::CallData { // Resumes all pending batches on lb_call_. void PendingBatchesResume(grpc_call_element* elem); + // Helper function for CheckResolution(). Returns true if the call + // can continue (i.e., there is a valid resolution result, or there is + // an invalid resolution result but the call is not wait_for_ready). + bool CheckResolutionLocked( + grpc_call_element* elem, grpc_metadata_batch* initial_metadata, + absl::StatusOr>* config_selector) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_); // Applies service config to the call. Must be invoked once we know // that the resolver has returned results to the channel. 
// If an error is returned, the error indicates the status with which // the call should be failed. grpc_error_handle ApplyServiceConfigToCallLocked( - grpc_call_element* elem, grpc_metadata_batch* initial_metadata) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_); - // Invoked when the resolver result is applied to the caller, on both - // success or failure. - static void ResolutionDone(void* arg, grpc_error_handle error); - // Removes the call (if present) from the channel's list of calls queued - // for name resolution. - void MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_); - // Adds the call (if not already present) to the channel's list of - // calls queued for name resolution. - void MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element* elem) + grpc_call_element* elem, grpc_metadata_batch* initial_metadata, + const absl::StatusOr>& config_selector); + // Adds the call to the channel's list of calls queued for name resolution. + void AddCallToResolverQueuedCallsLocked(grpc_call_element* elem) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_); static void RecvTrailingMetadataReadyForConfigSelectorCommitCallback( @@ -204,15 +196,7 @@ class ClientChannel::CallData { grpc_polling_entity* pollent_ = nullptr; - grpc_closure resolution_done_closure_; - // Accessed while holding ClientChannel::resolution_mu_. 
- bool service_config_applied_ ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = - false; - bool queued_pending_resolver_result_ - ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = false; - ClientChannel::ResolverQueuedCall resolver_queued_call_ - ABSL_GUARDED_BY(&ClientChannel::resolution_mu_); ResolverQueuedCallCanceller* resolver_call_canceller_ ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = nullptr; @@ -520,20 +504,18 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { } private: - // Subchannel and SubchannelInterface have different interfaces for - // their respective ConnectivityStateWatcherInterface classes. - // The one in Subchannel updates the ConnectedSubchannel along with - // the state, whereas the one in SubchannelInterface does not expose - // the ConnectedSubchannel. - // - // This wrapper provides a bridge between the two. It implements - // Subchannel::ConnectivityStateWatcherInterface and wraps + // This wrapper provides a bridge between the internal Subchannel API + // and the SubchannelInterface API that we expose to LB policies. + // It implements Subchannel::ConnectivityStateWatcherInterface and wraps // the instance of SubchannelInterface::ConnectivityStateWatcherInterface // that was passed in by the LB policy. We pass an instance of this // class to the underlying Subchannel, and when we get updates from // the subchannel, we pass those on to the wrapped watcher to return - // the update to the LB policy. This allows us to set the connected - // subchannel before passing the result back to the LB policy. + // the update to the LB policy. + // + // This class handles things like hopping into the WorkSerializer + // before passing notifications to the LB policy and propagating + // keepalive information between subchannels. 
class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface { public: WatcherWrapper( @@ -571,16 +553,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { } grpc_pollset_set* interested_parties() override { - SubchannelInterface::ConnectivityStateWatcherInterface* watcher = - watcher_.get(); - if (watcher_ == nullptr) watcher = replacement_->watcher_.get(); - return watcher->interested_parties(); - } - - WatcherWrapper* MakeReplacement() { - auto* replacement = new WatcherWrapper(std::move(watcher_), parent_); - replacement_ = replacement; - return replacement; + return watcher_->interested_parties(); } private: @@ -638,7 +611,6 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { std::unique_ptr watcher_; RefCountedPtr parent_; - WatcherWrapper* replacement_ = nullptr; }; ClientChannel* chand_; @@ -1099,6 +1071,20 @@ ChannelArgs ClientChannel::MakeSubchannelArgs( .Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE); } +void ClientChannel::ReprocessQueuedResolverCalls() { + for (grpc_call_element* elem : resolver_queued_calls_) { + CallData* calld = static_cast(elem->call_data); + calld->RemoveCallFromResolverQueuedCallsLocked(elem); + owning_stack_->EventEngine()->Run([elem]() { + ApplicationCallbackExecCtx application_exec_ctx; + ExecCtx exec_ctx; + CallData* calld = static_cast(elem->call_data); + calld->CheckResolution(elem, /*was_queued=*/true); + }); + } + resolver_queued_calls_.clear(); +} + namespace { RefCountedPtr ChooseLbPolicy( @@ -1305,26 +1291,19 @@ void ClientChannel::OnResolverErrorLocked(absl::Status status) { // result, then we continue to let it set the connectivity state. // Otherwise, we go into TRANSIENT_FAILURE. if (lb_policy_ == nullptr) { + // Update connectivity state. + // TODO(roth): We should be updating the connectivity state here but + // not the picker. 
+ UpdateStateAndPickerLocked( + GRPC_CHANNEL_TRANSIENT_FAILURE, status, "resolver failure", + MakeRefCounted(status)); { MutexLock lock(&resolution_mu_); // Update resolver transient failure. resolver_transient_failure_error_ = MaybeRewriteIllegalStatusCode(status, "resolver"); - // Process calls that were queued waiting for the resolver result. - for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr; - call = call->next) { - grpc_call_element* elem = call->elem; - CallData* calld = static_cast(elem->call_data); - grpc_error_handle error; - if (calld->CheckResolutionLocked(elem, &error)) { - calld->AsyncResolutionDone(elem, error); - } - } + ReprocessQueuedResolverCalls(); } - // Update connectivity state. - UpdateStateAndPickerLocked( - GRPC_CHANNEL_TRANSIENT_FAILURE, status, "resolver failure", - MakeRefCounted(status)); } } @@ -1378,30 +1357,6 @@ OrphanablePtr ClientChannel::CreateLbPolicyLocked( return lb_policy; } -void ClientChannel::AddResolverQueuedCall(ResolverQueuedCall* call, - grpc_polling_entity* pollent) { - // Add call to queued calls list. - call->next = resolver_queued_calls_; - resolver_queued_calls_ = call; - // Add call's pollent to channel's interested_parties, so that I/O - // can be done under the call's CQ. - grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_); -} - -void ClientChannel::RemoveResolverQueuedCall(ResolverQueuedCall* to_remove, - grpc_polling_entity* pollent) { - // Remove call's pollent from channel's interested_parties. - grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_); - // Remove from queued calls list. 
- for (ResolverQueuedCall** call = &resolver_queued_calls_; *call != nullptr; - call = &(*call)->next) { - if (*call == to_remove) { - *call = to_remove->next; - return; - } - } -} - void ClientChannel::UpdateServiceConfigInControlPlaneLocked( RefCountedPtr service_config, RefCountedPtr config_selector, std::string lb_policy_name) { @@ -1468,25 +1423,8 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() { service_config_.swap(service_config); config_selector_.swap(config_selector); dynamic_filters_.swap(dynamic_filters); - // Process calls that were queued waiting for the resolver result. - for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr; - call = call->next) { - // If there are a lot of queued calls here, resuming them all may cause us - // to stay inside C-core for a long period of time. All of that work would - // be done using the same ExecCtx instance and therefore the same cached - // value of "now". The longer it takes to finish all of this work and exit - // from C-core, the more stale the cached value of "now" may become. This - // can cause problems whereby (e.g.) we calculate a timer deadline based - // on the stale value, which results in the timer firing too early. To - // avoid this, we invalidate the cached value for each call we process. - ExecCtx::Get()->InvalidateNow(); - grpc_call_element* elem = call->elem; - CallData* calld = static_cast(elem->call_data); - grpc_error_handle error; - if (calld->CheckResolutionLocked(elem, &error)) { - calld->AsyncResolutionDone(elem, error); - } - } + // Re-process queued calls asynchronously. + ReprocessQueuedResolverCalls(); } // Old values will be unreffed after lock is released when they go out // of scope. @@ -1502,6 +1440,10 @@ void ClientChannel::CreateResolverLocked() { // Since the validity of the args was checked when the channel was created, // CreateResolver() must return a non-null result. 
GPR_ASSERT(resolver_ != nullptr); + // TODO(roth): We should be updating the connectivity state here but + // not the picker. But we need to make sure that we are initializing + // the picker to a queueing picker somewhere, in case the LB policy + // does not immediately return a new picker. UpdateStateAndPickerLocked( GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving", MakeRefCounted(nullptr)); @@ -1562,29 +1504,22 @@ void ClientChannel::UpdateStateAndPickerLocked( channelz::ChannelNode::GetChannelConnectivityStateChangeString( state))); } - // Grab data plane lock to update the picker. + // Grab the LB lock to update the picker and trigger reprocessing of the + // queued picks. + // Old picker will be unreffed after releasing the lock. { - MutexLock lock(&data_plane_mu_); - // Swap out the picker. - // Note: Original value will be destroyed after the lock is released. + MutexLock lock(&lb_mu_); picker_.swap(picker); - // Re-process queued picks. - for (LbQueuedCall* call = lb_queued_calls_; call != nullptr; - call = call->next) { - // If there are a lot of queued calls here, resuming them all may cause us - // to stay inside C-core for a long period of time. All of that work would - // be done using the same ExecCtx instance and therefore the same cached - // value of "now". The longer it takes to finish all of this work and exit - // from C-core, the more stale the cached value of "now" may become. This - // can cause problems whereby (e.g.) we calculate a timer deadline based - // on the stale value, which results in the timer firing too early. To - // avoid this, we invalidate the cached value for each call we process. - ExecCtx::Get()->InvalidateNow(); - grpc_error_handle error; - if (call->lb_call->PickSubchannelLocked(&error)) { - call->lb_call->AsyncPickDone(error); - } + // Reprocess queued picks asynchronously. 
+ for (LoadBalancedCall* call : lb_queued_calls_) { + call->RemoveCallFromLbQueuedCallsLocked(); + owning_stack_->EventEngine()->Run([call]() { + ApplicationCallbackExecCtx application_exec_ctx; + ExecCtx exec_ctx; + call->PickSubchannel(/*was_queued=*/true); + }); } + lb_queued_calls_.clear(); } } @@ -1628,7 +1563,7 @@ grpc_error_handle ClientChannel::DoPingLocked(grpc_transport_op* op) { } LoadBalancingPolicy::PickResult result; { - MutexLock lock(&data_plane_mu_); + MutexLock lock(&lb_mu_); result = picker_->Pick(LoadBalancingPolicy::PickArgs()); } return HandlePickResult( @@ -1712,6 +1647,9 @@ void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) { GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API", MakeRefCounted( grpc_error_to_absl_status(op->disconnect_with_error))); + // TODO(roth): If this happens when we're still waiting for a + // resolver result, we need to trigger failures for all calls in + // the resolver queue here. } } GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op"); @@ -1748,30 +1686,6 @@ void ClientChannel::GetChannelInfo(grpc_channel_element* elem, } } -void ClientChannel::AddLbQueuedCall(LbQueuedCall* call, - grpc_polling_entity* pollent) { - // Add call to queued picks list. - call->next = lb_queued_calls_; - lb_queued_calls_ = call; - // Add call's pollent to channel's interested_parties, so that I/O - // can be done under the call's CQ. - grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_); -} - -void ClientChannel::RemoveLbQueuedCall(LbQueuedCall* to_remove, - grpc_polling_entity* pollent) { - // Remove call's pollent from channel's interested_parties. - grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_); - // Remove from queued picks list. 
- for (LbQueuedCall** call = &lb_queued_calls_; *call != nullptr; - call = &(*call)->next) { - if (*call == to_remove) { - *call = to_remove->next; - return; - } - } -} - void ClientChannel::TryToConnectLocked() { if (lb_policy_ != nullptr) { lb_policy_->ExitIdleLocked(); @@ -1938,7 +1852,23 @@ void ClientChannel::CallData::StartTransportStreamOpBatch( "config", chand, calld); } - CheckResolution(elem, absl::OkStatus()); + // If we're still in IDLE, we need to start resolving. + if (GPR_UNLIKELY(chand->CheckConnectivityState(false) == + GRPC_CHANNEL_IDLE)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: triggering exit idle", chand, + calld); + } + // Bounce into the control plane work serializer to start resolving. + GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ExitIdle"); + chand->work_serializer_->Run( + [chand]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) { + chand->CheckConnectivityState(/*try_to_connect=*/true); + GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "ExitIdle"); + }, + DEBUG_LOCATION); + } + calld->CheckResolution(elem, /*was_queued=*/false); } else { // For all other batches, release the call combiner. if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { @@ -1964,8 +1894,7 @@ void ClientChannel::CallData::SetPollent(grpc_call_element* elem, size_t ClientChannel::CallData::GetBatchIndex( grpc_transport_stream_op_batch* batch) { // Note: It is important the send_initial_metadata be the first entry - // here, since the code in ApplyServiceConfigToCallLocked() and - // CheckResolutionLocked() assumes it will be. + // here, since the code in CheckResolution() assumes it will be. 
if (batch->send_initial_metadata) return 0; if (batch->send_message) return 1; if (batch->send_trailing_metadata) return 2; @@ -2109,7 +2038,8 @@ class ClientChannel::CallData::ResolverQueuedCallCanceller { } if (calld->resolver_call_canceller_ == self && !error.ok()) { // Remove pick from list of queued picks. - calld->MaybeRemoveCallFromResolverQueuedCallsLocked(self->elem_); + calld->RemoveCallFromResolverQueuedCallsLocked(self->elem_); + chand->resolver_queued_calls_.erase(self->elem_); // Fail pending batches on the call. calld->PendingBatchesFail(self->elem_, error, YieldCallCombinerIfPendingBatchesFound); @@ -2123,98 +2053,91 @@ class ClientChannel::CallData::ResolverQueuedCallCanceller { grpc_closure closure_; }; -void ClientChannel::CallData::MaybeRemoveCallFromResolverQueuedCallsLocked( +void ClientChannel::CallData::RemoveCallFromResolverQueuedCallsLocked( grpc_call_element* elem) { - if (!queued_pending_resolver_result_) return; auto* chand = static_cast(elem->channel_data); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: removing from resolver queued picks list", chand, this); } - chand->RemoveResolverQueuedCall(&resolver_queued_call_, pollent_); - queued_pending_resolver_result_ = false; + // Remove call's pollent from channel's interested_parties. + grpc_polling_entity_del_from_pollset_set(pollent_, + chand->interested_parties_); // Lame the call combiner canceller. resolver_call_canceller_ = nullptr; - // Add trace annotation - auto* call_tracer = - static_cast(call_context_[GRPC_CONTEXT_CALL_TRACER].value); - if (call_tracer != nullptr) { - call_tracer->RecordAnnotation("Delayed name resolution complete."); - } + // Note: There's no need to actually remove the call from the queue + // here, because that will be done in + // ResolverQueuedCallCanceller::CancelLocked() or + // ClientChannel::ReprocessQueuedResolverCalls(). 
} -void ClientChannel::CallData::MaybeAddCallToResolverQueuedCallsLocked( +void ClientChannel::CallData::AddCallToResolverQueuedCallsLocked( grpc_call_element* elem) { - if (queued_pending_resolver_result_) return; auto* chand = static_cast(elem->channel_data); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list", chand, this); } - queued_pending_resolver_result_ = true; - resolver_queued_call_.elem = elem; - chand->AddResolverQueuedCall(&resolver_queued_call_, pollent_); + // Add call's pollent to channel's interested_parties, so that I/O + // can be done under the call's CQ. + grpc_polling_entity_add_to_pollset_set(pollent_, chand->interested_parties_); + // Add to queue. + chand->resolver_queued_calls_.insert(elem); // Register call combiner cancellation callback. resolver_call_canceller_ = new ResolverQueuedCallCanceller(elem); } grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked( - grpc_call_element* elem, grpc_metadata_batch* initial_metadata) { + grpc_call_element* elem, grpc_metadata_batch* initial_metadata, + const absl::StatusOr>& config_selector) { ClientChannel* chand = static_cast(elem->channel_data); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call", chand, this); } - ConfigSelector* config_selector = chand->config_selector_.get(); - if (config_selector != nullptr) { - // Use the ConfigSelector to determine the config for the call. - auto call_config = - config_selector->GetCallConfig({&path_, initial_metadata, arena()}); - if (!call_config.ok()) { - return absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode( - call_config.status(), "ConfigSelector")); - } - // Create a ClientChannelServiceConfigCallData for the call. This stores - // a ref to the ServiceConfig and caches the right set of parsed configs - // to use for the call. 
The ClientChannelServiceConfigCallData will store - // itself in the call context, so that it can be accessed by filters - // below us in the stack, and it will be cleaned up when the call ends. - auto* service_config_call_data = - arena()->New( - std::move(call_config->service_config), call_config->method_configs, - std::move(call_config->call_attributes), - call_config->call_dispatch_controller, call_context_); - // Apply our own method params to the call. - auto* method_params = static_cast( - service_config_call_data->GetMethodParsedConfig( - chand->service_config_parser_index_)); - if (method_params != nullptr) { - // If the deadline from the service config is shorter than the one - // from the client API, reset the deadline timer. - if (chand->deadline_checking_enabled_ && - method_params->timeout() != Duration::Zero()) { - const Timestamp per_method_deadline = - Timestamp::FromCycleCounterRoundUp(call_start_time_) + - method_params->timeout(); - if (per_method_deadline < deadline_) { - deadline_ = per_method_deadline; - grpc_deadline_state_reset(&deadline_state_, deadline_); - } - } - // If the service config set wait_for_ready and the application - // did not explicitly set it, use the value from the service config. - auto* wait_for_ready = - pending_batches_[0] - ->payload->send_initial_metadata.send_initial_metadata - ->GetOrCreatePointer(WaitForReady()); - if (method_params->wait_for_ready().has_value() && - !wait_for_ready->explicitly_set) { - wait_for_ready->value = method_params->wait_for_ready().value(); + if (!config_selector.ok()) return config_selector.status(); + // Use the ConfigSelector to determine the config for the call. + auto call_config = + (*config_selector)->GetCallConfig({&path_, initial_metadata, arena()}); + if (!call_config.ok()) { + return absl_status_to_grpc_error( + MaybeRewriteIllegalStatusCode(call_config.status(), "ConfigSelector")); + } + // Create a ClientChannelServiceConfigCallData for the call. 
This stores + // a ref to the ServiceConfig and caches the right set of parsed configs + // to use for the call. The ClientChannelServiceConfigCallData will store + // itself in the call context, so that it can be accessed by filters + // below us in the stack, and it will be cleaned up when the call ends. + auto* service_config_call_data = + arena()->New( + std::move(call_config->service_config), call_config->method_configs, + std::move(call_config->call_attributes), + call_config->call_dispatch_controller, call_context_); + // Apply our own method params to the call. + auto* method_params = static_cast( + service_config_call_data->GetMethodParsedConfig( + chand->service_config_parser_index_)); + if (method_params != nullptr) { + // If the deadline from the service config is shorter than the one + // from the client API, reset the deadline timer. + if (chand->deadline_checking_enabled_ && + method_params->timeout() != Duration::Zero()) { + const Timestamp per_method_deadline = + Timestamp::FromCycleCounterRoundUp(call_start_time_) + + method_params->timeout(); + if (per_method_deadline < deadline_) { + deadline_ = per_method_deadline; + grpc_deadline_state_reset(&deadline_state_, deadline_); } } - // Set the dynamic filter stack. - dynamic_filters_ = chand->dynamic_filters_; + // If the service config set wait_for_ready and the application + // did not explicitly set it, use the value from the service config. + auto* wait_for_ready = initial_metadata->GetOrCreatePointer(WaitForReady()); + if (method_params->wait_for_ready().has_value() && + !wait_for_ready->explicitly_set) { + wait_for_ready->value = method_params->wait_for_ready().value(); + } } return absl::OkStatus(); } @@ -2243,80 +2166,60 @@ void ClientChannel::CallData:: error); } -void ClientChannel::CallData::AsyncResolutionDone(grpc_call_element* elem, - grpc_error_handle error) { - // TODO(roth): Does this callback need to hold a ref to the call stack? 
- GRPC_CLOSURE_INIT(&resolution_done_closure_, ResolutionDone, elem, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &resolution_done_closure_, error); -} - -void ClientChannel::CallData::ResolutionDone(void* arg, - grpc_error_handle error) { - grpc_call_element* elem = static_cast(arg); +void ClientChannel::CallData::CheckResolution(grpc_call_element* elem, + bool was_queued) { ClientChannel* chand = static_cast(elem->channel_data); - CallData* calld = static_cast(elem->call_data); + grpc_metadata_batch* initial_metadata = + pending_batches_[0]->payload->send_initial_metadata.send_initial_metadata; + // Check if we have a resolver result to use. + absl::StatusOr> config_selector; + { + MutexLock lock(&chand->resolution_mu_); + bool result_ready = + CheckResolutionLocked(elem, initial_metadata, &config_selector); + // If no result is available, queue the call. + if (!result_ready) { + AddCallToResolverQueuedCallsLocked(elem); + return; + } + } + // We have a result. Apply service config to call. + grpc_error_handle error = + ApplyServiceConfigToCallLocked(elem, initial_metadata, config_selector); + // ConfigSelector must be unreffed inside the WorkSerializer. + if (config_selector.ok()) { + chand->work_serializer_->Run( + [config_selector = std::move(*config_selector)]() mutable { + config_selector.reset(); + }, + DEBUG_LOCATION); + } + // Handle errors. 
if (!error.ok()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: error applying config to call: error=%s", - chand, calld, StatusToString(error).c_str()); + chand, this, StatusToString(error).c_str()); } - calld->PendingBatchesFail(elem, error, YieldCallCombiner); + PendingBatchesFail(elem, error, YieldCallCombiner); return; } - calld->CreateDynamicCall(elem); -} - -void ClientChannel::CallData::CheckResolution(void* arg, - grpc_error_handle error) { - grpc_call_element* elem = static_cast(arg); - CallData* calld = static_cast(elem->call_data); - ClientChannel* chand = static_cast(elem->channel_data); - bool resolution_complete; - { - MutexLock lock(&chand->resolution_mu_); - resolution_complete = calld->CheckResolutionLocked(elem, &error); - } - if (resolution_complete) { - ResolutionDone(elem, error); + // If the call was queued, add trace annotation. + if (was_queued) { + auto* call_tracer = + static_cast(call_context_[GRPC_CONTEXT_CALL_TRACER].value); + if (call_tracer != nullptr) { + call_tracer->RecordAnnotation("Delayed name resolution complete."); + } } + // Create dynamic call. + CreateDynamicCall(elem); } -bool ClientChannel::CallData::CheckResolutionLocked(grpc_call_element* elem, - grpc_error_handle* error) { +bool ClientChannel::CallData::CheckResolutionLocked( + grpc_call_element* elem, grpc_metadata_batch* initial_metadata, + absl::StatusOr>* config_selector) { ClientChannel* chand = static_cast(elem->channel_data); - // If we're still in IDLE, we need to start resolving. - if (GPR_UNLIKELY(chand->CheckConnectivityState(false) == GRPC_CHANNEL_IDLE)) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { - gpr_log(GPR_INFO, "chand=%p calld=%p: triggering exit idle", chand, this); - } - // Bounce into the control plane work serializer to start resolving, - // in case we are still in IDLE state. 
Since we are holding on to the - // resolution mutex here, we offload it on the ExecCtx so that we don't - // deadlock with ourselves. - GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "CheckResolutionLocked"); - ExecCtx::Run( - DEBUG_LOCATION, - GRPC_CLOSURE_CREATE( - [](void* arg, grpc_error_handle /*error*/) { - auto* chand = static_cast(arg); - chand->work_serializer_->Run( - [chand]() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) { - chand->CheckConnectivityState(/*try_to_connect=*/true); - GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, - "CheckResolutionLocked"); - }, - DEBUG_LOCATION); - }, - chand, nullptr), - absl::OkStatus()); - } - // Get send_initial_metadata batch and flags. - auto& send_initial_metadata = - pending_batches_[0]->payload->send_initial_metadata; - grpc_metadata_batch* initial_metadata_batch = - send_initial_metadata.send_initial_metadata; // If we don't yet have a resolver result, we need to queue the call // until we get one. if (GPR_UNLIKELY(!chand->received_service_config_data_)) { @@ -2324,31 +2227,26 @@ bool ClientChannel::CallData::CheckResolutionLocked(grpc_call_element* elem, // first service config, fail any non-wait_for_ready calls. absl::Status resolver_error = chand->resolver_transient_failure_error_; if (!resolver_error.ok() && - !initial_metadata_batch->GetOrCreatePointer(WaitForReady())->value) { + !initial_metadata->GetOrCreatePointer(WaitForReady())->value) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: resolution failed, failing call", chand, this); } - MaybeRemoveCallFromResolverQueuedCallsLocked(elem); - *error = absl_status_to_grpc_error(resolver_error); + *config_selector = absl_status_to_grpc_error(resolver_error); return true; } // Either the resolver has not yet returned a result, or it has // returned transient failure but the call is wait_for_ready. In // either case, queue the call. 
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { - gpr_log(GPR_INFO, "chand=%p calld=%p: queuing to wait for resolution", - chand, this); + gpr_log(GPR_INFO, "chand=%p calld=%p: no resolver result yet", chand, + this); } - MaybeAddCallToResolverQueuedCallsLocked(elem); return false; } - // Apply service config to call if not yet applied. - if (GPR_LIKELY(!service_config_applied_)) { - service_config_applied_ = true; - *error = ApplyServiceConfigToCallLocked(elem, initial_metadata_batch); - } - MaybeRemoveCallFromResolverQueuedCallsLocked(elem); + // Result found. + *config_selector = chand->config_selector_; + dynamic_filters_ = chand->dynamic_filters_; return true; } @@ -2591,7 +2489,7 @@ void ClientChannel::LoadBalancedCall::Orphan() { size_t ClientChannel::LoadBalancedCall::GetBatchIndex( grpc_transport_stream_op_batch* batch) { // Note: It is important the send_initial_metadata be the first entry - // here, since the code in PickSubchannelLocked() assumes it will be. + // here, since the code in PickSubchannelImpl() assumes it will be. if (batch->send_initial_metadata) return 0; if (batch->send_message) return 1; if (batch->send_trailing_metadata) return 2; @@ -2817,12 +2715,7 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch( // For batches containing a send_initial_metadata op, acquire the // channel's data plane mutex to pick a subchannel. if (GPR_LIKELY(batch->send_initial_metadata)) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { - gpr_log(GPR_INFO, - "chand=%p lb_call=%p: grabbing data plane mutex to perform pick", - chand_, this); - } - PickSubchannel(this, absl::OkStatus()); + PickSubchannel(/*was_queued=*/false); } else { // For all other batches, release the call combiner. 
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { @@ -2998,7 +2891,7 @@ class ClientChannel::LoadBalancedCall::LbQueuedCallCanceller { auto* lb_call = self->lb_call_.get(); auto* chand = lb_call->chand_; { - MutexLock lock(&chand->data_plane_mu_); + MutexLock lock(&chand->lb_mu_); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: cancelling queued pick: " @@ -3009,7 +2902,9 @@ class ClientChannel::LoadBalancedCall::LbQueuedCallCanceller { if (lb_call->lb_call_canceller_ == self && !error.ok()) { lb_call->call_dispatch_controller_->Commit(); // Remove pick from list of queued picks. - lb_call->MaybeRemoveCallFromLbQueuedCallsLocked(); + lb_call->RemoveCallFromLbQueuedCallsLocked(); + // Remove from queued picks list. + chand->lb_queued_calls_.erase(lb_call); // Fail pending batches on the call. lb_call->PendingBatchesFail(error, YieldCallCombinerIfPendingBatchesFound); @@ -3023,72 +2918,110 @@ class ClientChannel::LoadBalancedCall::LbQueuedCallCanceller { grpc_closure closure_; }; -void ClientChannel::LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() { - if (!queued_pending_lb_pick_) return; +void ClientChannel::LoadBalancedCall::RemoveCallFromLbQueuedCallsLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list", chand_, this); } - chand_->RemoveLbQueuedCall(&queued_call_, pollent_); - queued_pending_lb_pick_ = false; + // Remove pollset_set linkage. + grpc_polling_entity_del_from_pollset_set(pollent_, + chand_->interested_parties_); // Lame the call combiner canceller. 
lb_call_canceller_ = nullptr; - // Add trace annotation - if (call_attempt_tracer_ != nullptr) { - call_attempt_tracer_->RecordAnnotation("Delayed LB pick complete."); - } + // Note: There's no need to actually remove the call from the queue + // here, because that will be done in either + // LbQueuedCallCanceller::CancelLocked() or + // in ClientChannel::UpdateStateAndPickerLocked(). } -void ClientChannel::LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() { - if (queued_pending_lb_pick_) return; +void ClientChannel::LoadBalancedCall::AddCallToLbQueuedCallsLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list", chand_, this); } - queued_pending_lb_pick_ = true; - queued_call_.lb_call = this; - chand_->AddLbQueuedCall(&queued_call_, pollent_); + // Add call's pollent to channel's interested_parties, so that I/O + // can be done under the call's CQ. + grpc_polling_entity_add_to_pollset_set(pollent_, chand_->interested_parties_); + // Add to queue. + chand_->lb_queued_calls_.insert(this); // Register call combiner cancellation callback. lb_call_canceller_ = new LbQueuedCallCanceller(Ref()); } -void ClientChannel::LoadBalancedCall::AsyncPickDone(grpc_error_handle error) { - // TODO(roth): Does this callback need to hold a ref to LoadBalancedCall?
- GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx); - ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error); -} - -void ClientChannel::LoadBalancedCall::PickDone(void* arg, - grpc_error_handle error) { - auto* self = static_cast(arg); - if (!error.ok()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { - gpr_log(GPR_INFO, - "chand=%p lb_call=%p: failed to pick subchannel: error=%s", - self->chand_, self, StatusToString(error).c_str()); - } - self->PendingBatchesFail(error, YieldCallCombiner); - return; +void ClientChannel::LoadBalancedCall::PickSubchannel(bool was_queued) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { + gpr_log(GPR_INFO, "chand=%p lb_call=%p: grabbing LB mutex to perform pick", + chand_, this); } - self->call_dispatch_controller_->Commit(); - self->CreateSubchannelCall(); -} - -void ClientChannel::LoadBalancedCall::PickSubchannel(void* arg, - grpc_error_handle error) { - auto* self = static_cast(arg); - bool pick_complete; + // Grab mutex and take a ref to the picker. + RefCountedPtr picker; { - MutexLock lock(&self->chand_->data_plane_mu_); - pick_complete = self->PickSubchannelLocked(&error); - } - if (pick_complete) { - PickDone(self, error); + MutexLock lock(&chand_->lb_mu_); + picker = chand_->picker_; + } + // We may accumulate multiple pickers here, because if a picker says + // to queue the call, we check again to see if the picker has been + // updated before we queue it. + std::vector> + pickers_to_unref; + while (true) { + // Do pick. + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { + gpr_log(GPR_INFO, "chand=%p lb_call=%p: performing pick with picker=%p", + chand_, this, picker.get()); + } + grpc_error_handle error; + bool pick_complete = PickSubchannelImpl(picker.get(), &error); + if (!pick_complete) { + MutexLock lock(&chand_->lb_mu_); + // If picker has been swapped out since we grabbed it, try again. 
+ if (chand_->picker_ != picker) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { + gpr_log(GPR_INFO, + "chand=%p lb_call=%p: pick not complete, but picker changed", + chand_, this); + } + pickers_to_unref.emplace_back(std::move(picker)); + picker = chand_->picker_; + continue; + } + // Otherwise queue the pick to try again later when we get a new picker. + AddCallToLbQueuedCallsLocked(); + break; + } + // Pick is complete. + // If it was queued, add a trace annotation. + if (was_queued && call_attempt_tracer_ != nullptr) { + call_attempt_tracer_->RecordAnnotation("Delayed LB pick complete."); + } + // If the pick failed, fail the call. + if (!error.ok()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { + gpr_log(GPR_INFO, + "chand=%p lb_call=%p: failed to pick subchannel: error=%s", + chand_, this, StatusToString(error).c_str()); + } + PendingBatchesFail(error, YieldCallCombiner); + break; + } + // Pick succeeded. + call_dispatch_controller_->Commit(); + CreateSubchannelCall(); + break; } + pickers_to_unref.emplace_back(std::move(picker)); + // Unref pickers in WorkSerializer. + chand_->work_serializer_->Run( + [pickers_to_unref = std::move(pickers_to_unref)]() mutable { + for (auto& picker : pickers_to_unref) { + picker.reset(DEBUG_LOCATION, "PickSubchannel"); + } + }, + DEBUG_LOCATION); } -bool ClientChannel::LoadBalancedCall::PickSubchannelLocked( - grpc_error_handle* error) { +bool ClientChannel::LoadBalancedCall::PickSubchannelImpl( + LoadBalancingPolicy::SubchannelPicker* picker, grpc_error_handle* error) { GPR_ASSERT(connected_subchannel_ == nullptr); GPR_ASSERT(subchannel_call_ == nullptr); // Grab initial metadata. 
@@ -3103,91 +3036,81 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked( pick_args.call_state = &lb_call_state; Metadata initial_metadata(initial_metadata_batch); pick_args.initial_metadata = &initial_metadata; - auto result = chand_->picker_->Pick(pick_args); + auto result = picker->Pick(pick_args); return HandlePickResult( &result, // CompletePick - [this](LoadBalancingPolicy::PickResult::Complete* complete_pick) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { - gpr_log(GPR_INFO, - "chand=%p lb_call=%p: LB pick succeeded: subchannel=%p", - chand_, this, complete_pick->subchannel.get()); - } - GPR_ASSERT(complete_pick->subchannel != nullptr); - // Grab a ref to the connected subchannel while we're still - // holding the data plane mutex. - SubchannelWrapper* subchannel = static_cast( - complete_pick->subchannel.get()); - connected_subchannel_ = subchannel->connected_subchannel(); - // If the subchannel has no connected subchannel (e.g., if the - // subchannel has moved out of state READY but the LB policy hasn't - // yet seen that change and given us a new picker), then just - // queue the pick. We'll try again as soon as we get a new picker. 
- if (connected_subchannel_ == nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { - gpr_log(GPR_INFO, - "chand=%p lb_call=%p: subchannel returned by LB picker " - "has no connected subchannel; queueing pick", - chand_, this); - } - MaybeAddCallToLbQueuedCallsLocked(); - return false; - } - lb_subchannel_call_tracker_ = - std::move(complete_pick->subchannel_call_tracker); - if (lb_subchannel_call_tracker_ != nullptr) { - lb_subchannel_call_tracker_->Start(); - } - MaybeRemoveCallFromLbQueuedCallsLocked(); - return true; - }, + [this](LoadBalancingPolicy::PickResult::Complete* complete_pick) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { + gpr_log(GPR_INFO, + "chand=%p lb_call=%p: LB pick succeeded: subchannel=%p", + chand_, this, complete_pick->subchannel.get()); + } + GPR_ASSERT(complete_pick->subchannel != nullptr); + // Grab a ref to the connected subchannel. Note that the pick is + // performed without holding the channel's LB mutex. + SubchannelWrapper* subchannel = + static_cast(complete_pick->subchannel.get()); + connected_subchannel_ = subchannel->connected_subchannel(); + // If the subchannel has no connected subchannel (e.g., if the + // subchannel has moved out of state READY but the LB policy hasn't + // yet seen that change and given us a new picker), then just + // queue the pick. We'll try again as soon as we get a new picker.
+ if (connected_subchannel_ == nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { + gpr_log(GPR_INFO, + "chand=%p lb_call=%p: subchannel returned by LB picker " + "has no connected subchannel; queueing pick", + chand_, this); + } + return false; + } + lb_subchannel_call_tracker_ = + std::move(complete_pick->subchannel_call_tracker); + if (lb_subchannel_call_tracker_ != nullptr) { + lb_subchannel_call_tracker_->Start(); + } + return true; + }, // QueuePick - [this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { - gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick queued", chand_, - this); - } - MaybeAddCallToLbQueuedCallsLocked(); - return false; - }, + [this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { + gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick queued", chand_, + this); + } + return false; + }, // FailPick [this, initial_metadata_batch, - &error](LoadBalancingPolicy::PickResult::Fail* fail_pick) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { - gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick failed: %s", - chand_, this, fail_pick->status.ToString().c_str()); - } - // If wait_for_ready is false, then the error indicates the RPC - // attempt's final status. - if (!initial_metadata_batch->GetOrCreatePointer(WaitForReady()) - ->value) { - *error = absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode( - std::move(fail_pick->status), "LB pick")); - MaybeRemoveCallFromLbQueuedCallsLocked(); - return true; - } - // If wait_for_ready is true, then queue to retry when we get a new - // picker. 
- MaybeAddCallToLbQueuedCallsLocked(); - return false; - }, + &error](LoadBalancingPolicy::PickResult::Fail* fail_pick) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { + gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick failed: %s", chand_, + this, fail_pick->status.ToString().c_str()); + } + // If wait_for_ready is false, then the error indicates the RPC + // attempt's final status. + if (!initial_metadata_batch->GetOrCreatePointer(WaitForReady()) + ->value) { + *error = absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode( + std::move(fail_pick->status), "LB pick")); + return true; + } + // If wait_for_ready is true, then queue to retry when we get a new + // picker. + return false; + }, // DropPick - [this, &error](LoadBalancingPolicy::PickResult::Drop* drop_pick) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { - gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick dropped: %s", - chand_, this, drop_pick->status.ToString().c_str()); - } - *error = grpc_error_set_int( - absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode( - std::move(drop_pick->status), "LB drop")), - StatusIntProperty::kLbPolicyDrop, 1); - MaybeRemoveCallFromLbQueuedCallsLocked(); - return true; - }); + [this, &error](LoadBalancingPolicy::PickResult::Drop* drop_pick) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { + gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick dropped: %s", chand_, + this, drop_pick->status.ToString().c_str()); + } + *error = grpc_error_set_int( + absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode( + std::move(drop_pick->status), "LB drop")), + StatusIntProperty::kLbPolicyDrop, 1); + return true; + }); } } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 3b107303441..65c53e08759 100644 --- a/src/core/ext/filters/client_channel/client_channel.h 
+++ b/src/core/ext/filters/client_channel/client_channel.h @@ -24,11 +24,11 @@ #include #include #include -#include #include #include #include "absl/base/thread_annotations.h" +#include "absl/container/flat_hash_set.h" #include "absl/status/status.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" @@ -222,15 +222,6 @@ class ClientChannel { std::atomic done_{false}; }; - struct ResolverQueuedCall { - grpc_call_element* elem; - ResolverQueuedCall* next = nullptr; - }; - struct LbQueuedCall { - LoadBalancedCall* lb_call; - LbQueuedCall* next = nullptr; - }; - ClientChannel(grpc_channel_element_args* args, grpc_error_handle* error); ~ClientChannel(); @@ -246,6 +237,9 @@ class ClientChannel { // Note: All methods with "Locked" suffix must be invoked from within // work_serializer_. + void ReprocessQueuedResolverCalls() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&resolution_mu_); + void OnResolverResultChangedLocked(Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); void OnResolverErrorLocked(absl::Status status) @@ -284,20 +278,6 @@ class ClientChannel { void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); - // These methods all require holding resolution_mu_. - void AddResolverQueuedCall(ResolverQueuedCall* call, - grpc_polling_entity* pollent) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_); - void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove, - grpc_polling_entity* pollent) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_); - - // These methods all require holding data_plane_mu_. - void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_); - void RemoveLbQueuedCall(LbQueuedCall* to_remove, grpc_polling_entity* pollent) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_); - // // Fields set at construction and never modified. // @@ -316,9 +296,9 @@ class ClientChannel { // Fields related to name resolution. Guarded by resolution_mu_. 
// mutable Mutex resolution_mu_; - // Linked list of calls queued waiting for resolver result. - ResolverQueuedCall* resolver_queued_calls_ ABSL_GUARDED_BY(resolution_mu_) = - nullptr; + // List of calls queued waiting for resolver result. + absl::flat_hash_set resolver_queued_calls_ + ABSL_GUARDED_BY(resolution_mu_); // Data from service config. absl::Status resolver_transient_failure_error_ ABSL_GUARDED_BY(resolution_mu_); @@ -330,13 +310,13 @@ class ClientChannel { ABSL_GUARDED_BY(resolution_mu_); // - // Fields used in the data plane. Guarded by data_plane_mu_. + // Fields related to LB picks. Guarded by lb_mu_. // - mutable Mutex data_plane_mu_; + mutable Mutex lb_mu_; RefCountedPtr picker_ - ABSL_GUARDED_BY(data_plane_mu_); - // Linked list of calls queued waiting for LB pick. - LbQueuedCall* lb_queued_calls_ ABSL_GUARDED_BY(data_plane_mu_) = nullptr; + ABSL_GUARDED_BY(lb_mu_); + absl::flat_hash_set lb_queued_calls_ + ABSL_GUARDED_BY(lb_mu_); // // Fields used in the control plane. Guarded by work_serializer. @@ -360,7 +340,7 @@ class ClientChannel { // The set of SubchannelWrappers that currently exist. // No need to hold a ref, since the map is updated in the control-plane // work_serializer when the SubchannelWrappers are created and destroyed. - std::set subchannel_wrappers_ + absl::flat_hash_set subchannel_wrappers_ ABSL_GUARDED_BY(*work_serializer_); int keepalive_time_ ABSL_GUARDED_BY(*work_serializer_) = -1; grpc_error_handle disconnect_error_ ABSL_GUARDED_BY(*work_serializer_); @@ -422,16 +402,11 @@ class ClientChannel::LoadBalancedCall void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); - // Invoked by channel for queued LB picks when the picker is updated. - static void PickSubchannel(void* arg, grpc_error_handle error); - // Helper function for performing an LB pick while holding the data plane - // mutex. 
Returns true if the pick is complete, in which case the caller - // must invoke PickDone() or AsyncPickDone() with the returned error. - bool PickSubchannelLocked(grpc_error_handle* error) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_); - // Schedules a callback to process the completed pick. The callback - // will not run until after this method returns. - void AsyncPickDone(grpc_error_handle error); + void PickSubchannel(bool was_queued); + + // Called by channel when removing a call from the list of queued calls. + void RemoveCallFromLbQueuedCallsLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_); RefCountedPtr subchannel_call() const { return subchannel_call_; @@ -479,14 +454,14 @@ class ClientChannel::LoadBalancedCall void RecordCallCompletion(absl::Status status); void CreateSubchannelCall(); - // Invoked when a pick is completed, on both success or failure. - static void PickDone(void* arg, grpc_error_handle error); - // Removes the call from the channel's list of queued picks if present. - void MaybeRemoveCallFromLbQueuedCallsLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_); + + // Helper function for performing an LB pick with a specified picker. + // Returns true if the pick is complete. + bool PickSubchannelImpl(LoadBalancingPolicy::SubchannelPicker* picker, + grpc_error_handle* error); // Adds the call to the channel's list of queued picks if not already present. - void MaybeAddCallToLbQueuedCallsLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_); + void AddCallToLbQueuedCallsLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_); ClientChannel* chand_; @@ -513,15 +488,9 @@ class ClientChannel::LoadBalancedCall // Set when we fail inside the LB call. grpc_error_handle failure_error_; - grpc_closure pick_closure_; - - // Accessed while holding ClientChannel::data_plane_mu_. 
- ClientChannel::LbQueuedCall queued_call_ - ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_); - bool queued_pending_lb_pick_ ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = - false; + // Accessed while holding ClientChannel::lb_mu_. LbQueuedCallCanceller* lb_call_canceller_ - ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = nullptr; + ABSL_GUARDED_BY(&ClientChannel::lb_mu_) = nullptr; RefCountedPtr connected_subchannel_; const BackendMetricData* backend_metric_data_ = nullptr; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 2695e53056a..9209ccdbcbe 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -61,6 +61,7 @@ #include #include +#include #include #include #include @@ -389,19 +390,15 @@ class GrpcLb : public LoadBalancingPolicy { // Returns the LB token to use for a drop, or null if the call // should not be dropped. // - // Note: This is called from the picker, so it will be invoked in - // the channel's data plane mutex, NOT the control plane - // work_serializer. It should not be accessed by any other part of the LB - // policy. + // Note: This is called from the picker, NOT from inside the control + // plane work_serializer. const char* ShouldDrop(); private: std::vector serverlist_; - // Guarded by the channel's data plane mutex, NOT the control - // plane work_serializer. It should not be accessed by anything but the - // picker via the ShouldDrop() method. - size_t drop_index_ = 0; + // Accessed from the picker, so needs synchronization. 
+ std::atomic drop_index_{0}; }; class Picker : public SubchannelPicker { @@ -717,8 +714,8 @@ bool GrpcLb::Serverlist::ContainsAllDropEntries() const { const char* GrpcLb::Serverlist::ShouldDrop() { if (serverlist_.empty()) return nullptr; - GrpcLbServer& server = serverlist_[drop_index_]; - drop_index_ = (drop_index_ + 1) % serverlist_.size(); + size_t index = drop_index_.fetch_add(1, std::memory_order_relaxed); + GrpcLbServer& server = serverlist_[index % serverlist_.size()]; return server.drop ? server.load_balance_token : nullptr; } diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc index 354c18f7cd0..6f924b03e49 100644 --- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc +++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc @@ -22,7 +22,6 @@ #include #include -#include #include #include #include @@ -30,7 +29,6 @@ #include #include "absl/base/attributes.h" -#include "absl/base/thread_annotations.h" #include "absl/container/inlined_vector.h" #include "absl/status/status.h" #include "absl/status/statusor.h" @@ -39,11 +37,10 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" -#include - #define XXH_INLINE_ALL #include "xxhash.h" +#include #include #include @@ -55,8 +52,8 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/iomgr/closure.h" @@ -143,8 +140,6 @@ class RingHash : public LoadBalancingPolicy { void ResetBackoffLocked() override; private: - ~RingHash() override; - // Forward declaration. 
class RingHashSubchannelList; @@ -165,13 +160,11 @@ class RingHash : public LoadBalancingPolicy { const ServerAddress& address() const { return address_; } - grpc_connectivity_state GetConnectivityState() const { - return connectivity_state_.load(std::memory_order_relaxed); + grpc_connectivity_state logical_connectivity_state() const { + return logical_connectivity_state_; } - - absl::Status GetConnectivityStatus() const { - MutexLock lock(&mu_); - return connectivity_status_; + const absl::Status& logical_connectivity_status() const { + return logical_connectivity_status_; } private: @@ -188,20 +181,28 @@ class RingHash : public LoadBalancingPolicy { // subchannel in some cases; for example, once this is set to // TRANSIENT_FAILURE, we do not change it again until we get READY, // so we skip any interim stops in CONNECTING. - // Uses an atomic so that it can be accessed outside of the WorkSerializer. - std::atomic connectivity_state_{GRPC_CHANNEL_IDLE}; - - mutable Mutex mu_; - absl::Status connectivity_status_ ABSL_GUARDED_BY(&mu_); + grpc_connectivity_state logical_connectivity_state_ = GRPC_CHANNEL_IDLE; + absl::Status logical_connectivity_status_; }; // A list of subchannels and the ring containing those subchannels. 
class RingHashSubchannelList : public SubchannelList { public: - struct RingEntry { - uint64_t hash; - RingHashSubchannelData* subchannel; + class Ring : public RefCounted { + public: + struct RingEntry { + uint64_t hash; + size_t subchannel_index; + }; + + Ring(RingHashLbConfig* config, RingHashSubchannelList* subchannel_list, + const ChannelArgs& args); + + const std::vector& ring() const { return ring_; } + + private: + std::vector ring_; }; RingHashSubchannelList(RingHash* policy, ServerAddressList addresses, @@ -212,7 +213,7 @@ class RingHash : public LoadBalancingPolicy { p->Unref(DEBUG_LOCATION, "subchannel_list"); } - const std::vector& ring() const { return ring_; } + RefCountedPtr ring() { return ring_; } // Updates the counters of subchannels in each state when a // subchannel transitions from old_state to new_state. @@ -236,7 +237,7 @@ class RingHash : public LoadBalancingPolicy { size_t num_connecting_ = 0; size_t num_transient_failure_ = 0; - std::vector ring_; + RefCountedPtr ring_; // The index of the subchannel currently doing an internally // triggered connection attempt, if any. @@ -251,24 +252,31 @@ class RingHash : public LoadBalancingPolicy { class Picker : public SubchannelPicker { public: - explicit Picker(RefCountedPtr subchannel_list) - : subchannel_list_(std::move(subchannel_list)) {} - - ~Picker() override { - // Hop into WorkSerializer to unref the subchannel list, since that may - // trigger the unreffing of the underlying subchannels. 
- MakeOrphanable(std::move(subchannel_list_)); + Picker(RefCountedPtr ring_hash_lb, + RingHashSubchannelList* subchannel_list) + : ring_hash_lb_(std::move(ring_hash_lb)), + ring_(subchannel_list->ring()) { + subchannels_.reserve(subchannel_list->num_subchannels()); + for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { + RingHashSubchannelData* subchannel_data = + subchannel_list->subchannel(i); + subchannels_.emplace_back( + SubchannelInfo{subchannel_data->subchannel()->Ref(), + subchannel_data->logical_connectivity_state(), + subchannel_data->logical_connectivity_status()}); + } } PickResult Pick(PickArgs args) override; private: - // An interface for running a callback in the control plane WorkSerializer. - class WorkSerializerRunner : public Orphanable { + // A fire-and-forget class that schedules subchannel connection attempts + // on the control plane WorkSerializer. + class SubchannelConnectionAttempter : public Orphanable { public: - explicit WorkSerializerRunner( - RefCountedPtr subchannel_list) - : subchannel_list_(std::move(subchannel_list)) { + explicit SubchannelConnectionAttempter( + RefCountedPtr ring_hash_lb) + : ring_hash_lb_(std::move(ring_hash_lb)) { GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr); } @@ -278,56 +286,43 @@ class RingHash : public LoadBalancingPolicy { ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus()); } - // Will be invoked inside of the WorkSerializer. 
- virtual void Run() {} - - protected: - RingHash* ring_hash_lb() const { - return static_cast(subchannel_list_->policy()); + void AddSubchannel(RefCountedPtr subchannel) { + subchannels_.push_back(std::move(subchannel)); } private: static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) { - auto* self = static_cast(arg); - self->ring_hash_lb()->work_serializer()->Run( + auto* self = static_cast(arg); + self->ring_hash_lb_->work_serializer()->Run( [self]() { - self->Run(); + if (!self->ring_hash_lb_->shutdown_) { + for (auto& subchannel : self->subchannels_) { + subchannel->RequestConnection(); + } + } delete self; }, DEBUG_LOCATION); } - RefCountedPtr subchannel_list_; + RefCountedPtr ring_hash_lb_; grpc_closure closure_; + std::vector> subchannels_; }; - // A fire-and-forget class that schedules subchannel connection attempts - // on the control plane WorkSerializer. - class SubchannelConnectionAttempter : public WorkSerializerRunner { - public: - explicit SubchannelConnectionAttempter( - RefCountedPtr subchannel_list) - : WorkSerializerRunner(std::move(subchannel_list)) {} - - void AddSubchannel(RefCountedPtr subchannel) { - subchannels_.push_back(std::move(subchannel)); - } - - void Run() override { - if (!ring_hash_lb()->shutdown_) { - for (auto& subchannel : subchannels_) { - subchannel->RequestConnection(); - } - } - } - - private: - std::vector> subchannels_; + struct SubchannelInfo { + RefCountedPtr subchannel; + grpc_connectivity_state state; + absl::Status status; }; - RefCountedPtr subchannel_list_; + RefCountedPtr ring_hash_lb_; + RefCountedPtr ring_; + std::vector subchannels_; }; + ~RingHash() override; + void ShutdownLocked() override; // Current config from resolver. 
@@ -353,7 +348,7 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) { return PickResult::Fail( absl::InternalError("ring hash value is not a number")); } - const auto& ring = subchannel_list_->ring(); + const auto& ring = ring_->ring(); // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c // (ketama_get_server) NOTE: The algorithm depends on using signed integers // for lowp, highp, and first_index. Do not change them! @@ -386,27 +381,25 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) { [&](RefCountedPtr subchannel) { if (subchannel_connection_attempter == nullptr) { subchannel_connection_attempter = - MakeOrphanable( - subchannel_list_->Ref(DEBUG_LOCATION, - "SubchannelConnectionAttempter")); + MakeOrphanable(ring_hash_lb_->Ref( + DEBUG_LOCATION, "SubchannelConnectionAttempter")); } subchannel_connection_attempter->AddSubchannel(std::move(subchannel)); }; - switch (ring[first_index].subchannel->GetConnectivityState()) { + SubchannelInfo& first_subchannel = + subchannels_[ring[first_index].subchannel_index]; + switch (first_subchannel.state) { case GRPC_CHANNEL_READY: - return PickResult::Complete( - ring[first_index].subchannel->subchannel()->Ref()); + return PickResult::Complete(first_subchannel.subchannel); case GRPC_CHANNEL_IDLE: - ScheduleSubchannelConnectionAttempt( - ring[first_index].subchannel->subchannel()->Ref()); + ScheduleSubchannelConnectionAttempt(first_subchannel.subchannel); ABSL_FALLTHROUGH_INTENDED; case GRPC_CHANNEL_CONNECTING: return PickResult::Queue(); default: // GRPC_CHANNEL_TRANSIENT_FAILURE break; } - ScheduleSubchannelConnectionAttempt( - ring[first_index].subchannel->subchannel()->Ref()); + ScheduleSubchannelConnectionAttempt(first_subchannel.subchannel); // Loop through remaining subchannels to find one in READY. // On the way, we make sure the right set of connection attempts // will happen. 
@@ -414,19 +407,17 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) { bool found_first_non_failed = false; for (size_t i = 1; i < ring.size(); ++i) { const auto& entry = ring[(first_index + i) % ring.size()]; - if (entry.subchannel == ring[first_index].subchannel) { + if (entry.subchannel_index == ring[first_index].subchannel_index) { continue; } - grpc_connectivity_state connectivity_state = - entry.subchannel->GetConnectivityState(); - if (connectivity_state == GRPC_CHANNEL_READY) { - return PickResult::Complete(entry.subchannel->subchannel()->Ref()); + SubchannelInfo& subchannel_info = subchannels_[entry.subchannel_index]; + if (subchannel_info.state == GRPC_CHANNEL_READY) { + return PickResult::Complete(subchannel_info.subchannel); } if (!found_second_subchannel) { - switch (connectivity_state) { + switch (subchannel_info.state) { case GRPC_CHANNEL_IDLE: - ScheduleSubchannelConnectionAttempt( - entry.subchannel->subchannel()->Ref()); + ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel); ABSL_FALLTHROUGH_INTENDED; case GRPC_CHANNEL_CONNECTING: return PickResult::Queue(); @@ -436,13 +427,11 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) { found_second_subchannel = true; } if (!found_first_non_failed) { - if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - ScheduleSubchannelConnectionAttempt( - entry.subchannel->subchannel()->Ref()); + if (subchannel_info.state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel); } else { - if (connectivity_state == GRPC_CHANNEL_IDLE) { - ScheduleSubchannelConnectionAttempt( - entry.subchannel->subchannel()->Ref()); + if (subchannel_info.state == GRPC_CHANNEL_IDLE) { + ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel); } found_first_non_failed = true; } @@ -450,27 +439,16 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) { } return PickResult::Fail(absl::UnavailableError(absl::StrCat( "ring hash cannot 
find a connected subchannel; first failure: ", - ring[first_index].subchannel->GetConnectivityStatus().ToString()))); + first_subchannel.status.ToString()))); } // -// RingHash::RingHashSubchannelList +// RingHash::RingHashSubchannelList::Ring // -RingHash::RingHashSubchannelList::RingHashSubchannelList( - RingHash* policy, ServerAddressList addresses, const ChannelArgs& args) - : SubchannelList(policy, - (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) - ? "RingHashSubchannelList" - : nullptr), - std::move(addresses), policy->channel_control_helper(), - args), - num_idle_(num_subchannels()) { - // Need to maintain a ref to the LB policy as long as we maintain - // any references to subchannels, since the subchannels' - // pollset_sets will include the LB policy's pollset_set. - policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); - // Construct the ring. +RingHash::RingHashSubchannelList::Ring::Ring( + RingHashLbConfig* config, RingHashSubchannelList* subchannel_list, + const ChannelArgs& args) { // Store the weights while finding the sum. struct AddressWeight { std::string address; @@ -481,9 +459,9 @@ RingHash::RingHashSubchannelList::RingHashSubchannelList( }; std::vector address_weights; size_t sum = 0; - address_weights.reserve(num_subchannels()); - for (size_t i = 0; i < num_subchannels(); ++i) { - RingHashSubchannelData* sd = subchannel(i); + address_weights.reserve(subchannel_list->num_subchannels()); + for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { + RingHashSubchannelData* sd = subchannel_list->subchannel(i); const ServerAddressWeightAttribute* weight_attribute = static_cast< const ServerAddressWeightAttribute*>(sd->address().GetAttribute( ServerAddressWeightAttribute::kServerAddressWeightAttributeKey)); @@ -517,10 +495,8 @@ RingHash::RingHashSubchannelList::RingHashSubchannelList( // to fit. 
const size_t ring_size_cap = args.GetInt(GRPC_ARG_RING_HASH_LB_RING_SIZE_CAP) .value_or(kRingSizeCapDefault); - const size_t min_ring_size = - std::min(policy->config_->min_ring_size(), ring_size_cap); - const size_t max_ring_size = - std::min(policy->config_->max_ring_size(), ring_size_cap); + const size_t min_ring_size = std::min(config->min_ring_size(), ring_size_cap); + const size_t max_ring_size = std::min(config->max_ring_size(), ring_size_cap); const double scale = std::min( std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight, static_cast(max_ring_size)); @@ -537,7 +513,7 @@ RingHash::RingHashSubchannelList::RingHashSubchannelList( double target_hashes = 0.0; uint64_t min_hashes_per_host = ring_size; uint64_t max_hashes_per_host = 0; - for (size_t i = 0; i < num_subchannels(); ++i) { + for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { const std::string& address_string = address_weights[i].address; hash_key_buffer.assign(address_string.begin(), address_string.end()); hash_key_buffer.emplace_back('_'); @@ -550,7 +526,7 @@ RingHash::RingHashSubchannelList::RingHashSubchannelList( absl::string_view hash_key(hash_key_buffer.data(), hash_key_buffer.size()); const uint64_t hash = XXH64(hash_key.data(), hash_key.size(), 0); - ring_.push_back({hash, subchannel(i)}); + ring_.push_back({hash, i}); ++count; ++current_hashes; hash_key_buffer.erase(offset_start, hash_key_buffer.end()); @@ -561,14 +537,34 @@ RingHash::RingHashSubchannelList::RingHashSubchannelList( std::max(static_cast(i), max_hashes_per_host); } std::sort(ring_.begin(), ring_.end(), - [](const RingHashSubchannelList::RingEntry& lhs, - const RingHashSubchannelList::RingEntry& rhs) -> bool { + [](const RingEntry& lhs, const RingEntry& rhs) -> bool { return lhs.hash < rhs.hash; }); +} + +// +// RingHash::RingHashSubchannelList +// + +RingHash::RingHashSubchannelList::RingHashSubchannelList( + RingHash* policy, ServerAddressList addresses, const ChannelArgs& args) + : 
SubchannelList(policy, + (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) + ? "RingHashSubchannelList" + : nullptr), + std::move(addresses), policy->channel_control_helper(), + args), + num_idle_(num_subchannels()) { + // Need to maintain a ref to the LB policy as long as we maintain + // any references to subchannels, since the subchannels' + // pollset_sets will include the LB policy's pollset_set. + policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); + // Construct the ring. + ring_ = MakeRefCounted(policy->config_.get(), this, args); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { gpr_log(GPR_INFO, "[RH %p] created subchannel list %p with %" PRIuPTR " ring entries", - policy, this, ring_.size()); + policy, this, ring_->ring().size()); } } @@ -660,7 +656,7 @@ void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked( // Note that we use our own picker regardless of connectivity state. p->channel_control_helper()->UpdateState( state, status, - MakeRefCounted(Ref(DEBUG_LOCATION, "RingHashPicker"))); + MakeRefCounted(p->Ref(DEBUG_LOCATION, "RingHashPicker"), this)); // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will // not be getting any pick requests from the priority policy. 
// However, because the ring_hash policy does not attempt to @@ -707,7 +703,6 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked( absl::optional old_state, grpc_connectivity_state new_state) { RingHash* p = static_cast(subchannel_list()->policy()); - grpc_connectivity_state last_connectivity_state = GetConnectivityState(); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { gpr_log( GPR_INFO, @@ -715,7 +710,7 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked( "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s", p, subchannel(), subchannel_list(), Index(), subchannel_list()->num_subchannels(), - ConnectivityStateName(last_connectivity_state), + ConnectivityStateName(logical_connectivity_state_), ConnectivityStateName(new_state)); } GPR_ASSERT(subchannel() != nullptr); @@ -735,34 +730,23 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked( const bool connection_attempt_complete = new_state != GRPC_CHANNEL_CONNECTING; // Decide what state to report for the purposes of aggregation and // picker behavior. - // If the last recorded state was TRANSIENT_FAILURE, ignore the update - // unless the new state is READY. - bool update_status = true; - absl::Status status = connectivity_status(); - if (last_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE && - new_state != GRPC_CHANNEL_READY && - new_state != GRPC_CHANNEL_TRANSIENT_FAILURE) { - new_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - { - MutexLock lock(&mu_); - status = connectivity_status_; - } - update_status = false; - } - // Update state counters used for aggregation. - subchannel_list()->UpdateStateCountersLocked(last_connectivity_state, - new_state); - // Update status seen by picker if needed. 
- if (update_status) { - MutexLock lock(&mu_); - connectivity_status_ = connectivity_status(); + // If the last recorded state was TRANSIENT_FAILURE, ignore the change + // unless the new state is READY (or TF again, in which case we need + // to update the status). + if (logical_connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE || + new_state == GRPC_CHANNEL_READY || + new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + // Update state counters used for aggregation. + subchannel_list()->UpdateStateCountersLocked(logical_connectivity_state_, + new_state); + // Update logical state. + logical_connectivity_state_ = new_state; + logical_connectivity_status_ = connectivity_status(); } - // Update last seen state, also used by picker. - connectivity_state_.store(new_state, std::memory_order_relaxed); // Update the RH policy's connectivity state, creating new picker and new // ring. subchannel_list()->UpdateRingHashConnectivityStateLocked( - Index(), connection_attempt_complete, status); + Index(), connection_attempt_complete, logical_connectivity_status_); } // diff --git a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc index cc3fcf8d6c3..d72e456c0e5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc +++ b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc @@ -365,7 +365,6 @@ class RlsLb : public LoadBalancingPolicy { class Picker : public LoadBalancingPolicy::SubchannelPicker { public: explicit Picker(RefCountedPtr lb_policy); - ~Picker() override; PickResult Pick(PickArgs args) override; @@ -1008,19 +1007,6 @@ RlsLb::Picker::Picker(RefCountedPtr lb_policy) } } -RlsLb::Picker::~Picker() { - // It's not safe to unref the default child policy in the picker, - // since that needs to be done in the WorkSerializer. 
- if (default_child_policy_ != nullptr) { - auto* default_child_policy = default_child_policy_.release(); - lb_policy_->work_serializer()->Run( - [default_child_policy]() { - default_child_policy->Unref(DEBUG_LOCATION, "Picker"); - }, - DEBUG_LOCATION); - } -} - LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) { // Construct key for request. RequestKey key = {BuildKeyMap(config_->key_builder_map(), args.path, diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 634729b80db..2924c3726ae 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -18,13 +18,16 @@ #include #include +#include #include +#include #include #include #include #include +#include "absl/random/random.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" @@ -174,7 +177,7 @@ class RoundRobin : public LoadBalancingPolicy { // Using pointer value only, no ref held -- do not dereference! RoundRobin* parent_; - size_t last_picked_index_; + std::atomic last_picked_index_; std::vector> subchannels_; }; @@ -189,6 +192,8 @@ class RoundRobin : public LoadBalancingPolicy { RefCountedPtr latest_pending_subchannel_list_; bool shutdown_ = false; + + absl::BitGen bit_gen_; }; // @@ -207,27 +212,26 @@ RoundRobin::Picker::Picker(RoundRobin* parent, } // For discussion on why we generate a random starting index for // the picker, see https://github.com/grpc/grpc-go/issues/2580. - // TODO(roth): rand(3) is not thread-safe. This should be replaced with - // something better as part of https://github.com/grpc/grpc/issues/17891. 
- last_picked_index_ = rand() % subchannels_.size(); + size_t index = + absl::Uniform(parent->bit_gen_, 0, subchannels_.size()); + last_picked_index_.store(index, std::memory_order_relaxed); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p picker %p] created picker from subchannel_list=%p " "with %" PRIuPTR " READY subchannels; last_picked_index_=%" PRIuPTR, - parent_, this, subchannel_list, subchannels_.size(), - last_picked_index_); + parent_, this, subchannel_list, subchannels_.size(), index); } } RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs /*args*/) { - last_picked_index_ = (last_picked_index_ + 1) % subchannels_.size(); + size_t index = last_picked_index_.fetch_add(1, std::memory_order_relaxed) % + subchannels_.size(); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p picker %p] returning index %" PRIuPTR ", subchannel=%p", - parent_, this, last_picked_index_, - subchannels_[last_picked_index_].get()); + parent_, this, index, subchannels_[index].get()); } - return PickResult::Complete(subchannels_[last_picked_index_]); + return PickResult::Complete(subchannels_[index]); } // diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc index 823ef564757..6a88cd5c8aa 100644 --- a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc @@ -26,6 +26,7 @@ #include #include +#include "absl/base/thread_annotations.h" #include "absl/random/random.h" #include "absl/status/status.h" #include "absl/status/statusor.h" @@ -46,6 +47,7 @@ #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" #include 
"src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/gprpp/work_serializer.h" @@ -138,7 +140,11 @@ class WeightedTargetLb : public LoadBalancingPolicy { private: PickerList pickers_; - absl::BitGen bit_gen_; + + // TODO(roth): Consider using a separate thread-local BitGen for each CPU + // to avoid the need for this mutex. + Mutex mu_; + absl::BitGen bit_gen_ ABSL_GUARDED_BY(&mu_); }; // Each WeightedChild holds a ref to its parent WeightedTargetLb. @@ -247,8 +253,10 @@ class WeightedTargetLb : public LoadBalancingPolicy { WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick( PickArgs args) { // Generate a random number in [0, total weight). - const uint64_t key = - absl::Uniform(bit_gen_, 0, pickers_.back().first); + const uint64_t key = [&]() { + MutexLock lock(&mu_); + return absl::Uniform(bit_gen_, 0, pickers_.back().first); + }(); // Find the index in pickers_ corresponding to key. size_t mid = 0; size_t start_index = 0; diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc index 8b74e26b636..f2607dcb105 100644 --- a/src/core/ext/filters/client_channel/retry_filter.cc +++ b/src/core/ext/filters/client_channel/retry_filter.cc @@ -1351,8 +1351,9 @@ RetryFilter::CallData::CallAttempt::BatchData::~BatchData() { this); } CallAttempt* call_attempt = std::exchange(call_attempt_, nullptr); - GRPC_CALL_STACK_UNREF(call_attempt->calld_->owning_call_, "Retry BatchData"); + grpc_call_stack* owning_call = call_attempt->calld_->owning_call_; call_attempt->Unref(DEBUG_LOCATION, "~BatchData"); + GRPC_CALL_STACK_UNREF(owning_call, "Retry BatchData"); } void RetryFilter::CallData::CallAttempt::BatchData:: diff --git a/src/core/ext/xds/xds_endpoint.cc b/src/core/ext/xds/xds_endpoint.cc index 656c82b8eac..2273d642387 100644 --- a/src/core/ext/xds/xds_endpoint.cc +++ b/src/core/ext/xds/xds_endpoint.cc @@ -93,11 +93,14 @@ std::string XdsEndpointResource::Priority::ToString() const { } bool 
XdsEndpointResource::DropConfig::ShouldDrop( - const std::string** category_name) const { + const std::string** category_name) { for (size_t i = 0; i < drop_category_list_.size(); ++i) { const auto& drop_category = drop_category_list_[i]; // Generate a random number in [0, 1000000). - const uint32_t random = static_cast(rand()) % 1000000; + const uint32_t random = [&]() { + MutexLock lock(&mu_); + return absl::Uniform(bit_gen_, 0, 1000000); + }(); if (random < drop_category.parts_per_million) { *category_name = &drop_category.name; return true; diff --git a/src/core/ext/xds/xds_endpoint.h b/src/core/ext/xds/xds_endpoint.h index ba25d3eacee..21f502c213f 100644 --- a/src/core/ext/xds/xds_endpoint.h +++ b/src/core/ext/xds/xds_endpoint.h @@ -28,6 +28,8 @@ #include #include +#include "absl/base/thread_annotations.h" +#include "absl/random/random.h" #include "absl/strings/string_view.h" #include "envoy/config/endpoint/v3/endpoint.upbdefs.h" #include "upb/def.h" @@ -38,6 +40,7 @@ #include "src/core/ext/xds/xds_resource_type_impl.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/resolver/server_address.h" namespace grpc_core { @@ -90,7 +93,7 @@ struct XdsEndpointResource : public XdsResourceType::ResourceData { // The only method invoked from outside the WorkSerializer (used in // the data plane). - bool ShouldDrop(const std::string** category_name) const; + bool ShouldDrop(const std::string** category_name); const DropCategoryList& drop_category_list() const { return drop_category_list_; @@ -108,6 +111,11 @@ struct XdsEndpointResource : public XdsResourceType::ResourceData { private: DropCategoryList drop_category_list_; bool drop_all_ = false; + + // TODO(roth): Consider using a separate thread-local BitGen for each CPU + // to avoid the need for this mutex. 
+ Mutex mu_; + absl::BitGen bit_gen_ ABSL_GUARDED_BY(&mu_); }; PriorityList priorities; diff --git a/src/core/lib/load_balancing/lb_policy.cc b/src/core/lib/load_balancing/lb_policy.cc index 229a201544d..018f7069aac 100644 --- a/src/core/lib/load_balancing/lb_policy.cc +++ b/src/core/lib/load_balancing/lb_policy.cc @@ -69,19 +69,15 @@ LoadBalancingPolicy::SubchannelPicker::SubchannelPicker() LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick( PickArgs /*args*/) { // We invoke the parent's ExitIdleLocked() via a closure instead - // of doing it directly here, for two reasons: - // 1. ExitIdleLocked() may cause the policy's state to change and - // a new picker to be delivered to the channel. If that new - // picker is delivered before ExitIdleLocked() returns, then by - // the time this function returns, the pick will already have - // been processed, and we'll be trying to re-process the same - // pick again, leading to a crash. - // 2. We are currently running in the data plane mutex, but we - // need to bounce into the control plane work_serializer to call - // ExitIdleLocked(). - if (!exit_idle_called_ && parent_ != nullptr) { - exit_idle_called_ = true; - auto* parent = parent_->Ref().release(); // ref held by lambda. + // of doing it directly here because ExitIdleLocked() may cause the + // policy's state to change and a new picker to be delivered to the + // channel. If that new picker is delivered before ExitIdleLocked() + // returns, then by the time this function returns, the pick will already + // have been processed, and we'll be trying to re-process the same pick + // again, leading to a crash. + MutexLock lock(&mu_); + if (parent_ != nullptr) { + auto* parent = parent_.release(); // ref held by lambda. 
ExecCtx::Run(DEBUG_LOCATION, GRPC_CLOSURE_CREATE( [](void* arg, grpc_error_handle /*error*/) { diff --git a/src/core/lib/load_balancing/lb_policy.h b/src/core/lib/load_balancing/lb_policy.h index 5d972182eb9..090a301fd73 100644 --- a/src/core/lib/load_balancing/lb_policy.h +++ b/src/core/lib/load_balancing/lb_policy.h @@ -27,6 +27,7 @@ #include #include +#include "absl/base/thread_annotations.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/string_view.h" @@ -44,6 +45,7 @@ #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/load_balancing/subchannel_interface.h" @@ -392,8 +394,8 @@ class LoadBalancingPolicy : public InternallyRefCounted { PickResult Pick(PickArgs args) override; private: - RefCountedPtr parent_; - bool exit_idle_called_ = false; + Mutex mu_; + RefCountedPtr parent_ ABSL_GUARDED_BY(&mu_); }; // A picker that returns PickResult::Fail for all picks.