fix LB policy call status notification interface, and other improvements (#27947)

* fix LB policy call status notification interface, and other improvements

* fix build

* address review comments
Mark D. Roth, committed via GitHub
parent 296f596a96
commit 7541a3acda
  1. src/core/ext/filters/client_channel/backend_metric.cc (8 changed lines)
  2. src/core/ext/filters/client_channel/backend_metric.h (4 changed lines)
  3. src/core/ext/filters/client_channel/client_channel.cc (69 changed lines)
  4. src/core/ext/filters/client_channel/client_channel.h (9 changed lines)
  5. src/core/ext/filters/client_channel/lb_policy.h (134 changed lines)
  6. src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc (116 changed lines)
  7. test/core/util/test_lb_policies.cc (31 changed lines)
  8. test/core/util/test_lb_policies.h (3 changed lines)

@@ -48,16 +48,16 @@ std::map<absl::string_view, double> ParseMap(
} // namespace
const LoadBalancingPolicy::BackendMetricData* ParseBackendMetricData(
const grpc_slice& serialized_load_report, Arena* arena) {
const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData*
ParseBackendMetricData(const grpc_slice& serialized_load_report, Arena* arena) {
upb::Arena upb_arena;
xds_data_orca_v3_OrcaLoadReport* msg = xds_data_orca_v3_OrcaLoadReport_parse(
reinterpret_cast<const char*>(
GRPC_SLICE_START_PTR(serialized_load_report)),
GRPC_SLICE_LENGTH(serialized_load_report), upb_arena.ptr());
if (msg == nullptr) return nullptr;
LoadBalancingPolicy::BackendMetricData* backend_metric_data =
arena->New<LoadBalancingPolicy::BackendMetricData>();
auto* backend_metric_data = arena->New<
LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData>();
backend_metric_data->cpu_utilization =
xds_data_orca_v3_OrcaLoadReport_cpu_utilization(msg);
backend_metric_data->mem_utilization =

@@ -28,8 +28,8 @@ namespace grpc_core {
// Parses the serialized load report and allocates a BackendMetricData
// object on the arena.
const LoadBalancingPolicy::BackendMetricData* ParseBackendMetricData(
const grpc_slice& serialized_load_report, Arena* arena);
const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData*
ParseBackendMetricData(const grpc_slice& serialized_load_report, Arena* arena);
} // namespace grpc_core

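For context, ParseBackendMetricData() above is what backs the channel's new BackendMetricAccessor (see the client_channel.cc hunks just below): the serialized xds.data.orca.v3.OrcaLoadReport is read from the x-endpoint-load-metrics-bin trailing-metadata entry and parsed onto the call's arena. A small sketch of that usage follows; the GRPC_MDVALUE access and the helper name are assumptions for illustration, not code from this commit.

// Sketch only (assumed wiring, not from this commit): pull the serialized
// ORCA load report out of trailing metadata and parse it onto the arena.
#include "src/core/ext/filters/client_channel/backend_metric.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"

namespace grpc_core {

const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData*
MaybeParseBackendMetricData(grpc_metadata_batch* recv_trailing_metadata,
                            Arena* arena) {
  grpc_linked_mdelem* md =
      recv_trailing_metadata->legacy_index()
          ->named.x_endpoint_load_metrics_bin;
  if (md == nullptr) return nullptr;
  // The result lives on the call arena; the caller is responsible for
  // running its destructor (the maps inside are not trivially
  // destructible), as ~LoadBalancedCall does in the hunks below.
  return ParseBackendMetricData(GRPC_MDVALUE(md->md), arena);
}

}  // namespace grpc_core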
@@ -2461,8 +2461,30 @@ class ClientChannel::LoadBalancedCall::LbCallState
void* Alloc(size_t size) override { return lb_call_->arena_->Alloc(size); }
const LoadBalancingPolicy::BackendMetricData* GetBackendMetricData()
override {
absl::string_view ExperimentalGetCallAttribute(const char* key) override {
auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
lb_call_->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
auto& call_attributes = service_config_call_data->call_attributes();
auto it = call_attributes.find(key);
if (it == call_attributes.end()) return absl::string_view();
return it->second;
}
private:
LoadBalancedCall* lb_call_;
};
//
// ClientChannel::LoadBalancedCall::BackendMetricAccessor
//
class ClientChannel::LoadBalancedCall::BackendMetricAccessor
: public LoadBalancingPolicy::BackendMetricAccessor {
public:
explicit BackendMetricAccessor(LoadBalancedCall* lb_call)
: lb_call_(lb_call) {}
const BackendMetricData* GetBackendMetricData() override {
if (lb_call_->backend_metric_data_ == nullptr) {
grpc_linked_mdelem* md = lb_call_->recv_trailing_metadata_->legacy_index()
->named.x_endpoint_load_metrics_bin;
@@ -2474,21 +2496,12 @@ class ClientChannel::LoadBalancedCall::LbCallState
return lb_call_->backend_metric_data_;
}
absl::string_view ExperimentalGetCallAttribute(const char* key) override {
auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
lb_call_->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
auto& call_attributes = service_config_call_data->call_attributes();
auto it = call_attributes.find(key);
if (it == call_attributes.end()) return absl::string_view();
return it->second;
}
private:
LoadBalancedCall* lb_call_;
};
//
// LoadBalancedCall
// ClientChannel::LoadBalancedCall
//
namespace {
@@ -2530,8 +2543,8 @@ ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
GRPC_ERROR_UNREF(cancel_error_);
GRPC_ERROR_UNREF(failure_error_);
if (backend_metric_data_ != nullptr) {
backend_metric_data_
->LoadBalancingPolicy::BackendMetricData::~BackendMetricData();
backend_metric_data_->LoadBalancingPolicy::BackendMetricAccessor::
BackendMetricData::~BackendMetricData();
}
// Make sure there are no remaining pending batches.
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
@@ -2832,7 +2845,7 @@ void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady(
auto* self = static_cast<LoadBalancedCall*>(arg);
// Check if we have a tracer or an LB callback to invoke.
if (self->call_attempt_tracer_ != nullptr ||
self->lb_recv_trailing_metadata_ready_ != nullptr) {
self->lb_subchannel_call_tracker_ != nullptr) {
// Get the call's status.
absl::Status status;
if (error != GRPC_ERROR_NONE) {
@@ -2864,11 +2877,13 @@ void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady(
}
// If the LB policy requested a callback for trailing metadata, invoke
// the callback.
if (self->lb_recv_trailing_metadata_ready_ != nullptr) {
if (self->lb_subchannel_call_tracker_ != nullptr) {
Metadata trailing_metadata(self, self->recv_trailing_metadata_);
LbCallState lb_call_state(self);
self->lb_recv_trailing_metadata_ready_(status, &trailing_metadata,
&lb_call_state);
BackendMetricAccessor backend_metric_accessor(self);
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
status, &trailing_metadata, &backend_metric_accessor};
self->lb_subchannel_call_tracker_->Finish(args);
self->lb_subchannel_call_tracker_.reset();
}
}
// Chain to original callback.
@@ -3054,15 +3069,21 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
// subchannel has moved out of state READY but the LB policy hasn't
// yet seen that change and given us a new picker), then just
// queue the pick. We'll try again as soon as we get a new picker.
// TODO(roth): In this case, we need to invoke the LB
// policy's recv_trailing_metadata_ready callback to tell it
// that the pick has been abandoned.
if (connected_subchannel_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: subchannel returned by LB picker "
"has no connected subchannel; queueing pick",
chand_, this);
}
MaybeAddCallToLbQueuedCallsLocked();
return false;
}
lb_recv_trailing_metadata_ready_ =
std::move(complete_pick->recv_trailing_metadata_ready);
lb_subchannel_call_tracker_ =
std::move(complete_pick->subchannel_call_tracker);
if (lb_subchannel_call_tracker_ != nullptr) {
lb_subchannel_call_tracker_->Start();
}
MaybeRemoveCallFromLbQueuedCallsLocked();
return true;
},

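The three client_channel.cc hunks above are fragments of one flow: when a complete pick is accepted, the channel takes ownership of the policy's subchannel_call_tracker and calls Start(); when trailing metadata arrives, it wraps the metadata and backend metric data, calls Finish(), and drops the tracker. A condensed, hedged restatement of that ordering as a standalone sketch (the free function below is illustrative only and does not exist in client_channel.cc; the real logic lives in PickSubchannelLocked() and RecvTrailingMetadataReady()):

// Hypothetical sketch of the channel-side tracker lifecycle from the
// hunks above; it only illustrates the order of operations.
#include <memory>
#include <utility>

#include "absl/status/status.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"

namespace grpc_core {

void DriveTrackerLifecycle(
    std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
        tracker,
    absl::Status final_status,
    LoadBalancingPolicy::MetadataInterface* trailing_metadata,
    LoadBalancingPolicy::BackendMetricAccessor* backend_metric_accessor) {
  if (tracker == nullptr) return;
  // 1. Right after a complete pick is accepted, the channel calls Start().
  tracker->Start();
  // ... the subchannel call runs ...
  // 2. When trailing metadata arrives, the channel bundles the status,
  //    metadata wrapper, and backend metric accessor into FinishArgs,
  //    calls Finish(), then drops the tracker.
  LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
      final_status, trailing_metadata, backend_metric_accessor};
  tracker->Finish(args);
  tracker.reset();
}

}  // namespace grpc_core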
@@ -396,6 +396,7 @@ class ClientChannel::LoadBalancedCall
class LbQueuedCallCanceller;
class Metadata;
class LbCallState;
class BackendMetricAccessor;
// Returns the index into pending_batches_ to be used for batch.
static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
@@ -477,10 +478,10 @@ class ClientChannel::LoadBalancedCall
ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = nullptr;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr;
std::function<void(absl::Status, LoadBalancingPolicy::MetadataInterface*,
LoadBalancingPolicy::CallState*)>
lb_recv_trailing_metadata_ready_;
const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData*
backend_metric_data_ = nullptr;
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
lb_subchannel_call_tracker_;
RefCountedPtr<SubchannelCall> subchannel_call_;

@@ -1,20 +1,18 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H
@@ -81,26 +79,6 @@ extern DebugOnlyTraceFlag grpc_trace_lb_policy_refcount;
// interested_parties() hooks from the API.
class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
public:
// Represents backend metrics reported by the backend to the client.
struct BackendMetricData {
/// CPU utilization expressed as a fraction of available CPU resources.
double cpu_utilization;
/// Memory utilization expressed as a fraction of available memory
/// resources.
double mem_utilization;
/// Total requests per second being served by the backend. This
/// should include all services that a backend is responsible for.
uint64_t requests_per_second;
/// Application-specific requests cost metrics. Metric names are
/// determined by the application. Each value is an absolute cost
/// (e.g. 3487 bytes of storage) associated with the request.
std::map<absl::string_view, double> request_cost;
/// Application-specific resource utilization metrics. Metric names
/// are determined by the application. Each value is expressed as a
/// fraction of total resources available.
std::map<absl::string_view, double> utilization;
};
/// Interface for accessing per-call state.
/// Implemented by the client channel and used by the SubchannelPicker.
class CallState {
@@ -114,13 +92,6 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// for allocations that need to be made on a per-call basis.
virtual void* Alloc(size_t size) = 0;
/// Returns the backend metric data returned by the server for the call,
/// or null if no backend metric data was returned.
// TODO(roth): Move this out of CallState, since it should not be
// accessible to the picker, only to the recv_trailing_metadata_ready
// callback. It should instead be in its own interface.
virtual const BackendMetricData* GetBackendMetricData() = 0;
/// EXPERIMENTAL API.
/// Returns the value of the call attribute \a key.
/// Keys are static strings, so an attribute can be accessed by an LB
@@ -172,6 +143,59 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
CallState* call_state;
};
/// Interface for accessing backend metric data.
/// Implemented by the client channel and used by
/// SubchannelCallTrackerInterface.
class BackendMetricAccessor {
public:
// Represents backend metrics reported by the backend to the client.
struct BackendMetricData {
/// CPU utilization expressed as a fraction of available CPU resources.
double cpu_utilization;
/// Memory utilization expressed as a fraction of available memory
/// resources.
double mem_utilization;
/// Total requests per second being served by the backend. This
/// should include all services that a backend is responsible for.
uint64_t requests_per_second;
/// Application-specific requests cost metrics. Metric names are
/// determined by the application. Each value is an absolute cost
/// (e.g. 3487 bytes of storage) associated with the request.
std::map<absl::string_view, double> request_cost;
/// Application-specific resource utilization metrics. Metric names
/// are determined by the application. Each value is expressed as a
/// fraction of total resources available.
std::map<absl::string_view, double> utilization;
};
virtual ~BackendMetricAccessor() = default;
/// Returns the backend metric data returned by the server for the call,
/// or null if no backend metric data was returned.
virtual const BackendMetricData* GetBackendMetricData() = 0;
};
/// Interface for tracking subchannel calls.
/// Implemented by LB policy and used by the channel.
class SubchannelCallTrackerInterface {
public:
virtual ~SubchannelCallTrackerInterface() = default;
/// Called when a subchannel call is started after an LB pick.
virtual void Start() = 0;
/// Called when a subchannel call is completed.
/// The metadata may be modified by the implementation. However, the
/// implementation does not take ownership, so any data that needs to be
/// used after returning must be copied.
struct FinishArgs {
absl::Status status;
MetadataInterface* trailing_metadata;
BackendMetricAccessor* backend_metric_accessor;
};
virtual void Finish(FinishArgs args) = 0;
};
/// The result of picking a subchannel for a call.
struct PickResult {
/// A successful pick.
@@ -179,25 +203,17 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// The subchannel to be used for the call. Must be non-null.
RefCountedPtr<SubchannelInterface> subchannel;
/// Callback set by LB policy to be notified of trailing metadata.
/// If non-null, the client channel will invoke the callback
/// when trailing metadata is returned.
/// The metadata may be modified by the callback. However, the callback
/// does not take ownership, so any data that needs to be used after
/// returning must be copied.
/// The call state can be used to obtain backend metric data.
// TODO(roth): The arguments to this callback should be moved into a
// struct, so that we can later add new fields without breaking
// existing implementations.
std::function<void(absl::Status, MetadataInterface*, CallState*)>
recv_trailing_metadata_ready;
/// Optionally set by the LB policy when it wishes to be notified
/// about the resulting subchannel call.
/// Note that if the pick is abandoned by the channel, this may never
/// be used.
std::unique_ptr<SubchannelCallTrackerInterface> subchannel_call_tracker;
explicit Complete(
RefCountedPtr<SubchannelInterface> sc,
std::function<void(absl::Status, MetadataInterface*, CallState*)> cb =
nullptr)
std::unique_ptr<SubchannelCallTrackerInterface> tracker = nullptr)
: subchannel(std::move(sc)),
recv_trailing_metadata_ready(std::move(cb)) {}
subchannel_call_tracker(std::move(tracker)) {}
};
/// Pick cannot be completed until something changes on the control

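The SubchannelCallTrackerInterface added above replaces the old per-pick recv_trailing_metadata_ready callback with a two-point notification: the channel calls Start() when the subchannel call begins and Finish() when it completes. Below is a minimal sketch of how a policy could use it; the interface names (SubchannelCallTrackerInterface, FinishArgs, PickResult::Complete, subchannel_call_tracker) come from the diff above, while MyCallTracker, MyPicker, and the active-call counter are hypothetical illustration names, not part of this commit.

// Hypothetical sketch (not part of this commit): an LB policy tracker
// that counts in-flight subchannel calls and logs failures.
#include <atomic>
#include <memory>
#include <string>
#include <utility>

#include <grpc/support/log.h>

#include "absl/memory/memory.h"
#include "absl/types/variant.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"

namespace grpc_core {

class MyCallTracker
    : public LoadBalancingPolicy::SubchannelCallTrackerInterface {
 public:
  explicit MyCallTracker(std::atomic<int>* active_calls)
      : active_calls_(active_calls) {}

  // Called by the channel once the subchannel call actually starts; a
  // pick that the channel abandons never reaches this point.
  void Start() override {
    active_calls_->fetch_add(1, std::memory_order_relaxed);
  }

  // Called with the final status, the trailing metadata, and an accessor
  // for any backend metric (ORCA) data reported by the server.
  void Finish(FinishArgs args) override {
    active_calls_->fetch_sub(1, std::memory_order_relaxed);
    if (!args.status.ok()) {
      gpr_log(GPR_INFO, "subchannel call failed: %s",
              std::string(args.status.message()).c_str());
    }
  }

 private:
  std::atomic<int>* active_calls_;
};

// Hypothetical delegating picker that attaches the tracker to every
// successful pick so the channel will drive Start()/Finish().
class MyPicker : public LoadBalancingPolicy::SubchannelPicker {
 public:
  MyPicker(std::unique_ptr<SubchannelPicker> delegate,
           std::atomic<int>* active_calls)
      : delegate_(std::move(delegate)), active_calls_(active_calls) {}

  LoadBalancingPolicy::PickResult Pick(
      LoadBalancingPolicy::PickArgs args) override {
    auto result = delegate_->Pick(args);
    auto* complete_pick =
        absl::get_if<LoadBalancingPolicy::PickResult::Complete>(
            &result.result);
    if (complete_pick != nullptr) {
      complete_pick->subchannel_call_tracker =
          absl::make_unique<MyCallTracker>(active_calls_);
    }
    return result;
  }

 private:
  std::unique_ptr<SubchannelPicker> delegate_;
  std::atomic<int>* active_calls_;
};

}  // namespace grpc_core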
@@ -201,6 +201,8 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
PickResult Pick(PickArgs args) override;
private:
class SubchannelCallTracker;
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
uint32_t max_concurrent_requests_;
RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
@@ -264,6 +266,71 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
RefCountedPtr<RefCountedPicker> picker_;
};
//
// XdsClusterImplLb::Picker::SubchannelCallTracker
//
class XdsClusterImplLb::Picker::SubchannelCallTracker
: public LoadBalancingPolicy::SubchannelCallTrackerInterface {
public:
SubchannelCallTracker(
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
original_subchannel_call_tracker,
RefCountedPtr<XdsClusterLocalityStats> locality_stats,
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter)
: original_subchannel_call_tracker_(
std::move(original_subchannel_call_tracker)),
locality_stats_(std::move(locality_stats)),
call_counter_(std::move(call_counter)) {}
~SubchannelCallTracker() override {
locality_stats_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
call_counter_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
GPR_DEBUG_ASSERT(!started_);
}
void Start() override {
// Increment number of calls in flight.
call_counter_->Increment();
// Record a call started.
if (locality_stats_ != nullptr) {
locality_stats_->AddCallStarted();
}
// Delegate if needed.
if (original_subchannel_call_tracker_ != nullptr) {
original_subchannel_call_tracker_->Start();
}
#ifndef NDEBUG
started_ = true;
#endif
}
void Finish(FinishArgs args) override {
// Delegate if needed.
if (original_subchannel_call_tracker_ != nullptr) {
original_subchannel_call_tracker_->Finish(args);
}
// Record call completion for load reporting.
if (locality_stats_ != nullptr) {
locality_stats_->AddCallFinished(!args.status.ok());
}
// Decrement number of calls in flight.
call_counter_->Decrement();
#ifndef NDEBUG
started_ = false;
#endif
}
private:
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
original_subchannel_call_tracker_;
RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
#ifndef NDEBUG
bool started_ = false;
#endif
};
//
// XdsClusterImplLb::Picker
//
@@ -291,17 +358,17 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
return PickResult::Drop(absl::UnavailableError(
absl::StrCat("EDS-configured drop: ", *drop_category)));
}
// Handle circuit breaking.
uint32_t current = call_counter_->Load();
// Check and see if we exceeded the max concurrent requests count.
if (current >= max_concurrent_requests_) {
// Check if we exceeded the max concurrent requests circuit breaking limit.
// Note: We check the value here, but we don't actually increment the
// counter for the current request until the channel calls the subchannel
// call tracker's Start() method. This means that we may wind up
// allowing more concurrent requests than the configured limit.
if (call_counter_->Load() >= max_concurrent_requests_) {
if (drop_stats_ != nullptr) drop_stats_->AddUncategorizedDrops();
return PickResult::Drop(absl::UnavailableError("circuit breaker drop"));
}
call_counter_->Increment();
// If we're not dropping the call, we should always have a child picker.
if (picker_ == nullptr) { // Should never happen.
call_counter_->Decrement();
return PickResult::Fail(absl::InternalError(
"xds_cluster_impl picker not given any child picker"));
}
@@ -309,46 +376,27 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
PickResult result = picker_->Pick(args);
auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
if (complete_pick != nullptr) {
XdsClusterLocalityStats* locality_stats = nullptr;
RefCountedPtr<XdsClusterLocalityStats> locality_stats;
if (drop_stats_ != nullptr) { // If load reporting is enabled.
auto* subchannel_wrapper =
static_cast<StatsSubchannelWrapper*>(complete_pick->subchannel.get());
// Handle load reporting.
locality_stats = subchannel_wrapper->locality_stats()->Ref().release();
// Record a call started.
locality_stats->AddCallStarted();
locality_stats = subchannel_wrapper->locality_stats()->Ref(
DEBUG_LOCATION, "SubchannelCallTracker");
// Unwrap subchannel to pass back up the stack.
complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
}
// Intercept the recv_trailing_metadata op to record call completion.
auto* call_counter = call_counter_->Ref(DEBUG_LOCATION, "call").release();
auto original_recv_trailing_metadata_ready =
complete_pick->recv_trailing_metadata_ready;
complete_pick->recv_trailing_metadata_ready =
// Note: This callback does not run in either the control plane
// work serializer or in the data plane mutex.
[locality_stats, original_recv_trailing_metadata_ready, call_counter](
absl::Status status, MetadataInterface* metadata,
CallState* call_state) {
// Record call completion for load reporting.
if (locality_stats != nullptr) {
locality_stats->AddCallFinished(!status.ok());
locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
}
// Decrement number of calls in flight.
call_counter->Decrement();
call_counter->Unref(DEBUG_LOCATION, "call");
// Invoke the original recv_trailing_metadata_ready callback, if any.
if (original_recv_trailing_metadata_ready != nullptr) {
original_recv_trailing_metadata_ready(status, metadata, call_state);
}
};
// Inject subchannel call tracker to record call completion.
complete_pick->subchannel_call_tracker =
absl::make_unique<SubchannelCallTracker>(
std::move(complete_pick->subchannel_call_tracker),
std::move(locality_stats),
call_counter_->Ref(DEBUG_LOCATION, "SubchannelCallTracker"));
} else {
// TODO(roth): We should ideally also record call failures here in the case
// where a pick fails. This is challenging, because we don't know which
// picks are for wait_for_ready RPCs or how many times we'll return a
// failure for the same wait_for_ready RPC.
call_counter_->Decrement();
}
return result;
}

@@ -224,8 +224,8 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
// Intercept trailing metadata.
auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
if (complete_pick != nullptr) {
new (args.call_state->Alloc(sizeof(TrailingMetadataHandler)))
TrailingMetadataHandler(complete_pick, cb_);
complete_pick->subchannel_call_tracker =
absl::make_unique<SubchannelCallTracker>(cb_);
}
return result;
}
@@ -272,29 +272,22 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
InterceptRecvTrailingMetadataCallback cb_;
};
class TrailingMetadataHandler {
class SubchannelCallTracker : public SubchannelCallTrackerInterface {
public:
TrailingMetadataHandler(PickResult::Complete* result,
InterceptRecvTrailingMetadataCallback cb)
: cb_(std::move(cb)) {
result->recv_trailing_metadata_ready = [this](absl::Status /*status*/,
MetadataInterface* metadata,
CallState* call_state) {
RecordRecvTrailingMetadata(metadata, call_state);
};
}
explicit SubchannelCallTracker(InterceptRecvTrailingMetadataCallback cb)
: cb_(std::move(cb)) {}
private:
void RecordRecvTrailingMetadata(MetadataInterface* recv_trailing_metadata,
CallState* call_state) {
void Start() override {}
void Finish(FinishArgs args) override {
TrailingMetadataArgsSeen args_seen;
args_seen.backend_metric_data = call_state->GetBackendMetricData();
GPR_ASSERT(recv_trailing_metadata != nullptr);
args_seen.metadata = recv_trailing_metadata->TestOnlyCopyToVector();
args_seen.backend_metric_data =
args.backend_metric_accessor->GetBackendMetricData();
args_seen.metadata = args.trailing_metadata->TestOnlyCopyToVector();
cb_(args_seen);
this->~TrailingMetadataHandler();
}
private:
InterceptRecvTrailingMetadataCallback cb_;
};
};

@@ -36,7 +36,8 @@ void RegisterTestPickArgsLoadBalancingPolicy(
TestPickArgsCallback cb, const char* delegate_policy_name = "pick_first");
struct TrailingMetadataArgsSeen {
const LoadBalancingPolicy::BackendMetricData* backend_metric_data;
const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData*
backend_metric_data;
MetadataVector metadata;
};
