client channel: don't hold mutexes while calling the ConfigSelector or the LB picker (#31973)

* WIP

* use the same lock for queue and picker -- still need to figure out how to drain queue safely

* working!

* always unref pickers in WorkSerializer

* simplify tracking of queued calls

* fix sanity

* simplify resolver queueing code

* fix tsan failure

* simplify queued LB pick reprocessing

* minor cleanups

* remove unnecessary wrap-around checks

* clang-format

* generate_projects

* fix pollset bug

* use absl::flat_hash_set<> instead of std::set<>

* fix use-after-free in retry code

* add missing BUILD dep
pull/32223/head
Mark D. Roth 2 years ago committed by GitHub
parent 1d8fac36b3
commit e699e0135e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 5
      src/core/BUILD
  3. 557
      src/core/ext/filters/client_channel/client_channel.cc
  4. 85
      src/core/ext/filters/client_channel/client_channel.h
  5. 17
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  6. 55
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  7. 14
      src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
  8. 24
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  9. 14
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  10. 3
      src/core/ext/filters/client_channel/retry_filter.cc
  11. 7
      src/core/ext/xds/xds_endpoint.cc
  12. 10
      src/core/ext/xds/xds_endpoint.h
  13. 22
      src/core/lib/load_balancing/lb_policy.cc
  14. 6
      src/core/lib/load_balancing/lb_policy.h

@ -2768,6 +2768,7 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/container:flat_hash_set",
"absl/container:inlined_vector",
"absl/status",
"absl/status:statusor",

@ -2470,6 +2470,7 @@ grpc_cc_library(
srcs = ["lib/load_balancing/lb_policy.cc"],
hdrs = ["lib/load_balancing/lb_policy.h"],
external_deps = [
"absl/base:core_headers",
"absl/status",
"absl/status:statusor",
"absl/strings",
@ -2489,6 +2490,7 @@ grpc_cc_library(
"//:debug_location",
"//:event_engine_base_hdrs",
"//:exec_ctx",
"//:gpr",
"//:gpr_platform",
"//:grpc_trace",
"//:orphanable",
@ -3812,6 +3814,7 @@ grpc_cc_library(
"absl/base:core_headers",
"absl/functional:bind_front",
"absl/memory",
"absl/random",
"absl/status",
"absl/status:statusor",
"absl/strings",
@ -4409,6 +4412,7 @@ grpc_cc_library(
"ext/filters/client_channel/lb_policy/round_robin/round_robin.cc",
],
external_deps = [
"absl/random",
"absl/status",
"absl/status:statusor",
"absl/strings",
@ -4595,6 +4599,7 @@ grpc_cc_library(
"ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc",
],
external_deps = [
"absl/base:core_headers",
"absl/random",
"absl/status",
"absl/status:statusor",

@ -24,7 +24,7 @@
#include <algorithm>
#include <functional>
#include <new>
#include <set>
#include <type_traits>
#include <vector>
#include "absl/status/status.h"
@ -115,19 +115,12 @@ class ClientChannel::CallData {
grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);
// Invoked by channel for queued calls when name resolution is completed.
static void CheckResolution(void* arg, grpc_error_handle error);
// Helper function for applying the service config to a call while
// holding ClientChannel::resolution_mu_.
// Returns true if the service config has been applied to the call, in which
// case the caller must invoke ResolutionDone() or AsyncResolutionDone()
// with the returned error.
bool CheckResolutionLocked(grpc_call_element* elem, grpc_error_handle* error)
void CheckResolution(grpc_call_element* elem, bool was_queued);
// Removes the call from the channel's list of calls queued
// for name resolution.
void RemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
// Schedules a callback to continue processing the call once
// resolution is complete. The callback will not run until after this
// method returns.
void AsyncResolutionDone(grpc_call_element* elem, grpc_error_handle error);
private:
class ResolverQueuedCallCanceller;
@ -166,23 +159,22 @@ class ClientChannel::CallData {
// Resumes all pending batches on lb_call_.
void PendingBatchesResume(grpc_call_element* elem);
// Helper function for CheckResolution(). Returns true if the call
// can continue (i.e., there is a valid resolution result, or there is
// an invalid resolution result but the call is not wait_for_ready).
bool CheckResolutionLocked(
grpc_call_element* elem, grpc_metadata_batch* initial_metadata,
absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
// Applies service config to the call. Must be invoked once we know
// that the resolver has returned results to the channel.
// If an error is returned, the error indicates the status with which
// the call should be failed.
grpc_error_handle ApplyServiceConfigToCallLocked(
grpc_call_element* elem, grpc_metadata_batch* initial_metadata)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
// Invoked when the resolver result is applied to the caller, on both
// success or failure.
static void ResolutionDone(void* arg, grpc_error_handle error);
// Removes the call (if present) from the channel's list of calls queued
// for name resolution.
void MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
// Adds the call (if not already present) to the channel's list of
// calls queued for name resolution.
void MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element* elem)
grpc_call_element* elem, grpc_metadata_batch* initial_metadata,
const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector);
// Adds the call to the channel's list of calls queued for name resolution.
void AddCallToResolverQueuedCallsLocked(grpc_call_element* elem)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
static void RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
@ -204,15 +196,7 @@ class ClientChannel::CallData {
grpc_polling_entity* pollent_ = nullptr;
grpc_closure resolution_done_closure_;
// Accessed while holding ClientChannel::resolution_mu_.
bool service_config_applied_ ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) =
false;
bool queued_pending_resolver_result_
ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = false;
ClientChannel::ResolverQueuedCall resolver_queued_call_
ABSL_GUARDED_BY(&ClientChannel::resolution_mu_);
ResolverQueuedCallCanceller* resolver_call_canceller_
ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = nullptr;
@ -520,20 +504,18 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
}
private:
// Subchannel and SubchannelInterface have different interfaces for
// their respective ConnectivityStateWatcherInterface classes.
// The one in Subchannel updates the ConnectedSubchannel along with
// the state, whereas the one in SubchannelInterface does not expose
// the ConnectedSubchannel.
//
// This wrapper provides a bridge between the two. It implements
// Subchannel::ConnectivityStateWatcherInterface and wraps
// This wrapper provides a bridge between the internal Subchannel API
// and the SubchannelInterface API that we expose to LB policies.
// It implements Subchannel::ConnectivityStateWatcherInterface and wraps
// the instance of SubchannelInterface::ConnectivityStateWatcherInterface
// that was passed in by the LB policy. We pass an instance of this
// class to the underlying Subchannel, and when we get updates from
// the subchannel, we pass those on to the wrapped watcher to return
// the update to the LB policy. This allows us to set the connected
// subchannel before passing the result back to the LB policy.
// the update to the LB policy.
//
// This class handles things like hopping into the WorkSerializer
// before passing notifications to the LB policy and propagating
// keepalive information betwen subchannels.
class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface {
public:
WatcherWrapper(
@ -571,16 +553,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
}
grpc_pollset_set* interested_parties() override {
SubchannelInterface::ConnectivityStateWatcherInterface* watcher =
watcher_.get();
if (watcher_ == nullptr) watcher = replacement_->watcher_.get();
return watcher->interested_parties();
}
WatcherWrapper* MakeReplacement() {
auto* replacement = new WatcherWrapper(std::move(watcher_), parent_);
replacement_ = replacement;
return replacement;
return watcher_->interested_parties();
}
private:
@ -638,7 +611,6 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
watcher_;
RefCountedPtr<SubchannelWrapper> parent_;
WatcherWrapper* replacement_ = nullptr;
};
ClientChannel* chand_;
@ -1099,6 +1071,20 @@ ChannelArgs ClientChannel::MakeSubchannelArgs(
.Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE);
}
void ClientChannel::ReprocessQueuedResolverCalls() {
for (grpc_call_element* elem : resolver_queued_calls_) {
CallData* calld = static_cast<CallData*>(elem->call_data);
calld->RemoveCallFromResolverQueuedCallsLocked(elem);
owning_stack_->EventEngine()->Run([elem]() {
ApplicationCallbackExecCtx application_exec_ctx;
ExecCtx exec_ctx;
CallData* calld = static_cast<CallData*>(elem->call_data);
calld->CheckResolution(elem, /*was_queued=*/true);
});
}
resolver_queued_calls_.clear();
}
namespace {
RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
@ -1311,16 +1297,7 @@ void ClientChannel::OnResolverErrorLocked(absl::Status status) {
// Update resolver transient failure.
resolver_transient_failure_error_ =
MaybeRewriteIllegalStatusCode(status, "resolver");
// Process calls that were queued waiting for the resolver result.
for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
call = call->next) {
grpc_call_element* elem = call->elem;
CallData* calld = static_cast<CallData*>(elem->call_data);
grpc_error_handle error;
if (calld->CheckResolutionLocked(elem, &error)) {
calld->AsyncResolutionDone(elem, error);
}
}
ReprocessQueuedResolverCalls();
}
// Update connectivity state.
UpdateStateAndPickerLocked(
@ -1379,30 +1356,6 @@ OrphanablePtr<LoadBalancingPolicy> ClientChannel::CreateLbPolicyLocked(
return lb_policy;
}
void ClientChannel::AddResolverQueuedCall(ResolverQueuedCall* call,
grpc_polling_entity* pollent) {
// Add call to queued calls list.
call->next = resolver_queued_calls_;
resolver_queued_calls_ = call;
// Add call's pollent to channel's interested_parties, so that I/O
// can be done under the call's CQ.
grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
}
void ClientChannel::RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
grpc_polling_entity* pollent) {
// Remove call's pollent from channel's interested_parties.
grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
// Remove from queued calls list.
for (ResolverQueuedCall** call = &resolver_queued_calls_; *call != nullptr;
call = &(*call)->next) {
if (*call == to_remove) {
*call = to_remove->next;
return;
}
}
}
void ClientChannel::UpdateServiceConfigInControlPlaneLocked(
RefCountedPtr<ServiceConfig> service_config,
RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) {
@ -1469,25 +1422,8 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
service_config_.swap(service_config);
config_selector_.swap(config_selector);
dynamic_filters_.swap(dynamic_filters);
// Process calls that were queued waiting for the resolver result.
for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
call = call->next) {
// If there are a lot of queued calls here, resuming them all may cause us
// to stay inside C-core for a long period of time. All of that work would
// be done using the same ExecCtx instance and therefore the same cached
// value of "now". The longer it takes to finish all of this work and exit
// from C-core, the more stale the cached value of "now" may become. This
// can cause problems whereby (e.g.) we calculate a timer deadline based
// on the stale value, which results in the timer firing too early. To
// avoid this, we invalidate the cached value for each call we process.
ExecCtx::Get()->InvalidateNow();
grpc_call_element* elem = call->elem;
CallData* calld = static_cast<CallData*>(elem->call_data);
grpc_error_handle error;
if (calld->CheckResolutionLocked(elem, &error)) {
calld->AsyncResolutionDone(elem, error);
}
}
// Re-process queued calls asynchronously.
ReprocessQueuedResolverCalls();
}
// Old values will be unreffed after lock is released when they go out
// of scope.
@ -1563,29 +1499,22 @@ void ClientChannel::UpdateStateAndPickerLocked(
channelz::ChannelNode::GetChannelConnectivityStateChangeString(
state)));
}
// Grab data plane lock to update the picker.
// Grab the LB lock to update the picker and trigger reprocessing of the
// queued picks.
// Old picker will be unreffed after releasing the lock.
{
MutexLock lock(&data_plane_mu_);
// Swap out the picker.
// Note: Original value will be destroyed after the lock is released.
MutexLock lock(&lb_mu_);
picker_.swap(picker);
// Re-process queued picks.
for (LbQueuedCall* call = lb_queued_calls_; call != nullptr;
call = call->next) {
// If there are a lot of queued calls here, resuming them all may cause us
// to stay inside C-core for a long period of time. All of that work would
// be done using the same ExecCtx instance and therefore the same cached
// value of "now". The longer it takes to finish all of this work and exit
// from C-core, the more stale the cached value of "now" may become. This
// can cause problems whereby (e.g.) we calculate a timer deadline based
// on the stale value, which results in the timer firing too early. To
// avoid this, we invalidate the cached value for each call we process.
ExecCtx::Get()->InvalidateNow();
grpc_error_handle error;
if (call->lb_call->PickSubchannelLocked(&error)) {
call->lb_call->AsyncPickDone(error);
}
// Reprocess queued picks asynchronously.
for (LoadBalancedCall* call : lb_queued_calls_) {
call->RemoveCallFromLbQueuedCallsLocked();
owning_stack_->EventEngine()->Run([call]() {
ApplicationCallbackExecCtx application_exec_ctx;
ExecCtx exec_ctx;
call->PickSubchannel(/*was_queued=*/true);
});
}
lb_queued_calls_.clear();
}
}
@ -1629,7 +1558,7 @@ grpc_error_handle ClientChannel::DoPingLocked(grpc_transport_op* op) {
}
LoadBalancingPolicy::PickResult result;
{
MutexLock lock(&data_plane_mu_);
MutexLock lock(&lb_mu_);
result = picker_->Pick(LoadBalancingPolicy::PickArgs());
}
return HandlePickResult<grpc_error_handle>(
@ -1749,30 +1678,6 @@ void ClientChannel::GetChannelInfo(grpc_channel_element* elem,
}
}
void ClientChannel::AddLbQueuedCall(LbQueuedCall* call,
grpc_polling_entity* pollent) {
// Add call to queued picks list.
call->next = lb_queued_calls_;
lb_queued_calls_ = call;
// Add call's pollent to channel's interested_parties, so that I/O
// can be done under the call's CQ.
grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
}
void ClientChannel::RemoveLbQueuedCall(LbQueuedCall* to_remove,
grpc_polling_entity* pollent) {
// Remove call's pollent from channel's interested_parties.
grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
// Remove from queued picks list.
for (LbQueuedCall** call = &lb_queued_calls_; *call != nullptr;
call = &(*call)->next) {
if (*call == to_remove) {
*call = to_remove->next;
return;
}
}
}
void ClientChannel::TryToConnectLocked() {
if (lb_policy_ != nullptr) {
lb_policy_->ExitIdleLocked();
@ -1939,7 +1844,23 @@ void ClientChannel::CallData::StartTransportStreamOpBatch(
"config",
chand, calld);
}
CheckResolution(elem, absl::OkStatus());
// If we're still in IDLE, we need to start resolving.
if (GPR_UNLIKELY(chand->CheckConnectivityState(false) ==
GRPC_CHANNEL_IDLE)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: triggering exit idle", chand,
calld);
}
// Bounce into the control plane work serializer to start resolving.
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ExitIdle");
chand->work_serializer_->Run(
[chand]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
chand->CheckConnectivityState(/*try_to_connect=*/true);
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "ExitIdle");
},
DEBUG_LOCATION);
}
calld->CheckResolution(elem, /*was_queued=*/false);
} else {
// For all other batches, release the call combiner.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
@ -1965,8 +1886,7 @@ void ClientChannel::CallData::SetPollent(grpc_call_element* elem,
size_t ClientChannel::CallData::GetBatchIndex(
grpc_transport_stream_op_batch* batch) {
// Note: It is important the send_initial_metadata be the first entry
// here, since the code in ApplyServiceConfigToCallLocked() and
// CheckResolutionLocked() assumes it will be.
// here, since the code in CheckResolution() assumes it will be.
if (batch->send_initial_metadata) return 0;
if (batch->send_message) return 1;
if (batch->send_trailing_metadata) return 2;
@ -2110,7 +2030,8 @@ class ClientChannel::CallData::ResolverQueuedCallCanceller {
}
if (calld->resolver_call_canceller_ == self && !error.ok()) {
// Remove pick from list of queued picks.
calld->MaybeRemoveCallFromResolverQueuedCallsLocked(self->elem_);
calld->RemoveCallFromResolverQueuedCallsLocked(self->elem_);
chand->resolver_queued_calls_.erase(self->elem_);
// Fail pending batches on the call.
calld->PendingBatchesFail(self->elem_, error,
YieldCallCombinerIfPendingBatchesFound);
@ -2124,57 +2045,56 @@ class ClientChannel::CallData::ResolverQueuedCallCanceller {
grpc_closure closure_;
};
void ClientChannel::CallData::MaybeRemoveCallFromResolverQueuedCallsLocked(
void ClientChannel::CallData::RemoveCallFromResolverQueuedCallsLocked(
grpc_call_element* elem) {
if (!queued_pending_resolver_result_) return;
auto* chand = static_cast<ClientChannel*>(elem->channel_data);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: removing from resolver queued picks list",
chand, this);
}
chand->RemoveResolverQueuedCall(&resolver_queued_call_, pollent_);
queued_pending_resolver_result_ = false;
// Remove call's pollent from channel's interested_parties.
grpc_polling_entity_del_from_pollset_set(pollent_,
chand->interested_parties_);
// Lame the call combiner canceller.
resolver_call_canceller_ = nullptr;
// Add trace annotation
auto* call_tracer =
static_cast<CallTracer*>(call_context_[GRPC_CONTEXT_CALL_TRACER].value);
if (call_tracer != nullptr) {
call_tracer->RecordAnnotation("Delayed name resolution complete.");
}
// Note: There's no need to actually remove the call from the queue
// here, beacuse that will be done in
// ResolverQueuedCallCanceller::CancelLocked() or
// ClientChannel::ReprocessQueuedResolverCalls().
}
void ClientChannel::CallData::MaybeAddCallToResolverQueuedCallsLocked(
void ClientChannel::CallData::AddCallToResolverQueuedCallsLocked(
grpc_call_element* elem) {
if (queued_pending_resolver_result_) return;
auto* chand = static_cast<ClientChannel*>(elem->channel_data);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list",
chand, this);
}
queued_pending_resolver_result_ = true;
resolver_queued_call_.elem = elem;
chand->AddResolverQueuedCall(&resolver_queued_call_, pollent_);
// Add call's pollent to channel's interested_parties, so that I/O
// can be done under the call's CQ.
grpc_polling_entity_add_to_pollset_set(pollent_, chand->interested_parties_);
// Add to queue.
chand->resolver_queued_calls_.insert(elem);
// Register call combiner cancellation callback.
resolver_call_canceller_ = new ResolverQueuedCallCanceller(elem);
}
grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
grpc_call_element* elem, grpc_metadata_batch* initial_metadata,
const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector) {
ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
chand, this);
}
ConfigSelector* config_selector = chand->config_selector_.get();
if (config_selector != nullptr) {
if (!config_selector.ok()) return config_selector.status();
// Use the ConfigSelector to determine the config for the call.
auto call_config =
config_selector->GetCallConfig({&path_, initial_metadata, arena()});
(*config_selector)->GetCallConfig({&path_, initial_metadata, arena()});
if (!call_config.ok()) {
return absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
call_config.status(), "ConfigSelector"));
return absl_status_to_grpc_error(
MaybeRewriteIllegalStatusCode(call_config.status(), "ConfigSelector"));
}
// Create a ClientChannelServiceConfigCallData for the call. This stores
// a ref to the ServiceConfig and caches the right set of parsed configs
@ -2205,18 +2125,12 @@ grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
}
// If the service config set wait_for_ready and the application
// did not explicitly set it, use the value from the service config.
auto* wait_for_ready =
pending_batches_[0]
->payload->send_initial_metadata.send_initial_metadata
->GetOrCreatePointer(WaitForReady());
auto* wait_for_ready = initial_metadata->GetOrCreatePointer(WaitForReady());
if (method_params->wait_for_ready().has_value() &&
!wait_for_ready->explicitly_set) {
wait_for_ready->value = method_params->wait_for_ready().value();
}
}
// Set the dynamic filter stack.
dynamic_filters_ = chand->dynamic_filters_;
}
return absl::OkStatus();
}
@ -2244,80 +2158,60 @@ void ClientChannel::CallData::
error);
}
void ClientChannel::CallData::AsyncResolutionDone(grpc_call_element* elem,
grpc_error_handle error) {
// TODO(roth): Does this callback need to hold a ref to the call stack?
GRPC_CLOSURE_INIT(&resolution_done_closure_, ResolutionDone, elem, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &resolution_done_closure_, error);
}
void ClientChannel::CallData::ResolutionDone(void* arg,
grpc_error_handle error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
void ClientChannel::CallData::CheckResolution(grpc_call_element* elem,
bool was_queued) {
ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
CallData* calld = static_cast<CallData*>(elem->call_data);
grpc_metadata_batch* initial_metadata =
pending_batches_[0]->payload->send_initial_metadata.send_initial_metadata;
// Check if we have a resolver result to use.
absl::StatusOr<RefCountedPtr<ConfigSelector>> config_selector;
{
MutexLock lock(&chand->resolution_mu_);
bool result_ready =
CheckResolutionLocked(elem, initial_metadata, &config_selector);
// If no result is available, queue the call.
if (!result_ready) {
AddCallToResolverQueuedCallsLocked(elem);
return;
}
}
// We have a result. Apply service config to call.
grpc_error_handle error =
ApplyServiceConfigToCallLocked(elem, initial_metadata, config_selector);
// ConfigSelector must be unreffed inside the WorkSerializer.
if (config_selector.ok()) {
chand->work_serializer_->Run(
[config_selector = std::move(*config_selector)]() mutable {
config_selector.reset();
},
DEBUG_LOCATION);
}
// Handle errors.
if (!error.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: error applying config to call: error=%s",
chand, calld, StatusToString(error).c_str());
chand, this, StatusToString(error).c_str());
}
calld->PendingBatchesFail(elem, error, YieldCallCombiner);
PendingBatchesFail(elem, error, YieldCallCombiner);
return;
}
calld->CreateDynamicCall(elem);
}
void ClientChannel::CallData::CheckResolution(void* arg,
grpc_error_handle error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
CallData* calld = static_cast<CallData*>(elem->call_data);
ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
bool resolution_complete;
{
MutexLock lock(&chand->resolution_mu_);
resolution_complete = calld->CheckResolutionLocked(elem, &error);
// If the call was queued, add trace annotation.
if (was_queued) {
auto* call_tracer =
static_cast<CallTracer*>(call_context_[GRPC_CONTEXT_CALL_TRACER].value);
if (call_tracer != nullptr) {
call_tracer->RecordAnnotation("Delayed name resolution complete.");
}
if (resolution_complete) {
ResolutionDone(elem, error);
}
// Create dynamic call.
CreateDynamicCall(elem);
}
bool ClientChannel::CallData::CheckResolutionLocked(grpc_call_element* elem,
grpc_error_handle* error) {
bool ClientChannel::CallData::CheckResolutionLocked(
grpc_call_element* elem, grpc_metadata_batch* initial_metadata,
absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector) {
ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
// If we're still in IDLE, we need to start resolving.
if (GPR_UNLIKELY(chand->CheckConnectivityState(false) == GRPC_CHANNEL_IDLE)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: triggering exit idle", chand, this);
}
// Bounce into the control plane work serializer to start resolving,
// in case we are still in IDLE state. Since we are holding on to the
// resolution mutex here, we offload it on the ExecCtx so that we don't
// deadlock with ourselves.
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "CheckResolutionLocked");
ExecCtx::Run(
DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error_handle /*error*/) {
auto* chand = static_cast<ClientChannel*>(arg);
chand->work_serializer_->Run(
[chand]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
chand->CheckConnectivityState(/*try_to_connect=*/true);
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_,
"CheckResolutionLocked");
},
DEBUG_LOCATION);
},
chand, nullptr),
absl::OkStatus());
}
// Get send_initial_metadata batch and flags.
auto& send_initial_metadata =
pending_batches_[0]->payload->send_initial_metadata;
grpc_metadata_batch* initial_metadata_batch =
send_initial_metadata.send_initial_metadata;
// If we don't yet have a resolver result, we need to queue the call
// until we get one.
if (GPR_UNLIKELY(!chand->received_service_config_data_)) {
@ -2325,31 +2219,26 @@ bool ClientChannel::CallData::CheckResolutionLocked(grpc_call_element* elem,
// first service config, fail any non-wait_for_ready calls.
absl::Status resolver_error = chand->resolver_transient_failure_error_;
if (!resolver_error.ok() &&
!initial_metadata_batch->GetOrCreatePointer(WaitForReady())->value) {
!initial_metadata->GetOrCreatePointer(WaitForReady())->value) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: resolution failed, failing call",
chand, this);
}
MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
*error = absl_status_to_grpc_error(resolver_error);
*config_selector = absl_status_to_grpc_error(resolver_error);
return true;
}
// Either the resolver has not yet returned a result, or it has
// returned transient failure but the call is wait_for_ready. In
// either case, queue the call.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: queuing to wait for resolution",
chand, this);
gpr_log(GPR_INFO, "chand=%p calld=%p: no resolver result yet", chand,
this);
}
MaybeAddCallToResolverQueuedCallsLocked(elem);
return false;
}
// Apply service config to call if not yet applied.
if (GPR_LIKELY(!service_config_applied_)) {
service_config_applied_ = true;
*error = ApplyServiceConfigToCallLocked(elem, initial_metadata_batch);
}
MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
// Result found.
*config_selector = chand->config_selector_;
dynamic_filters_ = chand->dynamic_filters_;
return true;
}
@ -2592,7 +2481,7 @@ void ClientChannel::LoadBalancedCall::Orphan() {
size_t ClientChannel::LoadBalancedCall::GetBatchIndex(
grpc_transport_stream_op_batch* batch) {
// Note: It is important the send_initial_metadata be the first entry
// here, since the code in PickSubchannelLocked() assumes it will be.
// here, since the code in PickSubchannelImpl() assumes it will be.
if (batch->send_initial_metadata) return 0;
if (batch->send_message) return 1;
if (batch->send_trailing_metadata) return 2;
@ -2820,10 +2709,10 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
if (GPR_LIKELY(batch->send_initial_metadata)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: grabbing data plane mutex to perform pick",
chand_, this);
"chand=%p lb_call=%p: grabbing LB mutex to perform pick", chand_,
this);
}
PickSubchannel(this, absl::OkStatus());
PickSubchannel(/*was_queued=*/false);
} else {
// For all other batches, release the call combiner.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
@ -2999,7 +2888,7 @@ class ClientChannel::LoadBalancedCall::LbQueuedCallCanceller {
auto* lb_call = self->lb_call_.get();
auto* chand = lb_call->chand_;
{
MutexLock lock(&chand->data_plane_mu_);
MutexLock lock(&chand->lb_mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: cancelling queued pick: "
@ -3010,7 +2899,9 @@ class ClientChannel::LoadBalancedCall::LbQueuedCallCanceller {
if (lb_call->lb_call_canceller_ == self && !error.ok()) {
lb_call->call_dispatch_controller_->Commit();
// Remove pick from list of queued picks.
lb_call->MaybeRemoveCallFromLbQueuedCallsLocked();
lb_call->RemoveCallFromLbQueuedCallsLocked();
// Remove from queued picks list.
chand->lb_queued_calls_.erase(lb_call);
// Fail pending batches on the call.
lb_call->PendingBatchesFail(error,
YieldCallCombinerIfPendingBatchesFound);
@ -3024,72 +2915,102 @@ class ClientChannel::LoadBalancedCall::LbQueuedCallCanceller {
grpc_closure closure_;
};
void ClientChannel::LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() {
if (!queued_pending_lb_pick_) return;
void ClientChannel::LoadBalancedCall::RemoveCallFromLbQueuedCallsLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
chand_, this);
}
chand_->RemoveLbQueuedCall(&queued_call_, pollent_);
queued_pending_lb_pick_ = false;
// Remove pollset_set linkage.
grpc_polling_entity_del_from_pollset_set(pollent_,
chand_->interested_parties_);
// Lame the call combiner canceller.
lb_call_canceller_ = nullptr;
// Add trace annotation
if (call_attempt_tracer_ != nullptr) {
call_attempt_tracer_->RecordAnnotation("Delayed LB pick complete.");
}
// Note: There's no need to actually remove the call from the queue
// here, beacuse that will be done in either
// LbQueuedCallCanceller::CancelLocked() or
// in ClientChannel::UpdateStateAndPickerLocked().
}
void ClientChannel::LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
if (queued_pending_lb_pick_) return;
void ClientChannel::LoadBalancedCall::AddCallToLbQueuedCallsLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
chand_, this);
}
queued_pending_lb_pick_ = true;
queued_call_.lb_call = this;
chand_->AddLbQueuedCall(&queued_call_, pollent_);
// Add call's pollent to channel's interested_parties, so that I/O
// can be done under the call's CQ.
grpc_polling_entity_add_to_pollset_set(pollent_, chand_->interested_parties_);
// Add to queue.
chand_->lb_queued_calls_.insert(this);
// Register call combiner cancellation callback.
lb_call_canceller_ = new LbQueuedCallCanceller(Ref());
}
void ClientChannel::LoadBalancedCall::AsyncPickDone(grpc_error_handle error) {
// TODO(roth): Does this callback need to hold a ref to LoadBalancedCall?
GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx);
ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
void ClientChannel::LoadBalancedCall::PickSubchannel(bool was_queued) {
// Grab mutex and take a ref to the picker.
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
{
MutexLock lock(&chand_->lb_mu_);
picker = chand_->picker_;
}
// We may accumulate multiple pickers here, because if a picker says
// to queue the call, we check again to see if the picker has been
// updated before we queue it.
std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>>
pickers_to_unref;
while (true) {
// Do pick.
grpc_error_handle error;
bool pick_complete = PickSubchannelImpl(picker.get(), &error);
if (!pick_complete) {
MutexLock lock(&chand_->lb_mu_);
// If picker has been swapped out since we grabbed it, try again.
if (chand_->picker_ != picker) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: pick not complete, but picker changed",
chand_, this);
}
void ClientChannel::LoadBalancedCall::PickDone(void* arg,
grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
pickers_to_unref.emplace_back(std::move(picker));
picker = chand_->picker_;
continue;
}
// Otherwise queue the pick to try again later when we get a new picker.
AddCallToLbQueuedCallsLocked();
break;
}
// Pick is complete.
// If it was queued, add a trace annotation.
if (was_queued && call_attempt_tracer_ != nullptr) {
call_attempt_tracer_->RecordAnnotation("Delayed LB pick complete.");
}
// If the pick failed, fail the call.
if (!error.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: failed to pick subchannel: error=%s",
self->chand_, self, StatusToString(error).c_str());
}
self->PendingBatchesFail(error, YieldCallCombiner);
return;
chand_, this, StatusToString(error).c_str());
}
self->call_dispatch_controller_->Commit();
self->CreateSubchannelCall();
PendingBatchesFail(error, YieldCallCombiner);
break;
}
void ClientChannel::LoadBalancedCall::PickSubchannel(void* arg,
grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
bool pick_complete;
{
MutexLock lock(&self->chand_->data_plane_mu_);
pick_complete = self->PickSubchannelLocked(&error);
// Pick succeeded.
call_dispatch_controller_->Commit();
CreateSubchannelCall();
break;
}
if (pick_complete) {
PickDone(self, error);
pickers_to_unref.emplace_back(std::move(picker));
// Unref pickers in WorkSerializer.
chand_->work_serializer_->Run(
[pickers_to_unref = std::move(pickers_to_unref)]() mutable {
for (auto& picker : pickers_to_unref) {
picker.reset(DEBUG_LOCATION, "PickSubchannel");
}
},
DEBUG_LOCATION);
}
bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
grpc_error_handle* error) {
bool ClientChannel::LoadBalancedCall::PickSubchannelImpl(
LoadBalancingPolicy::SubchannelPicker* picker, grpc_error_handle* error) {
GPR_ASSERT(connected_subchannel_ == nullptr);
GPR_ASSERT(subchannel_call_ == nullptr);
// Grab initial metadata.
@ -3104,12 +3025,11 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
pick_args.call_state = &lb_call_state;
Metadata initial_metadata(initial_metadata_batch);
pick_args.initial_metadata = &initial_metadata;
auto result = chand_->picker_->Pick(pick_args);
auto result = picker->Pick(pick_args);
return HandlePickResult<bool>(
&result,
// CompletePick
[this](LoadBalancingPolicy::PickResult::Complete* complete_pick)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
[this](LoadBalancingPolicy::PickResult::Complete* complete_pick) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: LB pick succeeded: subchannel=%p",
@ -3118,8 +3038,8 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
GPR_ASSERT(complete_pick->subchannel != nullptr);
// Grab a ref to the connected subchannel while we're still
// holding the data plane mutex.
SubchannelWrapper* subchannel = static_cast<SubchannelWrapper*>(
complete_pick->subchannel.get());
SubchannelWrapper* subchannel =
static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
connected_subchannel_ = subchannel->connected_subchannel();
// If the subchannel has no connected subchannel (e.g., if the
// subchannel has moved out of state READY but the LB policy hasn't
@ -3132,7 +3052,6 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
"has no connected subchannel; queueing pick",
chand_, this);
}
MaybeAddCallToLbQueuedCallsLocked();
return false;
}
lb_subchannel_call_tracker_ =
@ -3140,26 +3059,22 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
if (lb_subchannel_call_tracker_ != nullptr) {
lb_subchannel_call_tracker_->Start();
}
MaybeRemoveCallFromLbQueuedCallsLocked();
return true;
},
// QueuePick
[this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
[this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick queued", chand_,
this);
}
MaybeAddCallToLbQueuedCallsLocked();
return false;
},
// FailPick
[this, initial_metadata_batch,
&error](LoadBalancingPolicy::PickResult::Fail* fail_pick)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
&error](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick failed: %s",
chand_, this, fail_pick->status.ToString().c_str());
gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick failed: %s", chand_,
this, fail_pick->status.ToString().c_str());
}
// If wait_for_ready is false, then the error indicates the RPC
// attempt's final status.
@ -3167,26 +3082,22 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
->value) {
*error = absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
std::move(fail_pick->status), "LB pick"));
MaybeRemoveCallFromLbQueuedCallsLocked();
return true;
}
// If wait_for_ready is true, then queue to retry when we get a new
// picker.
MaybeAddCallToLbQueuedCallsLocked();
return false;
},
// DropPick
[this, &error](LoadBalancingPolicy::PickResult::Drop* drop_pick)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
[this, &error](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick dropped: %s",
chand_, this, drop_pick->status.ToString().c_str());
gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick dropped: %s", chand_,
this, drop_pick->status.ToString().c_str());
}
*error = grpc_error_set_int(
absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
std::move(drop_pick->status), "LB drop")),
StatusIntProperty::kLbPolicyDrop, 1);
MaybeRemoveCallFromLbQueuedCallsLocked();
return true;
});
}

@ -24,11 +24,11 @@
#include <atomic>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_set.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
@ -222,15 +222,6 @@ class ClientChannel {
std::atomic<bool> done_{false};
};
struct ResolverQueuedCall {
grpc_call_element* elem;
ResolverQueuedCall* next = nullptr;
};
struct LbQueuedCall {
LoadBalancedCall* lb_call;
LbQueuedCall* next = nullptr;
};
ClientChannel(grpc_channel_element_args* args, grpc_error_handle* error);
~ClientChannel();
@ -246,6 +237,9 @@ class ClientChannel {
// Note: All methods with "Locked" suffix must be invoked from within
// work_serializer_.
void ReprocessQueuedResolverCalls()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&resolution_mu_);
void OnResolverResultChangedLocked(Resolver::Result result)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void OnResolverErrorLocked(absl::Status status)
@ -284,20 +278,6 @@ class ClientChannel {
void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
// These methods all require holding resolution_mu_.
void AddResolverQueuedCall(ResolverQueuedCall* call,
grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);
void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);
// These methods all require holding data_plane_mu_.
void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
void RemoveLbQueuedCall(LbQueuedCall* to_remove, grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
//
// Fields set at construction and never modified.
//
@ -316,9 +296,9 @@ class ClientChannel {
// Fields related to name resolution. Guarded by resolution_mu_.
//
mutable Mutex resolution_mu_;
// Linked list of calls queued waiting for resolver result.
ResolverQueuedCall* resolver_queued_calls_ ABSL_GUARDED_BY(resolution_mu_) =
nullptr;
// List of calls queued waiting for resolver result.
absl::flat_hash_set<grpc_call_element*> resolver_queued_calls_
ABSL_GUARDED_BY(resolution_mu_);
// Data from service config.
absl::Status resolver_transient_failure_error_
ABSL_GUARDED_BY(resolution_mu_);
@ -330,13 +310,13 @@ class ClientChannel {
ABSL_GUARDED_BY(resolution_mu_);
//
// Fields used in the data plane. Guarded by data_plane_mu_.
// Fields related to LB picks. Guarded by lb_mu_.
//
mutable Mutex data_plane_mu_;
mutable Mutex lb_mu_;
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_
ABSL_GUARDED_BY(data_plane_mu_);
// Linked list of calls queued waiting for LB pick.
LbQueuedCall* lb_queued_calls_ ABSL_GUARDED_BY(data_plane_mu_) = nullptr;
ABSL_GUARDED_BY(lb_mu_);
absl::flat_hash_set<LoadBalancedCall*> lb_queued_calls_
ABSL_GUARDED_BY(lb_mu_);
//
// Fields used in the control plane. Guarded by work_serializer.
@ -360,7 +340,7 @@ class ClientChannel {
// The set of SubchannelWrappers that currently exist.
// No need to hold a ref, since the map is updated in the control-plane
// work_serializer when the SubchannelWrappers are created and destroyed.
std::set<SubchannelWrapper*> subchannel_wrappers_
absl::flat_hash_set<SubchannelWrapper*> subchannel_wrappers_
ABSL_GUARDED_BY(*work_serializer_);
int keepalive_time_ ABSL_GUARDED_BY(*work_serializer_) = -1;
grpc_error_handle disconnect_error_ ABSL_GUARDED_BY(*work_serializer_);
@ -422,16 +402,11 @@ class ClientChannel::LoadBalancedCall
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
// Invoked by channel for queued LB picks when the picker is updated.
static void PickSubchannel(void* arg, grpc_error_handle error);
// Helper function for performing an LB pick while holding the data plane
// mutex. Returns true if the pick is complete, in which case the caller
// must invoke PickDone() or AsyncPickDone() with the returned error.
bool PickSubchannelLocked(grpc_error_handle* error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
// Schedules a callback to process the completed pick. The callback
// will not run until after this method returns.
void AsyncPickDone(grpc_error_handle error);
void PickSubchannel(bool was_queued);
// Called by channel when removing a call from the list of queued calls.
void RemoveCallFromLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_);
RefCountedPtr<SubchannelCall> subchannel_call() const {
return subchannel_call_;
@ -479,14 +454,14 @@ class ClientChannel::LoadBalancedCall
void RecordCallCompletion(absl::Status status);
void CreateSubchannelCall();
// Invoked when a pick is completed, on both success or failure.
static void PickDone(void* arg, grpc_error_handle error);
// Removes the call from the channel's list of queued picks if present.
void MaybeRemoveCallFromLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
// Helper function for performing an LB pick with a specified picker.
// Returns true if the pick is complete.
bool PickSubchannelImpl(LoadBalancingPolicy::SubchannelPicker* picker,
grpc_error_handle* error);
// Adds the call to the channel's list of queued picks if not already present.
void MaybeAddCallToLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
void AddCallToLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_);
ClientChannel* chand_;
@ -513,15 +488,9 @@ class ClientChannel::LoadBalancedCall
// Set when we fail inside the LB call.
grpc_error_handle failure_error_;
grpc_closure pick_closure_;
// Accessed while holding ClientChannel::data_plane_mu_.
ClientChannel::LbQueuedCall queued_call_
ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_);
bool queued_pending_lb_pick_ ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) =
false;
// Accessed while holding ClientChannel::lb_mu_.
LbQueuedCallCanceller* lb_call_canceller_
ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = nullptr;
ABSL_GUARDED_BY(&ClientChannel::lb_mu_) = nullptr;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
const BackendMetricData* backend_metric_data_ = nullptr;

@ -61,6 +61,7 @@
#include <string.h>
#include <algorithm>
#include <atomic>
#include <initializer_list>
#include <map>
#include <memory>
@ -389,19 +390,15 @@ class GrpcLb : public LoadBalancingPolicy {
// Returns the LB token to use for a drop, or null if the call
// should not be dropped.
//
// Note: This is called from the picker, so it will be invoked in
// the channel's data plane mutex, NOT the control plane
// work_serializer. It should not be accessed by any other part of the LB
// policy.
// Note: This is called from the picker, NOT from inside the control
// plane work_serializer.
const char* ShouldDrop();
private:
std::vector<GrpcLbServer> serverlist_;
// Guarded by the channel's data plane mutex, NOT the control
// plane work_serializer. It should not be accessed by anything but the
// picker via the ShouldDrop() method.
size_t drop_index_ = 0;
// Accessed from the picker, so needs synchronization.
std::atomic<size_t> drop_index_{0};
};
class Picker : public SubchannelPicker {
@ -717,8 +714,8 @@ bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
const char* GrpcLb::Serverlist::ShouldDrop() {
if (serverlist_.empty()) return nullptr;
GrpcLbServer& server = serverlist_[drop_index_];
drop_index_ = (drop_index_ + 1) % serverlist_.size();
size_t index = drop_index_.fetch_add(1, std::memory_order_relaxed);
GrpcLbServer& server = serverlist_[index % serverlist_.size()];
return server.drop ? server.load_balance_token : nullptr;
}

@ -254,19 +254,14 @@ class RingHash : public LoadBalancingPolicy {
explicit Picker(RefCountedPtr<RingHashSubchannelList> subchannel_list)
: subchannel_list_(std::move(subchannel_list)) {}
~Picker() override {
// Hop into WorkSerializer to unref the subchannel list, since that may
// trigger the unreffing of the underlying subchannels.
MakeOrphanable<WorkSerializerRunner>(std::move(subchannel_list_));
}
PickResult Pick(PickArgs args) override;
private:
// An interface for running a callback in the control plane WorkSerializer.
class WorkSerializerRunner : public Orphanable {
// A fire-and-forget class that schedules subchannel connection attempts
// on the control plane WorkSerializer.
class SubchannelConnectionAttempter : public Orphanable {
public:
explicit WorkSerializerRunner(
explicit SubchannelConnectionAttempter(
RefCountedPtr<RingHashSubchannelList> subchannel_list)
: subchannel_list_(std::move(subchannel_list)) {
GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
@ -278,17 +273,26 @@ class RingHash : public LoadBalancingPolicy {
ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
}
void AddSubchannel(RefCountedPtr<SubchannelInterface> subchannel) {
subchannels_.push_back(std::move(subchannel));
}
// Will be invoked inside of the WorkSerializer.
virtual void Run() {}
void Run() {
if (!ring_hash_lb()->shutdown_) {
for (auto& subchannel : subchannels_) {
subchannel->RequestConnection();
}
}
}
protected:
private:
RingHash* ring_hash_lb() const {
return static_cast<RingHash*>(subchannel_list_->policy());
}
private:
static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) {
auto* self = static_cast<WorkSerializerRunner*>(arg);
auto* self = static_cast<SubchannelConnectionAttempter*>(arg);
self->ring_hash_lb()->work_serializer()->Run(
[self]() {
self->Run();
@ -298,31 +302,8 @@ class RingHash : public LoadBalancingPolicy {
}
RefCountedPtr<RingHashSubchannelList> subchannel_list_;
grpc_closure closure_;
};
// A fire-and-forget class that schedules subchannel connection attempts
// on the control plane WorkSerializer.
class SubchannelConnectionAttempter : public WorkSerializerRunner {
public:
explicit SubchannelConnectionAttempter(
RefCountedPtr<RingHashSubchannelList> subchannel_list)
: WorkSerializerRunner(std::move(subchannel_list)) {}
void AddSubchannel(RefCountedPtr<SubchannelInterface> subchannel) {
subchannels_.push_back(std::move(subchannel));
}
void Run() override {
if (!ring_hash_lb()->shutdown_) {
for (auto& subchannel : subchannels_) {
subchannel->RequestConnection();
}
}
}
private:
std::vector<RefCountedPtr<SubchannelInterface>> subchannels_;
grpc_closure closure_;
};
RefCountedPtr<RingHashSubchannelList> subchannel_list_;

@ -365,7 +365,6 @@ class RlsLb : public LoadBalancingPolicy {
class Picker : public LoadBalancingPolicy::SubchannelPicker {
public:
explicit Picker(RefCountedPtr<RlsLb> lb_policy);
~Picker() override;
PickResult Pick(PickArgs args) override;
@ -1008,19 +1007,6 @@ RlsLb::Picker::Picker(RefCountedPtr<RlsLb> lb_policy)
}
}
RlsLb::Picker::~Picker() {
// It's not safe to unref the default child policy in the picker,
// since that needs to be done in the WorkSerializer.
if (default_child_policy_ != nullptr) {
auto* default_child_policy = default_child_policy_.release();
lb_policy_->work_serializer()->Run(
[default_child_policy]() {
default_child_policy->Unref(DEBUG_LOCATION, "Picker");
},
DEBUG_LOCATION);
}
}
LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) {
// Construct key for request.
RequestKey key = {BuildKeyMap(config_->key_builder_map(), args.path,

@ -18,13 +18,16 @@
#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <atomic>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
@ -174,7 +177,7 @@ class RoundRobin : public LoadBalancingPolicy {
// Using pointer value only, no ref held -- do not dereference!
RoundRobin* parent_;
size_t last_picked_index_;
std::atomic<size_t> last_picked_index_;
std::vector<RefCountedPtr<SubchannelInterface>> subchannels_;
};
@ -189,6 +192,8 @@ class RoundRobin : public LoadBalancingPolicy {
RefCountedPtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
bool shutdown_ = false;
absl::BitGen bit_gen_;
};
//
@ -207,27 +212,26 @@ RoundRobin::Picker::Picker(RoundRobin* parent,
}
// For discussion on why we generate a random starting index for
// the picker, see https://github.com/grpc/grpc-go/issues/2580.
// TODO(roth): rand(3) is not thread-safe. This should be replaced with
// something better as part of https://github.com/grpc/grpc/issues/17891.
last_picked_index_ = rand() % subchannels_.size();
size_t index =
absl::Uniform<size_t>(parent->bit_gen_, 0, subchannels_.size());
last_picked_index_.store(index, std::memory_order_relaxed);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
"[RR %p picker %p] created picker from subchannel_list=%p "
"with %" PRIuPTR " READY subchannels; last_picked_index_=%" PRIuPTR,
parent_, this, subchannel_list, subchannels_.size(),
last_picked_index_);
parent_, this, subchannel_list, subchannels_.size(), index);
}
}
RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs /*args*/) {
last_picked_index_ = (last_picked_index_ + 1) % subchannels_.size();
size_t index = last_picked_index_.fetch_add(1, std::memory_order_relaxed) %
subchannels_.size();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
"[RR %p picker %p] returning index %" PRIuPTR ", subchannel=%p",
parent_, this, last_picked_index_,
subchannels_[last_picked_index_].get());
parent_, this, index, subchannels_[index].get());
}
return PickResult::Complete(subchannels_[last_picked_index_]);
return PickResult::Complete(subchannels_[index]);
}
//

@ -26,6 +26,7 @@
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
@ -46,6 +47,7 @@
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/gprpp/work_serializer.h"
@ -138,7 +140,11 @@ class WeightedTargetLb : public LoadBalancingPolicy {
private:
PickerList pickers_;
absl::BitGen bit_gen_;
// TODO(roth): Consider using a separate thread-local BitGen for each CPU
// to avoid the need for this mutex.
Mutex mu_;
absl::BitGen bit_gen_ ABSL_GUARDED_BY(&mu_);
};
// Each WeightedChild holds a ref to its parent WeightedTargetLb.
@ -247,8 +253,10 @@ class WeightedTargetLb : public LoadBalancingPolicy {
WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick(
PickArgs args) {
// Generate a random number in [0, total weight).
const uint64_t key =
absl::Uniform<uint64_t>(bit_gen_, 0, pickers_.back().first);
const uint64_t key = [&]() {
MutexLock lock(&mu_);
return absl::Uniform<uint64_t>(bit_gen_, 0, pickers_.back().first);
}();
// Find the index in pickers_ corresponding to key.
size_t mid = 0;
size_t start_index = 0;

@ -1351,8 +1351,9 @@ RetryFilter::CallData::CallAttempt::BatchData::~BatchData() {
this);
}
CallAttempt* call_attempt = std::exchange(call_attempt_, nullptr);
GRPC_CALL_STACK_UNREF(call_attempt->calld_->owning_call_, "Retry BatchData");
grpc_call_stack* owning_call = call_attempt->calld_->owning_call_;
call_attempt->Unref(DEBUG_LOCATION, "~BatchData");
GRPC_CALL_STACK_UNREF(owning_call, "Retry BatchData");
}
void RetryFilter::CallData::CallAttempt::BatchData::

@ -93,11 +93,14 @@ std::string XdsEndpointResource::Priority::ToString() const {
}
bool XdsEndpointResource::DropConfig::ShouldDrop(
const std::string** category_name) const {
const std::string** category_name) {
for (size_t i = 0; i < drop_category_list_.size(); ++i) {
const auto& drop_category = drop_category_list_[i];
// Generate a random number in [0, 1000000).
const uint32_t random = static_cast<uint32_t>(rand()) % 1000000;
const uint32_t random = [&]() {
MutexLock lock(&mu_);
return absl::Uniform<uint32_t>(bit_gen_, 0, 1000000);
}();
if (random < drop_category.parts_per_million) {
*category_name = &drop_category.name;
return true;

@ -28,6 +28,8 @@
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/random/random.h"
#include "absl/strings/string_view.h"
#include "envoy/config/endpoint/v3/endpoint.upbdefs.h"
#include "upb/def.h"
@ -38,6 +40,7 @@
#include "src/core/ext/xds/xds_resource_type_impl.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/resolver/server_address.h"
namespace grpc_core {
@ -90,7 +93,7 @@ struct XdsEndpointResource : public XdsResourceType::ResourceData {
// The only method invoked from outside the WorkSerializer (used in
// the data plane).
bool ShouldDrop(const std::string** category_name) const;
bool ShouldDrop(const std::string** category_name);
const DropCategoryList& drop_category_list() const {
return drop_category_list_;
@ -108,6 +111,11 @@ struct XdsEndpointResource : public XdsResourceType::ResourceData {
private:
DropCategoryList drop_category_list_;
bool drop_all_ = false;
// TODO(roth): Consider using a separate thread-local BitGen for each CPU
// to avoid the need for this mutex.
Mutex mu_;
absl::BitGen bit_gen_ ABSL_GUARDED_BY(&mu_);
};
PriorityList priorities;

@ -69,19 +69,15 @@ LoadBalancingPolicy::SubchannelPicker::SubchannelPicker()
LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick(
PickArgs /*args*/) {
// We invoke the parent's ExitIdleLocked() via a closure instead
// of doing it directly here, for two reasons:
// 1. ExitIdleLocked() may cause the policy's state to change and
// a new picker to be delivered to the channel. If that new
// picker is delivered before ExitIdleLocked() returns, then by
// the time this function returns, the pick will already have
// been processed, and we'll be trying to re-process the same
// pick again, leading to a crash.
// 2. We are currently running in the data plane mutex, but we
// need to bounce into the control plane work_serializer to call
// ExitIdleLocked().
if (!exit_idle_called_ && parent_ != nullptr) {
exit_idle_called_ = true;
auto* parent = parent_->Ref().release(); // ref held by lambda.
// of doing it directly here because ExitIdleLocked() may cause the
// policy's state to change and a new picker to be delivered to the
// channel. If that new picker is delivered before ExitIdleLocked()
// returns, then by the time this function returns, the pick will already
// have been processed, and we'll be trying to re-process the same pick
// again, leading to a crash.
MutexLock lock(&mu_);
if (parent_ != nullptr) {
auto* parent = parent_.release(); // ref held by lambda.
ExecCtx::Run(DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error_handle /*error*/) {

@ -27,6 +27,7 @@
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
@ -44,6 +45,7 @@
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
@ -392,8 +394,8 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
PickResult Pick(PickArgs args) override;
private:
RefCountedPtr<LoadBalancingPolicy> parent_;
bool exit_idle_called_ = false;
Mutex mu_;
RefCountedPtr<LoadBalancingPolicy> parent_ ABSL_GUARDED_BY(&mu_);
};
// A picker that returns PickResult::Fail for all picks.

Loading…
Cancel
Save