Second attempt: client channel: don't hold mutexes while calling the ConfigSelector or the LB picker (#32326)

Original attempt was #31973, reverted in #32324 due to test flakiness.

There were two problems causing test flakiness here.

The first problem was that, upon resolver error, we were dispatching an
async callback to re-process each of the queued picks *before* we
updated the channel's connectivity state, which meant that the queued
picks might be re-processed in another thread before the new
connectivity state was set, so tests that expected the state to be
TRANSIENT_FAILURE once RPCs failed might not see the expected state.

The second problem affected the xDS ring hash tests, and it's a bit more
involved to explain.

We have an e2e test that simulates an aggregate cluster failover from a
primary cluster using ring_hash at startup. The primary cluster has two
addresses, both of which are unreachable when the client starts up, so
the client should immediately fail over to the secondary cluster, which
does have reachable endpoints. The test requires that no RPCs are failed
while this failover occurs. The original PR made this test flaky.

The problem here was caused by a combination of two factors:

1. Prior to the original PR, when the picker was updated (which happens
inside the WorkSerializer), we re-processed previously queued picks
synchronously, so it was not possible for another subchannel
connectivity state update (which also happens in the WorkSerializer) to
be processed between the time that we updated the picker and the time
that we re-processed the previously queued picks. The original PR
changed this such that the queued picks are re-processed asynchronously
(outside of the WorkSerializer), so it is now possible for a subchannel
connectivity state update to be processed between when the picker is
updated and when we re-process the previously queued picks.

2. Unlike most LB policies, where the picker does not see updated
subchannel connectivity states until a new picker is created, the
ring_hash picker gets the subchannel connectivity states from the LB
policy via a lock, so it can wind up seeing the new states before it
gets updated. This means that when a subchannel connectivity state
update is processed by the ring_hash policy in the WorkSerializer, it
will immediately be seen by the existing picker, even without a picker
update.

With those two points in mind, the sequence of events in the failing
test were as follows:

1. The pick is attempted in the ring_hash picker for the primary
cluster. This causes the first subchannel to attempt to connect.
2. The subchannel transitions from IDLE to CONNECTING. A new picker is
returned due to the subchannel connectivity state change, and the
channel retries the queued pick. The retried pick is done
asynchronously, but in this case it does not matter: the call will be
re-queued.
3. The connection attempt fails, and the subchannel reports
TRANSIENT_FAILURE. A new picker is again returned, and the channel
retries the queued pick. The retried pick is done asynchronously, but in
this case it does not matter: this causes the picker to trigger a
connection attempt for the second subchannel.
4. The second subchannel transitions from IDLE to CONNECTING. A new
picker is again returned, and the channel retries the queued pick. The
retried pick is done asynchronously, and in this case it *does* matter.
5. The second subchannel now transitions to TRANSIENT_FAILURE. The
ring_hash policy will now report TRANSIENT_FAILURE, but before it can
finish that...
6. ...In another thread, the channel now tries to re-process the queued
pick using the CONNECTING picker from step 4. However, because the
ring_hash policy has already seen the TRANSIENT_FAILURE report from the
second subchannel, that picker will now fail the pick instead of queuing
it.

After discussion with @ejona86 and @dfawley (since this bug actually
exists in Java and Go as well), we agreed that the right solution is to
change the ring_hash picker to contain its own copy of the subchannel
connectivity state information, rather than sharing that information
with the LB policy using synchronization.
pull/32223/head^2
Mark D. Roth 2 years ago committed by GitHub
parent 6589340efc
commit 8249fc10a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 6
      src/core/BUILD
  3. 583
      src/core/ext/filters/client_channel/client_channel.cc
  4. 85
      src/core/ext/filters/client_channel/client_channel.h
  5. 17
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  6. 272
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  7. 14
      src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
  8. 24
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  9. 14
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  10. 3
      src/core/ext/filters/client_channel/retry_filter.cc
  11. 7
      src/core/ext/xds/xds_endpoint.cc
  12. 10
      src/core/ext/xds/xds_endpoint.h
  13. 22
      src/core/lib/load_balancing/lb_policy.cc
  14. 6
      src/core/lib/load_balancing/lb_policy.h

@ -2785,6 +2785,7 @@ grpc_cc_library(
], ],
external_deps = [ external_deps = [
"absl/base:core_headers", "absl/base:core_headers",
"absl/container:flat_hash_set",
"absl/container:inlined_vector", "absl/container:inlined_vector",
"absl/status", "absl/status",
"absl/status:statusor", "absl/status:statusor",

@ -2495,6 +2495,7 @@ grpc_cc_library(
srcs = ["lib/load_balancing/lb_policy.cc"], srcs = ["lib/load_balancing/lb_policy.cc"],
hdrs = ["lib/load_balancing/lb_policy.h"], hdrs = ["lib/load_balancing/lb_policy.h"],
external_deps = [ external_deps = [
"absl/base:core_headers",
"absl/status", "absl/status",
"absl/status:statusor", "absl/status:statusor",
"absl/strings", "absl/strings",
@ -2514,6 +2515,7 @@ grpc_cc_library(
"//:debug_location", "//:debug_location",
"//:event_engine_base_hdrs", "//:event_engine_base_hdrs",
"//:exec_ctx", "//:exec_ctx",
"//:gpr",
"//:gpr_platform", "//:gpr_platform",
"//:grpc_trace", "//:grpc_trace",
"//:orphanable", "//:orphanable",
@ -3849,6 +3851,7 @@ grpc_cc_library(
"absl/base:core_headers", "absl/base:core_headers",
"absl/functional:bind_front", "absl/functional:bind_front",
"absl/memory", "absl/memory",
"absl/random",
"absl/status", "absl/status",
"absl/status:statusor", "absl/status:statusor",
"absl/strings", "absl/strings",
@ -4421,6 +4424,7 @@ grpc_cc_library(
"json_object_loader", "json_object_loader",
"lb_policy", "lb_policy",
"lb_policy_factory", "lb_policy_factory",
"ref_counted",
"subchannel_interface", "subchannel_interface",
"unique_type_name", "unique_type_name",
"validation_errors", "validation_errors",
@ -4445,6 +4449,7 @@ grpc_cc_library(
"ext/filters/client_channel/lb_policy/round_robin/round_robin.cc", "ext/filters/client_channel/lb_policy/round_robin/round_robin.cc",
], ],
external_deps = [ external_deps = [
"absl/random",
"absl/status", "absl/status",
"absl/status:statusor", "absl/status:statusor",
"absl/strings", "absl/strings",
@ -4631,6 +4636,7 @@ grpc_cc_library(
"ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc", "ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc",
], ],
external_deps = [ external_deps = [
"absl/base:core_headers",
"absl/random", "absl/random",
"absl/status", "absl/status",
"absl/status:statusor", "absl/status:statusor",

@ -24,7 +24,7 @@
#include <algorithm> #include <algorithm>
#include <functional> #include <functional>
#include <new> #include <new>
#include <set> #include <type_traits>
#include <vector> #include <vector>
#include "absl/status/status.h" #include "absl/status/status.h"
@ -115,19 +115,12 @@ class ClientChannel::CallData {
grpc_call_element* elem, grpc_transport_stream_op_batch* batch); grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent); static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);
// Invoked by channel for queued calls when name resolution is completed. void CheckResolution(grpc_call_element* elem, bool was_queued);
static void CheckResolution(void* arg, grpc_error_handle error);
// Helper function for applying the service config to a call while // Removes the call from the channel's list of calls queued
// holding ClientChannel::resolution_mu_. // for name resolution.
// Returns true if the service config has been applied to the call, in which void RemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem)
// case the caller must invoke ResolutionDone() or AsyncResolutionDone()
// with the returned error.
bool CheckResolutionLocked(grpc_call_element* elem, grpc_error_handle* error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
// Schedules a callback to continue processing the call once
// resolution is complete. The callback will not run until after this
// method returns.
void AsyncResolutionDone(grpc_call_element* elem, grpc_error_handle error);
private: private:
class ResolverQueuedCallCanceller; class ResolverQueuedCallCanceller;
@ -166,23 +159,22 @@ class ClientChannel::CallData {
// Resumes all pending batches on lb_call_. // Resumes all pending batches on lb_call_.
void PendingBatchesResume(grpc_call_element* elem); void PendingBatchesResume(grpc_call_element* elem);
// Helper function for CheckResolution(). Returns true if the call
// can continue (i.e., there is a valid resolution result, or there is
// an invalid resolution result but the call is not wait_for_ready).
bool CheckResolutionLocked(
grpc_call_element* elem, grpc_metadata_batch* initial_metadata,
absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
// Applies service config to the call. Must be invoked once we know // Applies service config to the call. Must be invoked once we know
// that the resolver has returned results to the channel. // that the resolver has returned results to the channel.
// If an error is returned, the error indicates the status with which // If an error is returned, the error indicates the status with which
// the call should be failed. // the call should be failed.
grpc_error_handle ApplyServiceConfigToCallLocked( grpc_error_handle ApplyServiceConfigToCallLocked(
grpc_call_element* elem, grpc_metadata_batch* initial_metadata) grpc_call_element* elem, grpc_metadata_batch* initial_metadata,
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_); const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector);
// Invoked when the resolver result is applied to the caller, on both // Adds the call to the channel's list of calls queued for name resolution.
// success or failure. void AddCallToResolverQueuedCallsLocked(grpc_call_element* elem)
static void ResolutionDone(void* arg, grpc_error_handle error);
// Removes the call (if present) from the channel's list of calls queued
// for name resolution.
void MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
// Adds the call (if not already present) to the channel's list of
// calls queued for name resolution.
void MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element* elem)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
static void RecvTrailingMetadataReadyForConfigSelectorCommitCallback( static void RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
@ -204,15 +196,7 @@ class ClientChannel::CallData {
grpc_polling_entity* pollent_ = nullptr; grpc_polling_entity* pollent_ = nullptr;
grpc_closure resolution_done_closure_;
// Accessed while holding ClientChannel::resolution_mu_. // Accessed while holding ClientChannel::resolution_mu_.
bool service_config_applied_ ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) =
false;
bool queued_pending_resolver_result_
ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = false;
ClientChannel::ResolverQueuedCall resolver_queued_call_
ABSL_GUARDED_BY(&ClientChannel::resolution_mu_);
ResolverQueuedCallCanceller* resolver_call_canceller_ ResolverQueuedCallCanceller* resolver_call_canceller_
ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = nullptr; ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = nullptr;
@ -520,20 +504,18 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
} }
private: private:
// Subchannel and SubchannelInterface have different interfaces for // This wrapper provides a bridge between the internal Subchannel API
// their respective ConnectivityStateWatcherInterface classes. // and the SubchannelInterface API that we expose to LB policies.
// The one in Subchannel updates the ConnectedSubchannel along with // It implements Subchannel::ConnectivityStateWatcherInterface and wraps
// the state, whereas the one in SubchannelInterface does not expose
// the ConnectedSubchannel.
//
// This wrapper provides a bridge between the two. It implements
// Subchannel::ConnectivityStateWatcherInterface and wraps
// the instance of SubchannelInterface::ConnectivityStateWatcherInterface // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
// that was passed in by the LB policy. We pass an instance of this // that was passed in by the LB policy. We pass an instance of this
// class to the underlying Subchannel, and when we get updates from // class to the underlying Subchannel, and when we get updates from
// the subchannel, we pass those on to the wrapped watcher to return // the subchannel, we pass those on to the wrapped watcher to return
// the update to the LB policy. This allows us to set the connected // the update to the LB policy.
// subchannel before passing the result back to the LB policy. //
// This class handles things like hopping into the WorkSerializer
// before passing notifications to the LB policy and propagating
// keepalive information betwen subchannels.
class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface { class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface {
public: public:
WatcherWrapper( WatcherWrapper(
@ -571,16 +553,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
} }
grpc_pollset_set* interested_parties() override { grpc_pollset_set* interested_parties() override {
SubchannelInterface::ConnectivityStateWatcherInterface* watcher = return watcher_->interested_parties();
watcher_.get();
if (watcher_ == nullptr) watcher = replacement_->watcher_.get();
return watcher->interested_parties();
}
WatcherWrapper* MakeReplacement() {
auto* replacement = new WatcherWrapper(std::move(watcher_), parent_);
replacement_ = replacement;
return replacement;
} }
private: private:
@ -638,7 +611,6 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
watcher_; watcher_;
RefCountedPtr<SubchannelWrapper> parent_; RefCountedPtr<SubchannelWrapper> parent_;
WatcherWrapper* replacement_ = nullptr;
}; };
ClientChannel* chand_; ClientChannel* chand_;
@ -1099,6 +1071,20 @@ ChannelArgs ClientChannel::MakeSubchannelArgs(
.Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE); .Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE);
} }
void ClientChannel::ReprocessQueuedResolverCalls() {
for (grpc_call_element* elem : resolver_queued_calls_) {
CallData* calld = static_cast<CallData*>(elem->call_data);
calld->RemoveCallFromResolverQueuedCallsLocked(elem);
owning_stack_->EventEngine()->Run([elem]() {
ApplicationCallbackExecCtx application_exec_ctx;
ExecCtx exec_ctx;
CallData* calld = static_cast<CallData*>(elem->call_data);
calld->CheckResolution(elem, /*was_queued=*/true);
});
}
resolver_queued_calls_.clear();
}
namespace { namespace {
RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy( RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
@ -1305,26 +1291,19 @@ void ClientChannel::OnResolverErrorLocked(absl::Status status) {
// result, then we continue to let it set the connectivity state. // result, then we continue to let it set the connectivity state.
// Otherwise, we go into TRANSIENT_FAILURE. // Otherwise, we go into TRANSIENT_FAILURE.
if (lb_policy_ == nullptr) { if (lb_policy_ == nullptr) {
// Update connectivity state.
// TODO(roth): We should be updating the connectivity state here but
// not the picker.
UpdateStateAndPickerLocked(
GRPC_CHANNEL_TRANSIENT_FAILURE, status, "resolver failure",
MakeRefCounted<LoadBalancingPolicy::TransientFailurePicker>(status));
{ {
MutexLock lock(&resolution_mu_); MutexLock lock(&resolution_mu_);
// Update resolver transient failure. // Update resolver transient failure.
resolver_transient_failure_error_ = resolver_transient_failure_error_ =
MaybeRewriteIllegalStatusCode(status, "resolver"); MaybeRewriteIllegalStatusCode(status, "resolver");
// Process calls that were queued waiting for the resolver result. ReprocessQueuedResolverCalls();
for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
call = call->next) {
grpc_call_element* elem = call->elem;
CallData* calld = static_cast<CallData*>(elem->call_data);
grpc_error_handle error;
if (calld->CheckResolutionLocked(elem, &error)) {
calld->AsyncResolutionDone(elem, error);
}
}
} }
// Update connectivity state.
UpdateStateAndPickerLocked(
GRPC_CHANNEL_TRANSIENT_FAILURE, status, "resolver failure",
MakeRefCounted<LoadBalancingPolicy::TransientFailurePicker>(status));
} }
} }
@ -1378,30 +1357,6 @@ OrphanablePtr<LoadBalancingPolicy> ClientChannel::CreateLbPolicyLocked(
return lb_policy; return lb_policy;
} }
void ClientChannel::AddResolverQueuedCall(ResolverQueuedCall* call,
grpc_polling_entity* pollent) {
// Add call to queued calls list.
call->next = resolver_queued_calls_;
resolver_queued_calls_ = call;
// Add call's pollent to channel's interested_parties, so that I/O
// can be done under the call's CQ.
grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
}
void ClientChannel::RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
grpc_polling_entity* pollent) {
// Remove call's pollent from channel's interested_parties.
grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
// Remove from queued calls list.
for (ResolverQueuedCall** call = &resolver_queued_calls_; *call != nullptr;
call = &(*call)->next) {
if (*call == to_remove) {
*call = to_remove->next;
return;
}
}
}
void ClientChannel::UpdateServiceConfigInControlPlaneLocked( void ClientChannel::UpdateServiceConfigInControlPlaneLocked(
RefCountedPtr<ServiceConfig> service_config, RefCountedPtr<ServiceConfig> service_config,
RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) { RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) {
@ -1468,25 +1423,8 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
service_config_.swap(service_config); service_config_.swap(service_config);
config_selector_.swap(config_selector); config_selector_.swap(config_selector);
dynamic_filters_.swap(dynamic_filters); dynamic_filters_.swap(dynamic_filters);
// Process calls that were queued waiting for the resolver result. // Re-process queued calls asynchronously.
for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr; ReprocessQueuedResolverCalls();
call = call->next) {
// If there are a lot of queued calls here, resuming them all may cause us
// to stay inside C-core for a long period of time. All of that work would
// be done using the same ExecCtx instance and therefore the same cached
// value of "now". The longer it takes to finish all of this work and exit
// from C-core, the more stale the cached value of "now" may become. This
// can cause problems whereby (e.g.) we calculate a timer deadline based
// on the stale value, which results in the timer firing too early. To
// avoid this, we invalidate the cached value for each call we process.
ExecCtx::Get()->InvalidateNow();
grpc_call_element* elem = call->elem;
CallData* calld = static_cast<CallData*>(elem->call_data);
grpc_error_handle error;
if (calld->CheckResolutionLocked(elem, &error)) {
calld->AsyncResolutionDone(elem, error);
}
}
} }
// Old values will be unreffed after lock is released when they go out // Old values will be unreffed after lock is released when they go out
// of scope. // of scope.
@ -1502,6 +1440,10 @@ void ClientChannel::CreateResolverLocked() {
// Since the validity of the args was checked when the channel was created, // Since the validity of the args was checked when the channel was created,
// CreateResolver() must return a non-null result. // CreateResolver() must return a non-null result.
GPR_ASSERT(resolver_ != nullptr); GPR_ASSERT(resolver_ != nullptr);
// TODO(roth): We should be updating the connectivity state here but
// not the picker. But we need to make sure that we are initializing
// the picker to a queueing picker somewhere, in case the LB policy
// does not immediately return a new picker.
UpdateStateAndPickerLocked( UpdateStateAndPickerLocked(
GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving", GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
MakeRefCounted<LoadBalancingPolicy::QueuePicker>(nullptr)); MakeRefCounted<LoadBalancingPolicy::QueuePicker>(nullptr));
@ -1562,29 +1504,22 @@ void ClientChannel::UpdateStateAndPickerLocked(
channelz::ChannelNode::GetChannelConnectivityStateChangeString( channelz::ChannelNode::GetChannelConnectivityStateChangeString(
state))); state)));
} }
// Grab data plane lock to update the picker. // Grab the LB lock to update the picker and trigger reprocessing of the
// queued picks.
// Old picker will be unreffed after releasing the lock.
{ {
MutexLock lock(&data_plane_mu_); MutexLock lock(&lb_mu_);
// Swap out the picker.
// Note: Original value will be destroyed after the lock is released.
picker_.swap(picker); picker_.swap(picker);
// Re-process queued picks. // Reprocess queued picks asynchronously.
for (LbQueuedCall* call = lb_queued_calls_; call != nullptr; for (LoadBalancedCall* call : lb_queued_calls_) {
call = call->next) { call->RemoveCallFromLbQueuedCallsLocked();
// If there are a lot of queued calls here, resuming them all may cause us owning_stack_->EventEngine()->Run([call]() {
// to stay inside C-core for a long period of time. All of that work would ApplicationCallbackExecCtx application_exec_ctx;
// be done using the same ExecCtx instance and therefore the same cached ExecCtx exec_ctx;
// value of "now". The longer it takes to finish all of this work and exit call->PickSubchannel(/*was_queued=*/true);
// from C-core, the more stale the cached value of "now" may become. This });
// can cause problems whereby (e.g.) we calculate a timer deadline based
// on the stale value, which results in the timer firing too early. To
// avoid this, we invalidate the cached value for each call we process.
ExecCtx::Get()->InvalidateNow();
grpc_error_handle error;
if (call->lb_call->PickSubchannelLocked(&error)) {
call->lb_call->AsyncPickDone(error);
}
} }
lb_queued_calls_.clear();
} }
} }
@ -1628,7 +1563,7 @@ grpc_error_handle ClientChannel::DoPingLocked(grpc_transport_op* op) {
} }
LoadBalancingPolicy::PickResult result; LoadBalancingPolicy::PickResult result;
{ {
MutexLock lock(&data_plane_mu_); MutexLock lock(&lb_mu_);
result = picker_->Pick(LoadBalancingPolicy::PickArgs()); result = picker_->Pick(LoadBalancingPolicy::PickArgs());
} }
return HandlePickResult<grpc_error_handle>( return HandlePickResult<grpc_error_handle>(
@ -1712,6 +1647,9 @@ void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) {
GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API", GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
MakeRefCounted<LoadBalancingPolicy::TransientFailurePicker>( MakeRefCounted<LoadBalancingPolicy::TransientFailurePicker>(
grpc_error_to_absl_status(op->disconnect_with_error))); grpc_error_to_absl_status(op->disconnect_with_error)));
// TODO(roth): If this happens when we're still waiting for a
// resolver result, we need to trigger failures for all calls in
// the resolver queue here.
} }
} }
GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op"); GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op");
@ -1748,30 +1686,6 @@ void ClientChannel::GetChannelInfo(grpc_channel_element* elem,
} }
} }
void ClientChannel::AddLbQueuedCall(LbQueuedCall* call,
grpc_polling_entity* pollent) {
// Add call to queued picks list.
call->next = lb_queued_calls_;
lb_queued_calls_ = call;
// Add call's pollent to channel's interested_parties, so that I/O
// can be done under the call's CQ.
grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
}
void ClientChannel::RemoveLbQueuedCall(LbQueuedCall* to_remove,
grpc_polling_entity* pollent) {
// Remove call's pollent from channel's interested_parties.
grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
// Remove from queued picks list.
for (LbQueuedCall** call = &lb_queued_calls_; *call != nullptr;
call = &(*call)->next) {
if (*call == to_remove) {
*call = to_remove->next;
return;
}
}
}
void ClientChannel::TryToConnectLocked() { void ClientChannel::TryToConnectLocked() {
if (lb_policy_ != nullptr) { if (lb_policy_ != nullptr) {
lb_policy_->ExitIdleLocked(); lb_policy_->ExitIdleLocked();
@ -1938,7 +1852,23 @@ void ClientChannel::CallData::StartTransportStreamOpBatch(
"config", "config",
chand, calld); chand, calld);
} }
CheckResolution(elem, absl::OkStatus()); // If we're still in IDLE, we need to start resolving.
if (GPR_UNLIKELY(chand->CheckConnectivityState(false) ==
GRPC_CHANNEL_IDLE)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: triggering exit idle", chand,
calld);
}
// Bounce into the control plane work serializer to start resolving.
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ExitIdle");
chand->work_serializer_->Run(
[chand]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
chand->CheckConnectivityState(/*try_to_connect=*/true);
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "ExitIdle");
},
DEBUG_LOCATION);
}
calld->CheckResolution(elem, /*was_queued=*/false);
} else { } else {
// For all other batches, release the call combiner. // For all other batches, release the call combiner.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
@ -1964,8 +1894,7 @@ void ClientChannel::CallData::SetPollent(grpc_call_element* elem,
size_t ClientChannel::CallData::GetBatchIndex( size_t ClientChannel::CallData::GetBatchIndex(
grpc_transport_stream_op_batch* batch) { grpc_transport_stream_op_batch* batch) {
// Note: It is important the send_initial_metadata be the first entry // Note: It is important the send_initial_metadata be the first entry
// here, since the code in ApplyServiceConfigToCallLocked() and // here, since the code in CheckResolution() assumes it will be.
// CheckResolutionLocked() assumes it will be.
if (batch->send_initial_metadata) return 0; if (batch->send_initial_metadata) return 0;
if (batch->send_message) return 1; if (batch->send_message) return 1;
if (batch->send_trailing_metadata) return 2; if (batch->send_trailing_metadata) return 2;
@ -2109,7 +2038,8 @@ class ClientChannel::CallData::ResolverQueuedCallCanceller {
} }
if (calld->resolver_call_canceller_ == self && !error.ok()) { if (calld->resolver_call_canceller_ == self && !error.ok()) {
// Remove pick from list of queued picks. // Remove pick from list of queued picks.
calld->MaybeRemoveCallFromResolverQueuedCallsLocked(self->elem_); calld->RemoveCallFromResolverQueuedCallsLocked(self->elem_);
chand->resolver_queued_calls_.erase(self->elem_);
// Fail pending batches on the call. // Fail pending batches on the call.
calld->PendingBatchesFail(self->elem_, error, calld->PendingBatchesFail(self->elem_, error,
YieldCallCombinerIfPendingBatchesFound); YieldCallCombinerIfPendingBatchesFound);
@ -2123,57 +2053,56 @@ class ClientChannel::CallData::ResolverQueuedCallCanceller {
grpc_closure closure_; grpc_closure closure_;
}; };
void ClientChannel::CallData::MaybeRemoveCallFromResolverQueuedCallsLocked( void ClientChannel::CallData::RemoveCallFromResolverQueuedCallsLocked(
grpc_call_element* elem) { grpc_call_element* elem) {
if (!queued_pending_resolver_result_) return;
auto* chand = static_cast<ClientChannel*>(elem->channel_data); auto* chand = static_cast<ClientChannel*>(elem->channel_data);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"chand=%p calld=%p: removing from resolver queued picks list", "chand=%p calld=%p: removing from resolver queued picks list",
chand, this); chand, this);
} }
chand->RemoveResolverQueuedCall(&resolver_queued_call_, pollent_); // Remove call's pollent from channel's interested_parties.
queued_pending_resolver_result_ = false; grpc_polling_entity_del_from_pollset_set(pollent_,
chand->interested_parties_);
// Lame the call combiner canceller. // Lame the call combiner canceller.
resolver_call_canceller_ = nullptr; resolver_call_canceller_ = nullptr;
// Add trace annotation // Note: There's no need to actually remove the call from the queue
auto* call_tracer = // here, beacuse that will be done in
static_cast<CallTracer*>(call_context_[GRPC_CONTEXT_CALL_TRACER].value); // ResolverQueuedCallCanceller::CancelLocked() or
if (call_tracer != nullptr) { // ClientChannel::ReprocessQueuedResolverCalls().
call_tracer->RecordAnnotation("Delayed name resolution complete.");
}
} }
void ClientChannel::CallData::MaybeAddCallToResolverQueuedCallsLocked( void ClientChannel::CallData::AddCallToResolverQueuedCallsLocked(
grpc_call_element* elem) { grpc_call_element* elem) {
if (queued_pending_resolver_result_) return;
auto* chand = static_cast<ClientChannel*>(elem->channel_data); auto* chand = static_cast<ClientChannel*>(elem->channel_data);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list", gpr_log(GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list",
chand, this); chand, this);
} }
queued_pending_resolver_result_ = true; // Add call's pollent to channel's interested_parties, so that I/O
resolver_queued_call_.elem = elem; // can be done under the call's CQ.
chand->AddResolverQueuedCall(&resolver_queued_call_, pollent_); grpc_polling_entity_add_to_pollset_set(pollent_, chand->interested_parties_);
// Add to queue.
chand->resolver_queued_calls_.insert(elem);
// Register call combiner cancellation callback. // Register call combiner cancellation callback.
resolver_call_canceller_ = new ResolverQueuedCallCanceller(elem); resolver_call_canceller_ = new ResolverQueuedCallCanceller(elem);
} }
grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked( grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
grpc_call_element* elem, grpc_metadata_batch* initial_metadata) { grpc_call_element* elem, grpc_metadata_batch* initial_metadata,
const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector) {
ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data); ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call", gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
chand, this); chand, this);
} }
ConfigSelector* config_selector = chand->config_selector_.get(); if (!config_selector.ok()) return config_selector.status();
if (config_selector != nullptr) {
// Use the ConfigSelector to determine the config for the call. // Use the ConfigSelector to determine the config for the call.
auto call_config = auto call_config =
config_selector->GetCallConfig({&path_, initial_metadata, arena()}); (*config_selector)->GetCallConfig({&path_, initial_metadata, arena()});
if (!call_config.ok()) { if (!call_config.ok()) {
return absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode( return absl_status_to_grpc_error(
call_config.status(), "ConfigSelector")); MaybeRewriteIllegalStatusCode(call_config.status(), "ConfigSelector"));
} }
// Create a ClientChannelServiceConfigCallData for the call. This stores // Create a ClientChannelServiceConfigCallData for the call. This stores
// a ref to the ServiceConfig and caches the right set of parsed configs // a ref to the ServiceConfig and caches the right set of parsed configs
@ -2204,18 +2133,12 @@ grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
} }
// If the service config set wait_for_ready and the application // If the service config set wait_for_ready and the application
// did not explicitly set it, use the value from the service config. // did not explicitly set it, use the value from the service config.
auto* wait_for_ready = auto* wait_for_ready = initial_metadata->GetOrCreatePointer(WaitForReady());
pending_batches_[0]
->payload->send_initial_metadata.send_initial_metadata
->GetOrCreatePointer(WaitForReady());
if (method_params->wait_for_ready().has_value() && if (method_params->wait_for_ready().has_value() &&
!wait_for_ready->explicitly_set) { !wait_for_ready->explicitly_set) {
wait_for_ready->value = method_params->wait_for_ready().value(); wait_for_ready->value = method_params->wait_for_ready().value();
} }
} }
// Set the dynamic filter stack.
dynamic_filters_ = chand->dynamic_filters_;
}
return absl::OkStatus(); return absl::OkStatus();
} }
@ -2243,80 +2166,60 @@ void ClientChannel::CallData::
error); error);
} }
void ClientChannel::CallData::AsyncResolutionDone(grpc_call_element* elem, void ClientChannel::CallData::CheckResolution(grpc_call_element* elem,
grpc_error_handle error) { bool was_queued) {
// TODO(roth): Does this callback need to hold a ref to the call stack?
GRPC_CLOSURE_INIT(&resolution_done_closure_, ResolutionDone, elem, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &resolution_done_closure_, error);
}
void ClientChannel::CallData::ResolutionDone(void* arg,
grpc_error_handle error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data); ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
CallData* calld = static_cast<CallData*>(elem->call_data); grpc_metadata_batch* initial_metadata =
pending_batches_[0]->payload->send_initial_metadata.send_initial_metadata;
// Check if we have a resolver result to use.
absl::StatusOr<RefCountedPtr<ConfigSelector>> config_selector;
{
MutexLock lock(&chand->resolution_mu_);
bool result_ready =
CheckResolutionLocked(elem, initial_metadata, &config_selector);
// If no result is available, queue the call.
if (!result_ready) {
AddCallToResolverQueuedCallsLocked(elem);
return;
}
}
// We have a result. Apply service config to call.
grpc_error_handle error =
ApplyServiceConfigToCallLocked(elem, initial_metadata, config_selector);
// ConfigSelector must be unreffed inside the WorkSerializer.
if (config_selector.ok()) {
chand->work_serializer_->Run(
[config_selector = std::move(*config_selector)]() mutable {
config_selector.reset();
},
DEBUG_LOCATION);
}
// Handle errors.
if (!error.ok()) { if (!error.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"chand=%p calld=%p: error applying config to call: error=%s", "chand=%p calld=%p: error applying config to call: error=%s",
chand, calld, StatusToString(error).c_str()); chand, this, StatusToString(error).c_str());
} }
calld->PendingBatchesFail(elem, error, YieldCallCombiner); PendingBatchesFail(elem, error, YieldCallCombiner);
return; return;
} }
calld->CreateDynamicCall(elem); // If the call was queued, add trace annotation.
} if (was_queued) {
auto* call_tracer =
void ClientChannel::CallData::CheckResolution(void* arg, static_cast<CallTracer*>(call_context_[GRPC_CONTEXT_CALL_TRACER].value);
grpc_error_handle error) { if (call_tracer != nullptr) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg); call_tracer->RecordAnnotation("Delayed name resolution complete.");
CallData* calld = static_cast<CallData*>(elem->call_data);
ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
bool resolution_complete;
{
MutexLock lock(&chand->resolution_mu_);
resolution_complete = calld->CheckResolutionLocked(elem, &error);
} }
if (resolution_complete) {
ResolutionDone(elem, error);
} }
// Create dynamic call.
CreateDynamicCall(elem);
} }
bool ClientChannel::CallData::CheckResolutionLocked(grpc_call_element* elem, bool ClientChannel::CallData::CheckResolutionLocked(
grpc_error_handle* error) { grpc_call_element* elem, grpc_metadata_batch* initial_metadata,
absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector) {
ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data); ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
// If we're still in IDLE, we need to start resolving.
if (GPR_UNLIKELY(chand->CheckConnectivityState(false) == GRPC_CHANNEL_IDLE)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: triggering exit idle", chand, this);
}
// Bounce into the control plane work serializer to start resolving,
// in case we are still in IDLE state. Since we are holding on to the
// resolution mutex here, we offload it on the ExecCtx so that we don't
// deadlock with ourselves.
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "CheckResolutionLocked");
ExecCtx::Run(
DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error_handle /*error*/) {
auto* chand = static_cast<ClientChannel*>(arg);
chand->work_serializer_->Run(
[chand]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
chand->CheckConnectivityState(/*try_to_connect=*/true);
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_,
"CheckResolutionLocked");
},
DEBUG_LOCATION);
},
chand, nullptr),
absl::OkStatus());
}
// Get send_initial_metadata batch and flags.
auto& send_initial_metadata =
pending_batches_[0]->payload->send_initial_metadata;
grpc_metadata_batch* initial_metadata_batch =
send_initial_metadata.send_initial_metadata;
// If we don't yet have a resolver result, we need to queue the call // If we don't yet have a resolver result, we need to queue the call
// until we get one. // until we get one.
if (GPR_UNLIKELY(!chand->received_service_config_data_)) { if (GPR_UNLIKELY(!chand->received_service_config_data_)) {
@ -2324,31 +2227,26 @@ bool ClientChannel::CallData::CheckResolutionLocked(grpc_call_element* elem,
// first service config, fail any non-wait_for_ready calls. // first service config, fail any non-wait_for_ready calls.
absl::Status resolver_error = chand->resolver_transient_failure_error_; absl::Status resolver_error = chand->resolver_transient_failure_error_;
if (!resolver_error.ok() && if (!resolver_error.ok() &&
!initial_metadata_batch->GetOrCreatePointer(WaitForReady())->value) { !initial_metadata->GetOrCreatePointer(WaitForReady())->value) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: resolution failed, failing call", gpr_log(GPR_INFO, "chand=%p calld=%p: resolution failed, failing call",
chand, this); chand, this);
} }
MaybeRemoveCallFromResolverQueuedCallsLocked(elem); *config_selector = absl_status_to_grpc_error(resolver_error);
*error = absl_status_to_grpc_error(resolver_error);
return true; return true;
} }
// Either the resolver has not yet returned a result, or it has // Either the resolver has not yet returned a result, or it has
// returned transient failure but the call is wait_for_ready. In // returned transient failure but the call is wait_for_ready. In
// either case, queue the call. // either case, queue the call.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: queuing to wait for resolution", gpr_log(GPR_INFO, "chand=%p calld=%p: no resolver result yet", chand,
chand, this); this);
} }
MaybeAddCallToResolverQueuedCallsLocked(elem);
return false; return false;
} }
// Apply service config to call if not yet applied. // Result found.
if (GPR_LIKELY(!service_config_applied_)) { *config_selector = chand->config_selector_;
service_config_applied_ = true; dynamic_filters_ = chand->dynamic_filters_;
*error = ApplyServiceConfigToCallLocked(elem, initial_metadata_batch);
}
MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
return true; return true;
} }
@ -2591,7 +2489,7 @@ void ClientChannel::LoadBalancedCall::Orphan() {
size_t ClientChannel::LoadBalancedCall::GetBatchIndex( size_t ClientChannel::LoadBalancedCall::GetBatchIndex(
grpc_transport_stream_op_batch* batch) { grpc_transport_stream_op_batch* batch) {
// Note: It is important the send_initial_metadata be the first entry // Note: It is important the send_initial_metadata be the first entry
// here, since the code in PickSubchannelLocked() assumes it will be. // here, since the code in PickSubchannelImpl() assumes it will be.
if (batch->send_initial_metadata) return 0; if (batch->send_initial_metadata) return 0;
if (batch->send_message) return 1; if (batch->send_message) return 1;
if (batch->send_trailing_metadata) return 2; if (batch->send_trailing_metadata) return 2;
@ -2817,12 +2715,7 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
// For batches containing a send_initial_metadata op, acquire the // For batches containing a send_initial_metadata op, acquire the
// channel's data plane mutex to pick a subchannel. // channel's data plane mutex to pick a subchannel.
if (GPR_LIKELY(batch->send_initial_metadata)) { if (GPR_LIKELY(batch->send_initial_metadata)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { PickSubchannel(/*was_queued=*/false);
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: grabbing data plane mutex to perform pick",
chand_, this);
}
PickSubchannel(this, absl::OkStatus());
} else { } else {
// For all other batches, release the call combiner. // For all other batches, release the call combiner.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
@ -2998,7 +2891,7 @@ class ClientChannel::LoadBalancedCall::LbQueuedCallCanceller {
auto* lb_call = self->lb_call_.get(); auto* lb_call = self->lb_call_.get();
auto* chand = lb_call->chand_; auto* chand = lb_call->chand_;
{ {
MutexLock lock(&chand->data_plane_mu_); MutexLock lock(&chand->lb_mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"chand=%p lb_call=%p: cancelling queued pick: " "chand=%p lb_call=%p: cancelling queued pick: "
@ -3009,7 +2902,9 @@ class ClientChannel::LoadBalancedCall::LbQueuedCallCanceller {
if (lb_call->lb_call_canceller_ == self && !error.ok()) { if (lb_call->lb_call_canceller_ == self && !error.ok()) {
lb_call->call_dispatch_controller_->Commit(); lb_call->call_dispatch_controller_->Commit();
// Remove pick from list of queued picks. // Remove pick from list of queued picks.
lb_call->MaybeRemoveCallFromLbQueuedCallsLocked(); lb_call->RemoveCallFromLbQueuedCallsLocked();
// Remove from queued picks list.
chand->lb_queued_calls_.erase(lb_call);
// Fail pending batches on the call. // Fail pending batches on the call.
lb_call->PendingBatchesFail(error, lb_call->PendingBatchesFail(error,
YieldCallCombinerIfPendingBatchesFound); YieldCallCombinerIfPendingBatchesFound);
@ -3023,72 +2918,110 @@ class ClientChannel::LoadBalancedCall::LbQueuedCallCanceller {
grpc_closure closure_; grpc_closure closure_;
}; };
void ClientChannel::LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() { void ClientChannel::LoadBalancedCall::RemoveCallFromLbQueuedCallsLocked() {
if (!queued_pending_lb_pick_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list", gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
chand_, this); chand_, this);
} }
chand_->RemoveLbQueuedCall(&queued_call_, pollent_); // Remove pollset_set linkage.
queued_pending_lb_pick_ = false; grpc_polling_entity_del_from_pollset_set(pollent_,
chand_->interested_parties_);
// Lame the call combiner canceller. // Lame the call combiner canceller.
lb_call_canceller_ = nullptr; lb_call_canceller_ = nullptr;
// Add trace annotation // Note: There's no need to actually remove the call from the queue
if (call_attempt_tracer_ != nullptr) { // here, beacuse that will be done in either
call_attempt_tracer_->RecordAnnotation("Delayed LB pick complete."); // LbQueuedCallCanceller::CancelLocked() or
} // in ClientChannel::UpdateStateAndPickerLocked().
} }
void ClientChannel::LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() { void ClientChannel::LoadBalancedCall::AddCallToLbQueuedCallsLocked() {
if (queued_pending_lb_pick_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list", gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
chand_, this); chand_, this);
} }
queued_pending_lb_pick_ = true; // Add call's pollent to channel's interested_parties, so that I/O
queued_call_.lb_call = this; // can be done under the call's CQ.
chand_->AddLbQueuedCall(&queued_call_, pollent_); grpc_polling_entity_add_to_pollset_set(pollent_, chand_->interested_parties_);
// Add to queue.
chand_->lb_queued_calls_.insert(this);
// Register call combiner cancellation callback. // Register call combiner cancellation callback.
lb_call_canceller_ = new LbQueuedCallCanceller(Ref()); lb_call_canceller_ = new LbQueuedCallCanceller(Ref());
} }
void ClientChannel::LoadBalancedCall::AsyncPickDone(grpc_error_handle error) { void ClientChannel::LoadBalancedCall::PickSubchannel(bool was_queued) {
// TODO(roth): Does this callback need to hold a ref to LoadBalancedCall? if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx); gpr_log(GPR_INFO, "chand=%p lb_call=%p: grabbing LB mutex to perform pick",
ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error); chand_, this);
} }
// Grab mutex and take a ref to the picker.
void ClientChannel::LoadBalancedCall::PickDone(void* arg, RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
grpc_error_handle error) { {
auto* self = static_cast<LoadBalancedCall*>(arg); MutexLock lock(&chand_->lb_mu_);
picker = chand_->picker_;
}
// We may accumulate multiple pickers here, because if a picker says
// to queue the call, we check again to see if the picker has been
// updated before we queue it.
std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>>
pickers_to_unref;
while (true) {
// Do pick.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: performing pick with picker=%p",
chand_, this, picker.get());
}
grpc_error_handle error;
bool pick_complete = PickSubchannelImpl(picker.get(), &error);
if (!pick_complete) {
MutexLock lock(&chand_->lb_mu_);
// If picker has been swapped out since we grabbed it, try again.
if (chand_->picker_ != picker) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: pick not complete, but picker changed",
chand_, this);
}
pickers_to_unref.emplace_back(std::move(picker));
picker = chand_->picker_;
continue;
}
// Otherwise queue the pick to try again later when we get a new picker.
AddCallToLbQueuedCallsLocked();
break;
}
// Pick is complete.
// If it was queued, add a trace annotation.
if (was_queued && call_attempt_tracer_ != nullptr) {
call_attempt_tracer_->RecordAnnotation("Delayed LB pick complete.");
}
// If the pick failed, fail the call.
if (!error.ok()) { if (!error.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"chand=%p lb_call=%p: failed to pick subchannel: error=%s", "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
self->chand_, self, StatusToString(error).c_str()); chand_, this, StatusToString(error).c_str());
}
self->PendingBatchesFail(error, YieldCallCombiner);
return;
} }
self->call_dispatch_controller_->Commit(); PendingBatchesFail(error, YieldCallCombiner);
self->CreateSubchannelCall(); break;
} }
// Pick succeeded.
void ClientChannel::LoadBalancedCall::PickSubchannel(void* arg, call_dispatch_controller_->Commit();
grpc_error_handle error) { CreateSubchannelCall();
auto* self = static_cast<LoadBalancedCall*>(arg); break;
bool pick_complete;
{
MutexLock lock(&self->chand_->data_plane_mu_);
pick_complete = self->PickSubchannelLocked(&error);
} }
if (pick_complete) { pickers_to_unref.emplace_back(std::move(picker));
PickDone(self, error); // Unref pickers in WorkSerializer.
chand_->work_serializer_->Run(
[pickers_to_unref = std::move(pickers_to_unref)]() mutable {
for (auto& picker : pickers_to_unref) {
picker.reset(DEBUG_LOCATION, "PickSubchannel");
} }
},
DEBUG_LOCATION);
} }
bool ClientChannel::LoadBalancedCall::PickSubchannelLocked( bool ClientChannel::LoadBalancedCall::PickSubchannelImpl(
grpc_error_handle* error) { LoadBalancingPolicy::SubchannelPicker* picker, grpc_error_handle* error) {
GPR_ASSERT(connected_subchannel_ == nullptr); GPR_ASSERT(connected_subchannel_ == nullptr);
GPR_ASSERT(subchannel_call_ == nullptr); GPR_ASSERT(subchannel_call_ == nullptr);
// Grab initial metadata. // Grab initial metadata.
@ -3103,12 +3036,11 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
pick_args.call_state = &lb_call_state; pick_args.call_state = &lb_call_state;
Metadata initial_metadata(initial_metadata_batch); Metadata initial_metadata(initial_metadata_batch);
pick_args.initial_metadata = &initial_metadata; pick_args.initial_metadata = &initial_metadata;
auto result = chand_->picker_->Pick(pick_args); auto result = picker->Pick(pick_args);
return HandlePickResult<bool>( return HandlePickResult<bool>(
&result, &result,
// CompletePick // CompletePick
[this](LoadBalancingPolicy::PickResult::Complete* complete_pick) [this](LoadBalancingPolicy::PickResult::Complete* complete_pick) {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"chand=%p lb_call=%p: LB pick succeeded: subchannel=%p", "chand=%p lb_call=%p: LB pick succeeded: subchannel=%p",
@ -3117,8 +3049,8 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
GPR_ASSERT(complete_pick->subchannel != nullptr); GPR_ASSERT(complete_pick->subchannel != nullptr);
// Grab a ref to the connected subchannel while we're still // Grab a ref to the connected subchannel while we're still
// holding the data plane mutex. // holding the data plane mutex.
SubchannelWrapper* subchannel = static_cast<SubchannelWrapper*>( SubchannelWrapper* subchannel =
complete_pick->subchannel.get()); static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
connected_subchannel_ = subchannel->connected_subchannel(); connected_subchannel_ = subchannel->connected_subchannel();
// If the subchannel has no connected subchannel (e.g., if the // If the subchannel has no connected subchannel (e.g., if the
// subchannel has moved out of state READY but the LB policy hasn't // subchannel has moved out of state READY but the LB policy hasn't
@ -3131,7 +3063,6 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
"has no connected subchannel; queueing pick", "has no connected subchannel; queueing pick",
chand_, this); chand_, this);
} }
MaybeAddCallToLbQueuedCallsLocked();
return false; return false;
} }
lb_subchannel_call_tracker_ = lb_subchannel_call_tracker_ =
@ -3139,26 +3070,22 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
if (lb_subchannel_call_tracker_ != nullptr) { if (lb_subchannel_call_tracker_ != nullptr) {
lb_subchannel_call_tracker_->Start(); lb_subchannel_call_tracker_->Start();
} }
MaybeRemoveCallFromLbQueuedCallsLocked();
return true; return true;
}, },
// QueuePick // QueuePick
[this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) [this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick queued", chand_, gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick queued", chand_,
this); this);
} }
MaybeAddCallToLbQueuedCallsLocked();
return false; return false;
}, },
// FailPick // FailPick
[this, initial_metadata_batch, [this, initial_metadata_batch,
&error](LoadBalancingPolicy::PickResult::Fail* fail_pick) &error](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick failed: %s", gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick failed: %s", chand_,
chand_, this, fail_pick->status.ToString().c_str()); this, fail_pick->status.ToString().c_str());
} }
// If wait_for_ready is false, then the error indicates the RPC // If wait_for_ready is false, then the error indicates the RPC
// attempt's final status. // attempt's final status.
@ -3166,26 +3093,22 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
->value) { ->value) {
*error = absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode( *error = absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
std::move(fail_pick->status), "LB pick")); std::move(fail_pick->status), "LB pick"));
MaybeRemoveCallFromLbQueuedCallsLocked();
return true; return true;
} }
// If wait_for_ready is true, then queue to retry when we get a new // If wait_for_ready is true, then queue to retry when we get a new
// picker. // picker.
MaybeAddCallToLbQueuedCallsLocked();
return false; return false;
}, },
// DropPick // DropPick
[this, &error](LoadBalancingPolicy::PickResult::Drop* drop_pick) [this, &error](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick dropped: %s", gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick dropped: %s", chand_,
chand_, this, drop_pick->status.ToString().c_str()); this, drop_pick->status.ToString().c_str());
} }
*error = grpc_error_set_int( *error = grpc_error_set_int(
absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode( absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
std::move(drop_pick->status), "LB drop")), std::move(drop_pick->status), "LB drop")),
StatusIntProperty::kLbPolicyDrop, 1); StatusIntProperty::kLbPolicyDrop, 1);
MaybeRemoveCallFromLbQueuedCallsLocked();
return true; return true;
}); });
} }

@ -24,11 +24,11 @@
#include <atomic> #include <atomic>
#include <map> #include <map>
#include <memory> #include <memory>
#include <set>
#include <string> #include <string>
#include <utility> #include <utility>
#include "absl/base/thread_annotations.h" #include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_set.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
@ -222,15 +222,6 @@ class ClientChannel {
std::atomic<bool> done_{false}; std::atomic<bool> done_{false};
}; };
struct ResolverQueuedCall {
grpc_call_element* elem;
ResolverQueuedCall* next = nullptr;
};
struct LbQueuedCall {
LoadBalancedCall* lb_call;
LbQueuedCall* next = nullptr;
};
ClientChannel(grpc_channel_element_args* args, grpc_error_handle* error); ClientChannel(grpc_channel_element_args* args, grpc_error_handle* error);
~ClientChannel(); ~ClientChannel();
@ -246,6 +237,9 @@ class ClientChannel {
// Note: All methods with "Locked" suffix must be invoked from within // Note: All methods with "Locked" suffix must be invoked from within
// work_serializer_. // work_serializer_.
void ReprocessQueuedResolverCalls()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&resolution_mu_);
void OnResolverResultChangedLocked(Resolver::Result result) void OnResolverResultChangedLocked(Resolver::Result result)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void OnResolverErrorLocked(absl::Status status) void OnResolverErrorLocked(absl::Status status)
@ -284,20 +278,6 @@ class ClientChannel {
void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
// These methods all require holding resolution_mu_.
void AddResolverQueuedCall(ResolverQueuedCall* call,
grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);
void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);
// These methods all require holding data_plane_mu_.
void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
void RemoveLbQueuedCall(LbQueuedCall* to_remove, grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
// //
// Fields set at construction and never modified. // Fields set at construction and never modified.
// //
@ -316,9 +296,9 @@ class ClientChannel {
// Fields related to name resolution. Guarded by resolution_mu_. // Fields related to name resolution. Guarded by resolution_mu_.
// //
mutable Mutex resolution_mu_; mutable Mutex resolution_mu_;
// Linked list of calls queued waiting for resolver result. // List of calls queued waiting for resolver result.
ResolverQueuedCall* resolver_queued_calls_ ABSL_GUARDED_BY(resolution_mu_) = absl::flat_hash_set<grpc_call_element*> resolver_queued_calls_
nullptr; ABSL_GUARDED_BY(resolution_mu_);
// Data from service config. // Data from service config.
absl::Status resolver_transient_failure_error_ absl::Status resolver_transient_failure_error_
ABSL_GUARDED_BY(resolution_mu_); ABSL_GUARDED_BY(resolution_mu_);
@ -330,13 +310,13 @@ class ClientChannel {
ABSL_GUARDED_BY(resolution_mu_); ABSL_GUARDED_BY(resolution_mu_);
// //
// Fields used in the data plane. Guarded by data_plane_mu_. // Fields related to LB picks. Guarded by lb_mu_.
// //
mutable Mutex data_plane_mu_; mutable Mutex lb_mu_;
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_ RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_
ABSL_GUARDED_BY(data_plane_mu_); ABSL_GUARDED_BY(lb_mu_);
// Linked list of calls queued waiting for LB pick. absl::flat_hash_set<LoadBalancedCall*> lb_queued_calls_
LbQueuedCall* lb_queued_calls_ ABSL_GUARDED_BY(data_plane_mu_) = nullptr; ABSL_GUARDED_BY(lb_mu_);
// //
// Fields used in the control plane. Guarded by work_serializer. // Fields used in the control plane. Guarded by work_serializer.
@ -360,7 +340,7 @@ class ClientChannel {
// The set of SubchannelWrappers that currently exist. // The set of SubchannelWrappers that currently exist.
// No need to hold a ref, since the map is updated in the control-plane // No need to hold a ref, since the map is updated in the control-plane
// work_serializer when the SubchannelWrappers are created and destroyed. // work_serializer when the SubchannelWrappers are created and destroyed.
std::set<SubchannelWrapper*> subchannel_wrappers_ absl::flat_hash_set<SubchannelWrapper*> subchannel_wrappers_
ABSL_GUARDED_BY(*work_serializer_); ABSL_GUARDED_BY(*work_serializer_);
int keepalive_time_ ABSL_GUARDED_BY(*work_serializer_) = -1; int keepalive_time_ ABSL_GUARDED_BY(*work_serializer_) = -1;
grpc_error_handle disconnect_error_ ABSL_GUARDED_BY(*work_serializer_); grpc_error_handle disconnect_error_ ABSL_GUARDED_BY(*work_serializer_);
@ -422,16 +402,11 @@ class ClientChannel::LoadBalancedCall
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
// Invoked by channel for queued LB picks when the picker is updated. void PickSubchannel(bool was_queued);
static void PickSubchannel(void* arg, grpc_error_handle error);
// Helper function for performing an LB pick while holding the data plane // Called by channel when removing a call from the list of queued calls.
// mutex. Returns true if the pick is complete, in which case the caller void RemoveCallFromLbQueuedCallsLocked()
// must invoke PickDone() or AsyncPickDone() with the returned error. ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_);
bool PickSubchannelLocked(grpc_error_handle* error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
// Schedules a callback to process the completed pick. The callback
// will not run until after this method returns.
void AsyncPickDone(grpc_error_handle error);
RefCountedPtr<SubchannelCall> subchannel_call() const { RefCountedPtr<SubchannelCall> subchannel_call() const {
return subchannel_call_; return subchannel_call_;
@ -479,14 +454,14 @@ class ClientChannel::LoadBalancedCall
void RecordCallCompletion(absl::Status status); void RecordCallCompletion(absl::Status status);
void CreateSubchannelCall(); void CreateSubchannelCall();
// Invoked when a pick is completed, on both success or failure.
static void PickDone(void* arg, grpc_error_handle error); // Helper function for performing an LB pick with a specified picker.
// Removes the call from the channel's list of queued picks if present. // Returns true if the pick is complete.
void MaybeRemoveCallFromLbQueuedCallsLocked() bool PickSubchannelImpl(LoadBalancingPolicy::SubchannelPicker* picker,
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_); grpc_error_handle* error);
// Adds the call to the channel's list of queued picks if not already present. // Adds the call to the channel's list of queued picks if not already present.
void MaybeAddCallToLbQueuedCallsLocked() void AddCallToLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_);
ClientChannel* chand_; ClientChannel* chand_;
@ -513,15 +488,9 @@ class ClientChannel::LoadBalancedCall
// Set when we fail inside the LB call. // Set when we fail inside the LB call.
grpc_error_handle failure_error_; grpc_error_handle failure_error_;
grpc_closure pick_closure_; // Accessed while holding ClientChannel::lb_mu_.
// Accessed while holding ClientChannel::data_plane_mu_.
ClientChannel::LbQueuedCall queued_call_
ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_);
bool queued_pending_lb_pick_ ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) =
false;
LbQueuedCallCanceller* lb_call_canceller_ LbQueuedCallCanceller* lb_call_canceller_
ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = nullptr; ABSL_GUARDED_BY(&ClientChannel::lb_mu_) = nullptr;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_; RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
const BackendMetricData* backend_metric_data_ = nullptr; const BackendMetricData* backend_metric_data_ = nullptr;

@ -61,6 +61,7 @@
#include <string.h> #include <string.h>
#include <algorithm> #include <algorithm>
#include <atomic>
#include <initializer_list> #include <initializer_list>
#include <map> #include <map>
#include <memory> #include <memory>
@ -389,19 +390,15 @@ class GrpcLb : public LoadBalancingPolicy {
// Returns the LB token to use for a drop, or null if the call // Returns the LB token to use for a drop, or null if the call
// should not be dropped. // should not be dropped.
// //
// Note: This is called from the picker, so it will be invoked in // Note: This is called from the picker, NOT from inside the control
// the channel's data plane mutex, NOT the control plane // plane work_serializer.
// work_serializer. It should not be accessed by any other part of the LB
// policy.
const char* ShouldDrop(); const char* ShouldDrop();
private: private:
std::vector<GrpcLbServer> serverlist_; std::vector<GrpcLbServer> serverlist_;
// Guarded by the channel's data plane mutex, NOT the control // Accessed from the picker, so needs synchronization.
// plane work_serializer. It should not be accessed by anything but the std::atomic<size_t> drop_index_{0};
// picker via the ShouldDrop() method.
size_t drop_index_ = 0;
}; };
class Picker : public SubchannelPicker { class Picker : public SubchannelPicker {
@ -717,8 +714,8 @@ bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
const char* GrpcLb::Serverlist::ShouldDrop() { const char* GrpcLb::Serverlist::ShouldDrop() {
if (serverlist_.empty()) return nullptr; if (serverlist_.empty()) return nullptr;
GrpcLbServer& server = serverlist_[drop_index_]; size_t index = drop_index_.fetch_add(1, std::memory_order_relaxed);
drop_index_ = (drop_index_ + 1) % serverlist_.size(); GrpcLbServer& server = serverlist_[index % serverlist_.size()];
return server.drop ? server.load_balance_token : nullptr; return server.drop ? server.load_balance_token : nullptr;
} }

@ -22,7 +22,6 @@
#include <stdlib.h> #include <stdlib.h>
#include <algorithm> #include <algorithm>
#include <atomic>
#include <cmath> #include <cmath>
#include <memory> #include <memory>
#include <string> #include <string>
@ -30,7 +29,6 @@
#include <vector> #include <vector>
#include "absl/base/attributes.h" #include "absl/base/attributes.h"
#include "absl/base/thread_annotations.h"
#include "absl/container/inlined_vector.h" #include "absl/container/inlined_vector.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
@ -39,11 +37,10 @@
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include <grpc/grpc.h>
#define XXH_INLINE_ALL #define XXH_INLINE_ALL
#include "xxhash.h" #include "xxhash.h"
#include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h> #include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
@ -55,8 +52,8 @@
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/closure.h"
@ -143,8 +140,6 @@ class RingHash : public LoadBalancingPolicy {
void ResetBackoffLocked() override; void ResetBackoffLocked() override;
private: private:
~RingHash() override;
// Forward declaration. // Forward declaration.
class RingHashSubchannelList; class RingHashSubchannelList;
@ -165,13 +160,11 @@ class RingHash : public LoadBalancingPolicy {
const ServerAddress& address() const { return address_; } const ServerAddress& address() const { return address_; }
grpc_connectivity_state GetConnectivityState() const { grpc_connectivity_state logical_connectivity_state() const {
return connectivity_state_.load(std::memory_order_relaxed); return logical_connectivity_state_;
} }
const absl::Status& logical_connectivity_status() const {
absl::Status GetConnectivityStatus() const { return logical_connectivity_status_;
MutexLock lock(&mu_);
return connectivity_status_;
} }
private: private:
@ -188,20 +181,28 @@ class RingHash : public LoadBalancingPolicy {
// subchannel in some cases; for example, once this is set to // subchannel in some cases; for example, once this is set to
// TRANSIENT_FAILURE, we do not change it again until we get READY, // TRANSIENT_FAILURE, we do not change it again until we get READY,
// so we skip any interim stops in CONNECTING. // so we skip any interim stops in CONNECTING.
// Uses an atomic so that it can be accessed outside of the WorkSerializer. grpc_connectivity_state logical_connectivity_state_ = GRPC_CHANNEL_IDLE;
std::atomic<grpc_connectivity_state> connectivity_state_{GRPC_CHANNEL_IDLE}; absl::Status logical_connectivity_status_;
mutable Mutex mu_;
absl::Status connectivity_status_ ABSL_GUARDED_BY(&mu_);
}; };
// A list of subchannels and the ring containing those subchannels. // A list of subchannels and the ring containing those subchannels.
class RingHashSubchannelList class RingHashSubchannelList
: public SubchannelList<RingHashSubchannelList, RingHashSubchannelData> { : public SubchannelList<RingHashSubchannelList, RingHashSubchannelData> {
public:
class Ring : public RefCounted<Ring> {
public: public:
struct RingEntry { struct RingEntry {
uint64_t hash; uint64_t hash;
RingHashSubchannelData* subchannel; size_t subchannel_index;
};
Ring(RingHashLbConfig* config, RingHashSubchannelList* subchannel_list,
const ChannelArgs& args);
const std::vector<RingEntry>& ring() const { return ring_; }
private:
std::vector<RingEntry> ring_;
}; };
RingHashSubchannelList(RingHash* policy, ServerAddressList addresses, RingHashSubchannelList(RingHash* policy, ServerAddressList addresses,
@ -212,7 +213,7 @@ class RingHash : public LoadBalancingPolicy {
p->Unref(DEBUG_LOCATION, "subchannel_list"); p->Unref(DEBUG_LOCATION, "subchannel_list");
} }
const std::vector<RingEntry>& ring() const { return ring_; } RefCountedPtr<Ring> ring() { return ring_; }
// Updates the counters of subchannels in each state when a // Updates the counters of subchannels in each state when a
// subchannel transitions from old_state to new_state. // subchannel transitions from old_state to new_state.
@ -236,7 +237,7 @@ class RingHash : public LoadBalancingPolicy {
size_t num_connecting_ = 0; size_t num_connecting_ = 0;
size_t num_transient_failure_ = 0; size_t num_transient_failure_ = 0;
std::vector<RingEntry> ring_; RefCountedPtr<Ring> ring_;
// The index of the subchannel currently doing an internally // The index of the subchannel currently doing an internally
// triggered connection attempt, if any. // triggered connection attempt, if any.
@ -251,24 +252,31 @@ class RingHash : public LoadBalancingPolicy {
class Picker : public SubchannelPicker { class Picker : public SubchannelPicker {
public: public:
explicit Picker(RefCountedPtr<RingHashSubchannelList> subchannel_list) Picker(RefCountedPtr<RingHash> ring_hash_lb,
: subchannel_list_(std::move(subchannel_list)) {} RingHashSubchannelList* subchannel_list)
: ring_hash_lb_(std::move(ring_hash_lb)),
~Picker() override { ring_(subchannel_list->ring()) {
// Hop into WorkSerializer to unref the subchannel list, since that may subchannels_.reserve(subchannel_list->num_subchannels());
// trigger the unreffing of the underlying subchannels. for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
MakeOrphanable<WorkSerializerRunner>(std::move(subchannel_list_)); RingHashSubchannelData* subchannel_data =
subchannel_list->subchannel(i);
subchannels_.emplace_back(
SubchannelInfo{subchannel_data->subchannel()->Ref(),
subchannel_data->logical_connectivity_state(),
subchannel_data->logical_connectivity_status()});
}
} }
PickResult Pick(PickArgs args) override; PickResult Pick(PickArgs args) override;
private: private:
// An interface for running a callback in the control plane WorkSerializer. // A fire-and-forget class that schedules subchannel connection attempts
class WorkSerializerRunner : public Orphanable { // on the control plane WorkSerializer.
class SubchannelConnectionAttempter : public Orphanable {
public: public:
explicit WorkSerializerRunner( explicit SubchannelConnectionAttempter(
RefCountedPtr<RingHashSubchannelList> subchannel_list) RefCountedPtr<RingHash> ring_hash_lb)
: subchannel_list_(std::move(subchannel_list)) { : ring_hash_lb_(std::move(ring_hash_lb)) {
GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr); GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
} }
@ -278,56 +286,43 @@ class RingHash : public LoadBalancingPolicy {
ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus()); ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
} }
// Will be invoked inside of the WorkSerializer. void AddSubchannel(RefCountedPtr<SubchannelInterface> subchannel) {
virtual void Run() {} subchannels_.push_back(std::move(subchannel));
protected:
RingHash* ring_hash_lb() const {
return static_cast<RingHash*>(subchannel_list_->policy());
} }
private: private:
static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) { static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) {
auto* self = static_cast<WorkSerializerRunner*>(arg); auto* self = static_cast<SubchannelConnectionAttempter*>(arg);
self->ring_hash_lb()->work_serializer()->Run( self->ring_hash_lb_->work_serializer()->Run(
[self]() { [self]() {
self->Run(); if (!self->ring_hash_lb_->shutdown_) {
for (auto& subchannel : self->subchannels_) {
subchannel->RequestConnection();
}
}
delete self; delete self;
}, },
DEBUG_LOCATION); DEBUG_LOCATION);
} }
RefCountedPtr<RingHashSubchannelList> subchannel_list_; RefCountedPtr<RingHash> ring_hash_lb_;
grpc_closure closure_; grpc_closure closure_;
std::vector<RefCountedPtr<SubchannelInterface>> subchannels_;
}; };
// A fire-and-forget class that schedules subchannel connection attempts struct SubchannelInfo {
// on the control plane WorkSerializer. RefCountedPtr<SubchannelInterface> subchannel;
class SubchannelConnectionAttempter : public WorkSerializerRunner { grpc_connectivity_state state;
public: absl::Status status;
explicit SubchannelConnectionAttempter(
RefCountedPtr<RingHashSubchannelList> subchannel_list)
: WorkSerializerRunner(std::move(subchannel_list)) {}
void AddSubchannel(RefCountedPtr<SubchannelInterface> subchannel) {
subchannels_.push_back(std::move(subchannel));
}
void Run() override {
if (!ring_hash_lb()->shutdown_) {
for (auto& subchannel : subchannels_) {
subchannel->RequestConnection();
}
}
}
private:
std::vector<RefCountedPtr<SubchannelInterface>> subchannels_;
}; };
RefCountedPtr<RingHashSubchannelList> subchannel_list_; RefCountedPtr<RingHash> ring_hash_lb_;
RefCountedPtr<RingHashSubchannelList::Ring> ring_;
std::vector<SubchannelInfo> subchannels_;
}; };
~RingHash() override;
void ShutdownLocked() override; void ShutdownLocked() override;
// Current config from resolver. // Current config from resolver.
@ -353,7 +348,7 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
return PickResult::Fail( return PickResult::Fail(
absl::InternalError("ring hash value is not a number")); absl::InternalError("ring hash value is not a number"));
} }
const auto& ring = subchannel_list_->ring(); const auto& ring = ring_->ring();
// Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c
// (ketama_get_server) NOTE: The algorithm depends on using signed integers // (ketama_get_server) NOTE: The algorithm depends on using signed integers
// for lowp, highp, and first_index. Do not change them! // for lowp, highp, and first_index. Do not change them!
@ -386,27 +381,25 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
[&](RefCountedPtr<SubchannelInterface> subchannel) { [&](RefCountedPtr<SubchannelInterface> subchannel) {
if (subchannel_connection_attempter == nullptr) { if (subchannel_connection_attempter == nullptr) {
subchannel_connection_attempter = subchannel_connection_attempter =
MakeOrphanable<SubchannelConnectionAttempter>( MakeOrphanable<SubchannelConnectionAttempter>(ring_hash_lb_->Ref(
subchannel_list_->Ref(DEBUG_LOCATION, DEBUG_LOCATION, "SubchannelConnectionAttempter"));
"SubchannelConnectionAttempter"));
} }
subchannel_connection_attempter->AddSubchannel(std::move(subchannel)); subchannel_connection_attempter->AddSubchannel(std::move(subchannel));
}; };
switch (ring[first_index].subchannel->GetConnectivityState()) { SubchannelInfo& first_subchannel =
subchannels_[ring[first_index].subchannel_index];
switch (first_subchannel.state) {
case GRPC_CHANNEL_READY: case GRPC_CHANNEL_READY:
return PickResult::Complete( return PickResult::Complete(first_subchannel.subchannel);
ring[first_index].subchannel->subchannel()->Ref());
case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_IDLE:
ScheduleSubchannelConnectionAttempt( ScheduleSubchannelConnectionAttempt(first_subchannel.subchannel);
ring[first_index].subchannel->subchannel()->Ref());
ABSL_FALLTHROUGH_INTENDED; ABSL_FALLTHROUGH_INTENDED;
case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_CONNECTING:
return PickResult::Queue(); return PickResult::Queue();
default: // GRPC_CHANNEL_TRANSIENT_FAILURE default: // GRPC_CHANNEL_TRANSIENT_FAILURE
break; break;
} }
ScheduleSubchannelConnectionAttempt( ScheduleSubchannelConnectionAttempt(first_subchannel.subchannel);
ring[first_index].subchannel->subchannel()->Ref());
// Loop through remaining subchannels to find one in READY. // Loop through remaining subchannels to find one in READY.
// On the way, we make sure the right set of connection attempts // On the way, we make sure the right set of connection attempts
// will happen. // will happen.
@ -414,19 +407,17 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
bool found_first_non_failed = false; bool found_first_non_failed = false;
for (size_t i = 1; i < ring.size(); ++i) { for (size_t i = 1; i < ring.size(); ++i) {
const auto& entry = ring[(first_index + i) % ring.size()]; const auto& entry = ring[(first_index + i) % ring.size()];
if (entry.subchannel == ring[first_index].subchannel) { if (entry.subchannel_index == ring[first_index].subchannel_index) {
continue; continue;
} }
grpc_connectivity_state connectivity_state = SubchannelInfo& subchannel_info = subchannels_[entry.subchannel_index];
entry.subchannel->GetConnectivityState(); if (subchannel_info.state == GRPC_CHANNEL_READY) {
if (connectivity_state == GRPC_CHANNEL_READY) { return PickResult::Complete(subchannel_info.subchannel);
return PickResult::Complete(entry.subchannel->subchannel()->Ref());
} }
if (!found_second_subchannel) { if (!found_second_subchannel) {
switch (connectivity_state) { switch (subchannel_info.state) {
case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_IDLE:
ScheduleSubchannelConnectionAttempt( ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel);
entry.subchannel->subchannel()->Ref());
ABSL_FALLTHROUGH_INTENDED; ABSL_FALLTHROUGH_INTENDED;
case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_CONNECTING:
return PickResult::Queue(); return PickResult::Queue();
@ -436,13 +427,11 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
found_second_subchannel = true; found_second_subchannel = true;
} }
if (!found_first_non_failed) { if (!found_first_non_failed) {
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { if (subchannel_info.state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
ScheduleSubchannelConnectionAttempt( ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel);
entry.subchannel->subchannel()->Ref());
} else { } else {
if (connectivity_state == GRPC_CHANNEL_IDLE) { if (subchannel_info.state == GRPC_CHANNEL_IDLE) {
ScheduleSubchannelConnectionAttempt( ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel);
entry.subchannel->subchannel()->Ref());
} }
found_first_non_failed = true; found_first_non_failed = true;
} }
@ -450,27 +439,16 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
} }
return PickResult::Fail(absl::UnavailableError(absl::StrCat( return PickResult::Fail(absl::UnavailableError(absl::StrCat(
"ring hash cannot find a connected subchannel; first failure: ", "ring hash cannot find a connected subchannel; first failure: ",
ring[first_index].subchannel->GetConnectivityStatus().ToString()))); first_subchannel.status.ToString())));
} }
// //
// RingHash::RingHashSubchannelList // RingHash::RingHashSubchannelList::Ring
// //
RingHash::RingHashSubchannelList::RingHashSubchannelList( RingHash::RingHashSubchannelList::Ring::Ring(
RingHash* policy, ServerAddressList addresses, const ChannelArgs& args) RingHashLbConfig* config, RingHashSubchannelList* subchannel_list,
: SubchannelList(policy, const ChannelArgs& args) {
(GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)
? "RingHashSubchannelList"
: nullptr),
std::move(addresses), policy->channel_control_helper(),
args),
num_idle_(num_subchannels()) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
// Construct the ring.
// Store the weights while finding the sum. // Store the weights while finding the sum.
struct AddressWeight { struct AddressWeight {
std::string address; std::string address;
@ -481,9 +459,9 @@ RingHash::RingHashSubchannelList::RingHashSubchannelList(
}; };
std::vector<AddressWeight> address_weights; std::vector<AddressWeight> address_weights;
size_t sum = 0; size_t sum = 0;
address_weights.reserve(num_subchannels()); address_weights.reserve(subchannel_list->num_subchannels());
for (size_t i = 0; i < num_subchannels(); ++i) { for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
RingHashSubchannelData* sd = subchannel(i); RingHashSubchannelData* sd = subchannel_list->subchannel(i);
const ServerAddressWeightAttribute* weight_attribute = static_cast< const ServerAddressWeightAttribute* weight_attribute = static_cast<
const ServerAddressWeightAttribute*>(sd->address().GetAttribute( const ServerAddressWeightAttribute*>(sd->address().GetAttribute(
ServerAddressWeightAttribute::kServerAddressWeightAttributeKey)); ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
@ -517,10 +495,8 @@ RingHash::RingHashSubchannelList::RingHashSubchannelList(
// to fit. // to fit.
const size_t ring_size_cap = args.GetInt(GRPC_ARG_RING_HASH_LB_RING_SIZE_CAP) const size_t ring_size_cap = args.GetInt(GRPC_ARG_RING_HASH_LB_RING_SIZE_CAP)
.value_or(kRingSizeCapDefault); .value_or(kRingSizeCapDefault);
const size_t min_ring_size = const size_t min_ring_size = std::min(config->min_ring_size(), ring_size_cap);
std::min(policy->config_->min_ring_size(), ring_size_cap); const size_t max_ring_size = std::min(config->max_ring_size(), ring_size_cap);
const size_t max_ring_size =
std::min(policy->config_->max_ring_size(), ring_size_cap);
const double scale = std::min( const double scale = std::min(
std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight, std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight,
static_cast<double>(max_ring_size)); static_cast<double>(max_ring_size));
@ -537,7 +513,7 @@ RingHash::RingHashSubchannelList::RingHashSubchannelList(
double target_hashes = 0.0; double target_hashes = 0.0;
uint64_t min_hashes_per_host = ring_size; uint64_t min_hashes_per_host = ring_size;
uint64_t max_hashes_per_host = 0; uint64_t max_hashes_per_host = 0;
for (size_t i = 0; i < num_subchannels(); ++i) { for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
const std::string& address_string = address_weights[i].address; const std::string& address_string = address_weights[i].address;
hash_key_buffer.assign(address_string.begin(), address_string.end()); hash_key_buffer.assign(address_string.begin(), address_string.end());
hash_key_buffer.emplace_back('_'); hash_key_buffer.emplace_back('_');
@ -550,7 +526,7 @@ RingHash::RingHashSubchannelList::RingHashSubchannelList(
absl::string_view hash_key(hash_key_buffer.data(), absl::string_view hash_key(hash_key_buffer.data(),
hash_key_buffer.size()); hash_key_buffer.size());
const uint64_t hash = XXH64(hash_key.data(), hash_key.size(), 0); const uint64_t hash = XXH64(hash_key.data(), hash_key.size(), 0);
ring_.push_back({hash, subchannel(i)}); ring_.push_back({hash, i});
++count; ++count;
++current_hashes; ++current_hashes;
hash_key_buffer.erase(offset_start, hash_key_buffer.end()); hash_key_buffer.erase(offset_start, hash_key_buffer.end());
@ -561,14 +537,34 @@ RingHash::RingHashSubchannelList::RingHashSubchannelList(
std::max(static_cast<uint64_t>(i), max_hashes_per_host); std::max(static_cast<uint64_t>(i), max_hashes_per_host);
} }
std::sort(ring_.begin(), ring_.end(), std::sort(ring_.begin(), ring_.end(),
[](const RingHashSubchannelList::RingEntry& lhs, [](const RingEntry& lhs, const RingEntry& rhs) -> bool {
const RingHashSubchannelList::RingEntry& rhs) -> bool {
return lhs.hash < rhs.hash; return lhs.hash < rhs.hash;
}); });
}
//
// RingHash::RingHashSubchannelList
//
RingHash::RingHashSubchannelList::RingHashSubchannelList(
RingHash* policy, ServerAddressList addresses, const ChannelArgs& args)
: SubchannelList(policy,
(GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)
? "RingHashSubchannelList"
: nullptr),
std::move(addresses), policy->channel_control_helper(),
args),
num_idle_(num_subchannels()) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
// Construct the ring.
ring_ = MakeRefCounted<Ring>(policy->config_.get(), this, args);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[RH %p] created subchannel list %p with %" PRIuPTR " ring entries", "[RH %p] created subchannel list %p with %" PRIuPTR " ring entries",
policy, this, ring_.size()); policy, this, ring_->ring().size());
} }
} }
@ -660,7 +656,7 @@ void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
// Note that we use our own picker regardless of connectivity state. // Note that we use our own picker regardless of connectivity state.
p->channel_control_helper()->UpdateState( p->channel_control_helper()->UpdateState(
state, status, state, status,
MakeRefCounted<Picker>(Ref(DEBUG_LOCATION, "RingHashPicker"))); MakeRefCounted<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"), this));
// While the ring_hash policy is reporting TRANSIENT_FAILURE, it will // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
// not be getting any pick requests from the priority policy. // not be getting any pick requests from the priority policy.
// However, because the ring_hash policy does not attempt to // However, because the ring_hash policy does not attempt to
@ -707,7 +703,6 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
absl::optional<grpc_connectivity_state> old_state, absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) { grpc_connectivity_state new_state) {
RingHash* p = static_cast<RingHash*>(subchannel_list()->policy()); RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
grpc_connectivity_state last_connectivity_state = GetConnectivityState();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log( gpr_log(
GPR_INFO, GPR_INFO,
@ -715,7 +710,7 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
"(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s", "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
p, subchannel(), subchannel_list(), Index(), p, subchannel(), subchannel_list(), Index(),
subchannel_list()->num_subchannels(), subchannel_list()->num_subchannels(),
ConnectivityStateName(last_connectivity_state), ConnectivityStateName(logical_connectivity_state_),
ConnectivityStateName(new_state)); ConnectivityStateName(new_state));
} }
GPR_ASSERT(subchannel() != nullptr); GPR_ASSERT(subchannel() != nullptr);
@ -735,34 +730,23 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
const bool connection_attempt_complete = new_state != GRPC_CHANNEL_CONNECTING; const bool connection_attempt_complete = new_state != GRPC_CHANNEL_CONNECTING;
// Decide what state to report for the purposes of aggregation and // Decide what state to report for the purposes of aggregation and
// picker behavior. // picker behavior.
// If the last recorded state was TRANSIENT_FAILURE, ignore the update // If the last recorded state was TRANSIENT_FAILURE, ignore the change
// unless the new state is READY. // unless the new state is READY (or TF again, in which case we need
bool update_status = true; // to update the status).
absl::Status status = connectivity_status(); if (logical_connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE ||
if (last_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE && new_state == GRPC_CHANNEL_READY ||
new_state != GRPC_CHANNEL_READY && new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
new_state != GRPC_CHANNEL_TRANSIENT_FAILURE) {
new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
{
MutexLock lock(&mu_);
status = connectivity_status_;
}
update_status = false;
}
// Update state counters used for aggregation. // Update state counters used for aggregation.
subchannel_list()->UpdateStateCountersLocked(last_connectivity_state, subchannel_list()->UpdateStateCountersLocked(logical_connectivity_state_,
new_state); new_state);
// Update status seen by picker if needed. // Update logical state.
if (update_status) { logical_connectivity_state_ = new_state;
MutexLock lock(&mu_); logical_connectivity_status_ = connectivity_status();
connectivity_status_ = connectivity_status();
} }
// Update last seen state, also used by picker.
connectivity_state_.store(new_state, std::memory_order_relaxed);
// Update the RH policy's connectivity state, creating new picker and new // Update the RH policy's connectivity state, creating new picker and new
// ring. // ring.
subchannel_list()->UpdateRingHashConnectivityStateLocked( subchannel_list()->UpdateRingHashConnectivityStateLocked(
Index(), connection_attempt_complete, status); Index(), connection_attempt_complete, logical_connectivity_status_);
} }
// //

@ -365,7 +365,6 @@ class RlsLb : public LoadBalancingPolicy {
class Picker : public LoadBalancingPolicy::SubchannelPicker { class Picker : public LoadBalancingPolicy::SubchannelPicker {
public: public:
explicit Picker(RefCountedPtr<RlsLb> lb_policy); explicit Picker(RefCountedPtr<RlsLb> lb_policy);
~Picker() override;
PickResult Pick(PickArgs args) override; PickResult Pick(PickArgs args) override;
@ -1008,19 +1007,6 @@ RlsLb::Picker::Picker(RefCountedPtr<RlsLb> lb_policy)
} }
} }
RlsLb::Picker::~Picker() {
// It's not safe to unref the default child policy in the picker,
// since that needs to be done in the WorkSerializer.
if (default_child_policy_ != nullptr) {
auto* default_child_policy = default_child_policy_.release();
lb_policy_->work_serializer()->Run(
[default_child_policy]() {
default_child_policy->Unref(DEBUG_LOCATION, "Picker");
},
DEBUG_LOCATION);
}
}
LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) { LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) {
// Construct key for request. // Construct key for request.
RequestKey key = {BuildKeyMap(config_->key_builder_map(), args.path, RequestKey key = {BuildKeyMap(config_->key_builder_map(), args.path,

@ -18,13 +18,16 @@
#include <inttypes.h> #include <inttypes.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include <algorithm> #include <algorithm>
#include <atomic>
#include <memory> #include <memory>
#include <string> #include <string>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "absl/random/random.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
#include "absl/strings/str_cat.h" #include "absl/strings/str_cat.h"
@ -174,7 +177,7 @@ class RoundRobin : public LoadBalancingPolicy {
// Using pointer value only, no ref held -- do not dereference! // Using pointer value only, no ref held -- do not dereference!
RoundRobin* parent_; RoundRobin* parent_;
size_t last_picked_index_; std::atomic<size_t> last_picked_index_;
std::vector<RefCountedPtr<SubchannelInterface>> subchannels_; std::vector<RefCountedPtr<SubchannelInterface>> subchannels_;
}; };
@ -189,6 +192,8 @@ class RoundRobin : public LoadBalancingPolicy {
RefCountedPtr<RoundRobinSubchannelList> latest_pending_subchannel_list_; RefCountedPtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
bool shutdown_ = false; bool shutdown_ = false;
absl::BitGen bit_gen_;
}; };
// //
@ -207,27 +212,26 @@ RoundRobin::Picker::Picker(RoundRobin* parent,
} }
// For discussion on why we generate a random starting index for // For discussion on why we generate a random starting index for
// the picker, see https://github.com/grpc/grpc-go/issues/2580. // the picker, see https://github.com/grpc/grpc-go/issues/2580.
// TODO(roth): rand(3) is not thread-safe. This should be replaced with size_t index =
// something better as part of https://github.com/grpc/grpc/issues/17891. absl::Uniform<size_t>(parent->bit_gen_, 0, subchannels_.size());
last_picked_index_ = rand() % subchannels_.size(); last_picked_index_.store(index, std::memory_order_relaxed);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[RR %p picker %p] created picker from subchannel_list=%p " "[RR %p picker %p] created picker from subchannel_list=%p "
"with %" PRIuPTR " READY subchannels; last_picked_index_=%" PRIuPTR, "with %" PRIuPTR " READY subchannels; last_picked_index_=%" PRIuPTR,
parent_, this, subchannel_list, subchannels_.size(), parent_, this, subchannel_list, subchannels_.size(), index);
last_picked_index_);
} }
} }
RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs /*args*/) { RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs /*args*/) {
last_picked_index_ = (last_picked_index_ + 1) % subchannels_.size(); size_t index = last_picked_index_.fetch_add(1, std::memory_order_relaxed) %
subchannels_.size();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[RR %p picker %p] returning index %" PRIuPTR ", subchannel=%p", "[RR %p picker %p] returning index %" PRIuPTR ", subchannel=%p",
parent_, this, last_picked_index_, parent_, this, index, subchannels_[index].get());
subchannels_[last_picked_index_].get());
} }
return PickResult::Complete(subchannels_[last_picked_index_]); return PickResult::Complete(subchannels_[index]);
} }
// //

@ -26,6 +26,7 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/random/random.h" #include "absl/random/random.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
@ -46,6 +47,7 @@
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/gprpp/work_serializer.h"
@ -138,7 +140,11 @@ class WeightedTargetLb : public LoadBalancingPolicy {
private: private:
PickerList pickers_; PickerList pickers_;
absl::BitGen bit_gen_;
// TODO(roth): Consider using a separate thread-local BitGen for each CPU
// to avoid the need for this mutex.
Mutex mu_;
absl::BitGen bit_gen_ ABSL_GUARDED_BY(&mu_);
}; };
// Each WeightedChild holds a ref to its parent WeightedTargetLb. // Each WeightedChild holds a ref to its parent WeightedTargetLb.
@ -247,8 +253,10 @@ class WeightedTargetLb : public LoadBalancingPolicy {
WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick( WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick(
PickArgs args) { PickArgs args) {
// Generate a random number in [0, total weight). // Generate a random number in [0, total weight).
const uint64_t key = const uint64_t key = [&]() {
absl::Uniform<uint64_t>(bit_gen_, 0, pickers_.back().first); MutexLock lock(&mu_);
return absl::Uniform<uint64_t>(bit_gen_, 0, pickers_.back().first);
}();
// Find the index in pickers_ corresponding to key. // Find the index in pickers_ corresponding to key.
size_t mid = 0; size_t mid = 0;
size_t start_index = 0; size_t start_index = 0;

@ -1351,8 +1351,9 @@ RetryFilter::CallData::CallAttempt::BatchData::~BatchData() {
this); this);
} }
CallAttempt* call_attempt = std::exchange(call_attempt_, nullptr); CallAttempt* call_attempt = std::exchange(call_attempt_, nullptr);
GRPC_CALL_STACK_UNREF(call_attempt->calld_->owning_call_, "Retry BatchData"); grpc_call_stack* owning_call = call_attempt->calld_->owning_call_;
call_attempt->Unref(DEBUG_LOCATION, "~BatchData"); call_attempt->Unref(DEBUG_LOCATION, "~BatchData");
GRPC_CALL_STACK_UNREF(owning_call, "Retry BatchData");
} }
void RetryFilter::CallData::CallAttempt::BatchData:: void RetryFilter::CallData::CallAttempt::BatchData::

@ -93,11 +93,14 @@ std::string XdsEndpointResource::Priority::ToString() const {
} }
bool XdsEndpointResource::DropConfig::ShouldDrop( bool XdsEndpointResource::DropConfig::ShouldDrop(
const std::string** category_name) const { const std::string** category_name) {
for (size_t i = 0; i < drop_category_list_.size(); ++i) { for (size_t i = 0; i < drop_category_list_.size(); ++i) {
const auto& drop_category = drop_category_list_[i]; const auto& drop_category = drop_category_list_[i];
// Generate a random number in [0, 1000000). // Generate a random number in [0, 1000000).
const uint32_t random = static_cast<uint32_t>(rand()) % 1000000; const uint32_t random = [&]() {
MutexLock lock(&mu_);
return absl::Uniform<uint32_t>(bit_gen_, 0, 1000000);
}();
if (random < drop_category.parts_per_million) { if (random < drop_category.parts_per_million) {
*category_name = &drop_category.name; *category_name = &drop_category.name;
return true; return true;

@ -28,6 +28,8 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/random/random.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "envoy/config/endpoint/v3/endpoint.upbdefs.h" #include "envoy/config/endpoint/v3/endpoint.upbdefs.h"
#include "upb/def.h" #include "upb/def.h"
@ -38,6 +40,7 @@
#include "src/core/ext/xds/xds_resource_type_impl.h" #include "src/core/ext/xds/xds_resource_type_impl.h"
#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/resolver/server_address.h" #include "src/core/lib/resolver/server_address.h"
namespace grpc_core { namespace grpc_core {
@ -90,7 +93,7 @@ struct XdsEndpointResource : public XdsResourceType::ResourceData {
// The only method invoked from outside the WorkSerializer (used in // The only method invoked from outside the WorkSerializer (used in
// the data plane). // the data plane).
bool ShouldDrop(const std::string** category_name) const; bool ShouldDrop(const std::string** category_name);
const DropCategoryList& drop_category_list() const { const DropCategoryList& drop_category_list() const {
return drop_category_list_; return drop_category_list_;
@ -108,6 +111,11 @@ struct XdsEndpointResource : public XdsResourceType::ResourceData {
private: private:
DropCategoryList drop_category_list_; DropCategoryList drop_category_list_;
bool drop_all_ = false; bool drop_all_ = false;
// TODO(roth): Consider using a separate thread-local BitGen for each CPU
// to avoid the need for this mutex.
Mutex mu_;
absl::BitGen bit_gen_ ABSL_GUARDED_BY(&mu_);
}; };
PriorityList priorities; PriorityList priorities;

@ -69,19 +69,15 @@ LoadBalancingPolicy::SubchannelPicker::SubchannelPicker()
LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick( LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick(
PickArgs /*args*/) { PickArgs /*args*/) {
// We invoke the parent's ExitIdleLocked() via a closure instead // We invoke the parent's ExitIdleLocked() via a closure instead
// of doing it directly here, for two reasons: // of doing it directly here because ExitIdleLocked() may cause the
// 1. ExitIdleLocked() may cause the policy's state to change and // policy's state to change and a new picker to be delivered to the
// a new picker to be delivered to the channel. If that new // channel. If that new picker is delivered before ExitIdleLocked()
// picker is delivered before ExitIdleLocked() returns, then by // returns, then by the time this function returns, the pick will already
// the time this function returns, the pick will already have // have been processed, and we'll be trying to re-process the same pick
// been processed, and we'll be trying to re-process the same // again, leading to a crash.
// pick again, leading to a crash. MutexLock lock(&mu_);
// 2. We are currently running in the data plane mutex, but we if (parent_ != nullptr) {
// need to bounce into the control plane work_serializer to call auto* parent = parent_.release(); // ref held by lambda.
// ExitIdleLocked().
if (!exit_idle_called_ && parent_ != nullptr) {
exit_idle_called_ = true;
auto* parent = parent_->Ref().release(); // ref held by lambda.
ExecCtx::Run(DEBUG_LOCATION, ExecCtx::Run(DEBUG_LOCATION,
GRPC_CLOSURE_CREATE( GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error_handle /*error*/) { [](void* arg, grpc_error_handle /*error*/) {

@ -27,6 +27,7 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
@ -44,6 +45,7 @@
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/load_balancing/subchannel_interface.h" #include "src/core/lib/load_balancing/subchannel_interface.h"
@ -392,8 +394,8 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
PickResult Pick(PickArgs args) override; PickResult Pick(PickArgs args) override;
private: private:
RefCountedPtr<LoadBalancingPolicy> parent_; Mutex mu_;
bool exit_idle_called_ = false; RefCountedPtr<LoadBalancingPolicy> parent_ ABSL_GUARDED_BY(&mu_);
}; };
// A picker that returns PickResult::Fail for all picks. // A picker that returns PickResult::Fail for all picks.

Loading…
Cancel
Save