[client_channel] SubchannelWrapper hops into WorkSerializer before destruction (#34307)

Instead of having the per-call code hop into the `WorkSerializer` to
unref the pickers, have the `SubchannelWrapper` itself hop into the
`WorkSerializer` before it is destroyed.

This also reverts the change made to the WRR picker in #34077, since
that is no longer necessary.
pull/34311/head
Mark D. Roth 1 year ago committed by GitHub
parent 97571ebf81
commit aa2bd10072
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 32
      src/core/ext/filters/client_channel/client_channel.cc
  3. 7
      src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc

@ -3005,7 +3005,6 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/cleanup",
"absl/container:flat_hash_set",
"absl/container:inlined_vector",
"absl/functional:any_invocable",

@ -29,7 +29,6 @@
#include <utility>
#include <vector>
#include "absl/cleanup/cleanup.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/cord.h"
@ -662,6 +661,12 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
}
void Orphan() override {
// Make sure we release the last ref inside the WorkSerializer, so
// that we can update the channel's subchannel maps.
chand_->work_serializer_->Run([self = WeakRef()]() {}, DEBUG_LOCATION);
}
void WatchConnectivityState(
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
@ -2770,47 +2775,34 @@ void ClientChannel::LoadBalancedCall::AddCallToLbQueuedCallsLocked() {
absl::optional<absl::Status> ClientChannel::LoadBalancedCall::PickSubchannel(
bool was_queued) {
// We may accumulate multiple pickers here, because if a picker says
// to queue the call, we check again to see if the picker has been
// updated before we queue it.
// We need to unref pickers in the WorkSerializer.
std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers;
auto cleanup = absl::MakeCleanup([&]() {
chand_->work_serializer_->Run(
[pickers = std::move(pickers)]() mutable {
for (auto& picker : pickers) {
picker.reset(DEBUG_LOCATION, "PickSubchannel");
}
},
DEBUG_LOCATION);
});
// Grab mutex and take a ref to the picker.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: grabbing LB mutex to get picker",
chand_, this);
}
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
{
MutexLock lock(&chand_->lb_mu_);
pickers.emplace_back(chand_->picker_);
picker = chand_->picker_;
}
while (true) {
// Do pick.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: performing pick with picker=%p",
chand_, this, pickers.back().get());
chand_, this, picker.get());
}
grpc_error_handle error;
bool pick_complete = PickSubchannelImpl(pickers.back().get(), &error);
bool pick_complete = PickSubchannelImpl(picker.get(), &error);
if (!pick_complete) {
MutexLock lock(&chand_->lb_mu_);
// If picker has been swapped out since we grabbed it, try again.
if (chand_->picker_ != pickers.back()) {
if (picker != chand_->picker_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: pick not complete, but picker changed",
chand_, this);
}
pickers.emplace_back(chand_->picker_);
picker = chand_->picker_;
continue;
}
// Otherwise queue the pick to try again later when we get a new picker.

@ -608,9 +608,7 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
// Start timer.
WeakRefCountedPtr<Picker> self = WeakRef();
timer_handle_ = wrr_->channel_control_helper()->GetEventEngine()->RunAfter(
config_->weight_update_period(),
[self = std::move(self),
work_serializer = wrr_->work_serializer()]() mutable {
config_->weight_update_period(), [self = std::move(self)]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
{
@ -623,8 +621,7 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
self->BuildSchedulerAndStartTimerLocked();
}
}
// Release the picker ref inside the WorkSerializer.
work_serializer->Run([self = std::move(self)]() {}, DEBUG_LOCATION);
self.reset();
});
}

Loading…
Cancel
Save