Move locality load reporting to EDS policy.

pull/24291/head
Mark D. Roth 4 years ago
parent 24f54fbb4b
commit 799e805e8a
  1. 14
      BUILD
  2. 1
      BUILD.gn
  3. 1
      CMakeLists.txt
  4. 2
      Makefile
  5. 1
      build_autogenerated.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 1
      gRPC-Core.podspec
  9. 1
      grpc.gemspec
  10. 1
      grpc.gyp
  11. 1
      package.xml
  12. 158
      src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
  13. 537
      src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc
  14. 38
      src/core/ext/filters/client_channel/subchannel_interface.h
  15. 2
      src/core/ext/xds/xds_client_stats.h
  16. 4
      src/core/plugin_registry/grpc_plugin_registry.cc
  17. 1
      src/python/grpcio/grpc_core_dependencies.py
  18. 4
      test/cpp/end2end/xds_end2end_test.cc
  19. 1
      tools/doxygen/Doxyfile.c++.internal
  20. 1
      tools/doxygen/Doxyfile.core.internal

14
BUILD

@ -327,7 +327,6 @@ grpc_cc_library(
"grpc_lb_policy_cds",
"grpc_lb_policy_eds",
"grpc_lb_policy_eds_drop",
"grpc_lb_policy_lrs",
"grpc_lb_policy_xds_cluster_manager",
"grpc_resolver_xds",
],
@ -1411,19 +1410,6 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "grpc_lb_policy_lrs",
srcs = [
"src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc",
],
language = "c++",
deps = [
"grpc_base",
"grpc_client_channel",
"grpc_xds_client",
],
)
grpc_cc_library(
name = "grpc_lb_policy_xds_cluster_manager",
srcs = [

@ -250,7 +250,6 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/lb_policy/xds/cds.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/eds.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/xds.h",
"src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc",
"src/core/ext/filters/client_channel/lb_policy_factory.h",

@ -1444,7 +1444,6 @@ add_library(grpc
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
src/core/ext/filters/client_channel/lb_policy_registry.cc
src/core/ext/filters/client_channel/local_subchannel_pool.cc

@ -1847,7 +1847,6 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.cc \
@ -4511,7 +4510,6 @@ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc: $
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc: $(OPENSSL_DEP)
src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc: $(OPENSSL_DEP)

@ -801,7 +801,6 @@ libs:
- src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
- src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
- src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc
- src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc
- src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
- src/core/ext/filters/client_channel/lb_policy_registry.cc
- src/core/ext/filters/client_channel/local_subchannel_pool.cc

@ -68,7 +68,6 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.cc \

@ -35,7 +35,6 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\cds.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\eds.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\eds_drop.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\lrs.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_manager.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy_registry.cc " +
"src\\core\\ext\\filters\\client_channel\\local_subchannel_pool.cc " +

@ -236,7 +236,6 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds.h',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc',
'src/core/ext/filters/client_channel/lb_policy_factory.h',

@ -154,7 +154,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/cds.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/eds.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy_factory.h )

@ -473,7 +473,6 @@
'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.cc',
'src/core/ext/filters/client_channel/local_subchannel_pool.cc',

@ -134,7 +134,6 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/cds.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/eds.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy_factory.h" role="src" />

@ -54,6 +54,8 @@ namespace {
constexpr char kEds[] = "eds_experimental";
const char* kXdsLocalityNameAttributeKey = "xds_locality_name";
// Config for EDS LB policy.
class EdsLbConfig : public LoadBalancingPolicy::Config {
public:
@ -106,6 +108,49 @@ class EdsLb : public LoadBalancingPolicy {
void ResetBackoffLocked() override;
private:
class XdsLocalityAttribute : public ServerAddress::AttributeInterface {
public:
explicit XdsLocalityAttribute(RefCountedPtr<XdsLocalityName> locality_name)
: locality_name_(std::move(locality_name)) {}
RefCountedPtr<XdsLocalityName> locality_name() const {
return locality_name_;
}
std::unique_ptr<AttributeInterface> Copy() const override {
return absl::make_unique<XdsLocalityAttribute>(locality_name_->Ref());
}
int Cmp(const AttributeInterface* other) const override {
const auto* other_locality_attr =
static_cast<const XdsLocalityAttribute*>(other);
return locality_name_->Compare(*other_locality_attr->locality_name_);
}
std::string ToString() const override {
return locality_name_->AsHumanReadableString();
}
private:
RefCountedPtr<XdsLocalityName> locality_name_;
};
class StatsSubchannelWrapper : public DelegatingSubchannel {
public:
StatsSubchannelWrapper(
RefCountedPtr<SubchannelInterface> wrapped_subchannel,
RefCountedPtr<XdsClusterLocalityStats> locality_stats)
: DelegatingSubchannel(std::move(wrapped_subchannel)),
locality_stats_(std::move(locality_stats)) {}
XdsClusterLocalityStats* locality_stats() const {
return locality_stats_.get();
}
private:
RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
};
class EndpointWatcher : public XdsClient::EndpointWatcherInterface {
public:
explicit EndpointWatcher(RefCountedPtr<EdsLb> parent)
@ -150,9 +195,9 @@ class EdsLb : public LoadBalancingPolicy {
};
// A picker that handles drops.
class DropPicker : public SubchannelPicker {
class EdsPicker : public SubchannelPicker {
public:
explicit DropPicker(RefCountedPtr<EdsLb> eds_policy);
explicit EdsPicker(RefCountedPtr<EdsLb> eds_policy);
PickResult Pick(PickArgs args) override;
@ -203,7 +248,7 @@ class EdsLb : public LoadBalancingPolicy {
RefCountedPtr<Config> CreateChildPolicyConfigLocked();
grpc_channel_args* CreateChildPolicyArgsLocked(
const grpc_channel_args* args_in);
void MaybeUpdateDropPickerLocked();
void MaybeUpdateEdsPickerLocked();
// Caller must ensure that config_ is set before calling.
const absl::string_view GetEdsResourceName() const {
@ -257,10 +302,10 @@ class EdsLb : public LoadBalancingPolicy {
};
//
// EdsLb::DropPicker
// EdsLb::EdsPicker
//
EdsLb::DropPicker::DropPicker(RefCountedPtr<EdsLb> eds_policy)
EdsLb::EdsPicker::EdsPicker(RefCountedPtr<EdsLb> eds_policy)
: eds_policy_(std::move(eds_policy)),
drop_stats_(eds_policy_->drop_stats_),
child_picker_(eds_policy_->child_picker_),
@ -272,7 +317,7 @@ EdsLb::DropPicker::DropPicker(RefCountedPtr<EdsLb> eds_policy)
}
}
EdsLb::PickResult EdsLb::DropPicker::Pick(PickArgs args) {
EdsLb::PickResult EdsLb::EdsPicker::Pick(PickArgs args) {
// Check and see if we exceeded the max concurrent requests count.
uint32_t current = eds_policy_->concurrent_requests_.FetchAdd(1);
if (current >= max_concurrent_requests_) {
@ -297,22 +342,48 @@ EdsLb::PickResult EdsLb::DropPicker::Pick(PickArgs args) {
}
// Not dropping, so delegate to child's picker.
PickResult result = child_picker_->Pick(args);
if (result.type == PickResult::PICK_COMPLETE) {
if (result.type == result.PICK_COMPLETE && result.subchannel != nullptr) {
XdsClusterLocalityStats* locality_stats = nullptr;
if (drop_stats_ != nullptr) { // If load reporting is enabled.
auto* subchannel_wrapper =
static_cast<StatsSubchannelWrapper*>(result.subchannel.get());
// Handle load reporting.
locality_stats = subchannel_wrapper->locality_stats()->Ref().release();
// Record a call started.
locality_stats->AddCallStarted();
// Unwrap subchannel to pass back up the stack.
result.subchannel = subchannel_wrapper->wrapped_subchannel();
}
// Intercept the recv_trailing_metadata op to record call completion.
EdsLb* eds_policy = static_cast<EdsLb*>(
eds_policy_->Ref(DEBUG_LOCATION, "DropPickPicker+call").release());
auto original_recv_trailing_metadata_ready =
result.recv_trailing_metadata_ready;
result.recv_trailing_metadata_ready =
[original_recv_trailing_metadata_ready, eds_policy](
// Note: This callback does not run in either the control plane
// work serializer or in the data plane mutex.
[locality_stats, original_recv_trailing_metadata_ready, eds_policy](
grpc_error* error, MetadataInterface* metadata,
CallState* call_state) {
if (original_recv_trailing_metadata_ready != nullptr) {
original_recv_trailing_metadata_ready(error, metadata, call_state);
// Record call completion for load reporting.
if (locality_stats != nullptr) {
const bool call_failed = error != GRPC_ERROR_NONE;
locality_stats->AddCallFinished(call_failed);
locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
}
// Decrement number of calls in flight.
eds_policy->concurrent_requests_.FetchSub(1);
eds_policy->Unref(DEBUG_LOCATION, "DropPickPicker+call");
// Invoke the original recv_trailing_metadata_ready callback, if any.
if (original_recv_trailing_metadata_ready != nullptr) {
original_recv_trailing_metadata_ready(error, metadata, call_state);
}
};
} else {
// TODO(roth): We should ideally also record call failures here in the case
// where a pick fails. This is challenging, because we don't know which
// picks are for wait_for_ready RPCs or how many times we'll return a
// failure for the same wait_for_ready RPC.
eds_policy_->concurrent_requests_.FetchSub(1);
}
return result;
@ -325,6 +396,27 @@ EdsLb::PickResult EdsLb::DropPicker::Pick(PickArgs args) {
RefCountedPtr<SubchannelInterface> EdsLb::Helper::CreateSubchannel(
ServerAddress address, const grpc_channel_args& args) {
if (eds_policy_->shutting_down_) return nullptr;
// If load reporting is enabled, wrap the subchannel such that it
// includes the locality stats object, which will be used by the EdsPicker.
if (eds_policy_->config_->lrs_load_reporting_server_name().has_value()) {
RefCountedPtr<XdsLocalityName> locality_name;
auto* attribute = address.GetAttribute(kXdsLocalityNameAttributeKey);
if (attribute != nullptr) {
const auto* locality_attr =
static_cast<const XdsLocalityAttribute*>(attribute);
locality_name = locality_attr->locality_name();
}
RefCountedPtr<XdsClusterLocalityStats> locality_stats =
eds_policy_->xds_client_->AddClusterLocalityStats(
*eds_policy_->config_->lrs_load_reporting_server_name(),
eds_policy_->config_->cluster_name(),
eds_policy_->config_->eds_service_name(), std::move(locality_name));
return MakeRefCounted<StatsSubchannelWrapper>(
eds_policy_->channel_control_helper()->CreateSubchannel(
std::move(address), args),
std::move(locality_stats));
}
// Load reporting not enabled, so don't wrap the subchannel.
return eds_policy_->channel_control_helper()->CreateSubchannel(
std::move(address), args);
}
@ -347,8 +439,8 @@ void EdsLb::Helper::UpdateState(grpc_connectivity_state state,
eds_policy_->child_status_ = status;
eds_policy_->child_picker_ =
MakeRefCounted<ChildPickerWrapper>(std::move(picker));
// Wrap the picker in a DropPicker and pass it up.
eds_policy_->MaybeUpdateDropPickerLocked();
// Wrap the picker in a EdsPicker and pass it up.
eds_policy_->MaybeUpdateEdsPickerLocked();
}
void EdsLb::Helper::AddTraceEvent(TraceSeverity severity,
@ -522,7 +614,7 @@ void EdsLb::UpdateLocked(UpdateArgs args) {
}
}
if (lrs_server_changed || max_concurrent_requests_changed) {
MaybeUpdateDropPickerLocked();
MaybeUpdateEdsPickerLocked();
}
// Update child policy if needed.
// Note that this comes after updating drop_stats_, since we want that
@ -671,9 +763,13 @@ ServerAddressList EdsLb::CreateChildPolicyAddressesLocked() {
std::vector<std::string> hierarchical_path = {
priority_child_name, locality_name->AsHumanReadableString()};
for (const auto& endpoint : locality.endpoints) {
addresses.emplace_back(endpoint.WithAttribute(
kHierarchicalPathAttributeKey,
MakeHierarchicalPathAttribute(hierarchical_path)));
addresses.emplace_back(
endpoint
.WithAttribute(kHierarchicalPathAttributeKey,
MakeHierarchicalPathAttribute(hierarchical_path))
.WithAttribute(kXdsLocalityNameAttributeKey,
absl::make_unique<XdsLocalityAttribute>(
locality_name->Ref())));
}
}
}
@ -702,30 +798,10 @@ EdsLb::CreateChildPolicyConfigLocked() {
if (!locality_name->sub_zone().empty()) {
locality_name_json["subzone"] = locality_name->sub_zone();
}
// Construct endpoint-picking policy.
// Wrap it in the LRS policy if load reporting is enabled.
Json endpoint_picking_policy;
if (config_->lrs_load_reporting_server_name().has_value()) {
Json::Object lrs_config = {
{"clusterName", std::string(lrs_key.first)},
{"locality", std::move(locality_name_json)},
{"lrsLoadReportingServerName",
config_->lrs_load_reporting_server_name().value()},
{"childPolicy", config_->endpoint_picking_policy()},
};
if (!lrs_key.second.empty()) {
lrs_config["edsServiceName"] = std::string(lrs_key.second);
}
endpoint_picking_policy = Json::Array{Json::Object{
{"lrs_experimental", std::move(lrs_config)},
}};
} else {
endpoint_picking_policy = config_->endpoint_picking_policy();
}
// Add weighted target entry.
weighted_targets[locality_name->AsHumanReadableString()] = Json::Object{
{"weight", locality.lb_weight},
{"childPolicy", std::move(endpoint_picking_policy)},
{"childPolicy", config_->endpoint_picking_policy()},
};
}
// Construct locality-picking policy.
@ -865,12 +941,12 @@ OrphanablePtr<LoadBalancingPolicy> EdsLb::CreateChildPolicyLocked(
return lb_policy;
}
void EdsLb::MaybeUpdateDropPickerLocked() {
void EdsLb::MaybeUpdateEdsPickerLocked() {
// Update only if we have a child picker.
if (child_picker_ != nullptr) {
channel_control_helper()->UpdateState(
child_state_, child_status_,
absl::make_unique<DropPicker>(Ref(DEBUG_LOCATION, "DropPicker")));
absl::make_unique<EdsPicker>(Ref(DEBUG_LOCATION, "EdsPicker")));
}
}
@ -1028,7 +1104,9 @@ class EdsLbFactory : public LoadBalancingPolicyFactory {
EdsLbConfig* new_eds_config = static_cast<EdsLbConfig*>(new_config);
return old_eds_config->cluster_name() != new_eds_config->cluster_name() ||
old_eds_config->eds_service_name() !=
new_eds_config->eds_service_name();
new_eds_config->eds_service_name() ||
old_eds_config->lrs_load_reporting_server_name() !=
new_eds_config->lrs_load_reporting_server_name();
}
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(

@ -1,537 +0,0 @@
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <grpc/grpc.h>
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/work_serializer.h"
namespace grpc_core {
TraceFlag grpc_lb_lrs_trace(false, "lrs_lb");
namespace {
constexpr char kLrs[] = "lrs_experimental";
// Config for LRS LB policy.
class LrsLbConfig : public LoadBalancingPolicy::Config {
public:
LrsLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
std::string cluster_name, std::string eds_service_name,
std::string lrs_load_reporting_server_name,
RefCountedPtr<XdsLocalityName> locality_name)
: child_policy_(std::move(child_policy)),
cluster_name_(std::move(cluster_name)),
eds_service_name_(std::move(eds_service_name)),
lrs_load_reporting_server_name_(
std::move(lrs_load_reporting_server_name)),
locality_name_(std::move(locality_name)) {}
const char* name() const override { return kLrs; }
RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
return child_policy_;
}
const std::string& cluster_name() const { return cluster_name_; }
const std::string& eds_service_name() const { return eds_service_name_; }
const std::string& lrs_load_reporting_server_name() const {
return lrs_load_reporting_server_name_;
};
RefCountedPtr<XdsLocalityName> locality_name() const {
return locality_name_;
}
private:
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
std::string cluster_name_;
std::string eds_service_name_;
std::string lrs_load_reporting_server_name_;
RefCountedPtr<XdsLocalityName> locality_name_;
};
// LRS LB policy.
class LrsLb : public LoadBalancingPolicy {
public:
LrsLb(RefCountedPtr<XdsClient> xds_client, Args args);
const char* name() const override { return kLrs; }
void UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
private:
// A simple wrapper for ref-counting a picker from the child policy.
class RefCountedPicker : public RefCounted<RefCountedPicker> {
public:
explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
: picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { return picker_->Pick(args); }
private:
std::unique_ptr<SubchannelPicker> picker_;
};
// A picker that wraps the picker from the child to perform load reporting.
class LoadReportingPicker : public SubchannelPicker {
public:
LoadReportingPicker(RefCountedPtr<RefCountedPicker> picker,
RefCountedPtr<XdsClusterLocalityStats> locality_stats)
: picker_(std::move(picker)),
locality_stats_(std::move(locality_stats)) {}
PickResult Pick(PickArgs args);
private:
RefCountedPtr<RefCountedPicker> picker_;
RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
};
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<LrsLb> lrs_policy)
: lrs_policy_(std::move(lrs_policy)) {}
~Helper() { lrs_policy_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
private:
RefCountedPtr<LrsLb> lrs_policy_;
};
~LrsLb();
void ShutdownLocked() override;
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const grpc_channel_args* args);
void UpdateChildPolicyLocked(ServerAddressList addresses,
const grpc_channel_args* args);
void MaybeUpdatePickerLocked();
// Current config from the resolver.
RefCountedPtr<LrsLbConfig> config_;
// Internal state.
bool shutting_down_ = false;
// The xds client.
RefCountedPtr<XdsClient> xds_client_;
// The stats for client-side load reporting.
RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
// Latest state and picker reported by the child policy.
grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
absl::Status status_;
RefCountedPtr<RefCountedPicker> picker_;
};
//
// LrsLb::LoadReportingPicker
//
LoadBalancingPolicy::PickResult LrsLb::LoadReportingPicker::Pick(
LoadBalancingPolicy::PickArgs args) {
// Forward the pick to the picker returned from the child policy.
PickResult result = picker_->Pick(args);
if (result.type == PickResult::PICK_COMPLETE &&
result.subchannel != nullptr) {
// Record a call started.
locality_stats_->AddCallStarted();
// Intercept the recv_trailing_metadata op to record call completion.
XdsClusterLocalityStats* locality_stats =
locality_stats_->Ref(DEBUG_LOCATION, "LocalityStats+call").release();
result.recv_trailing_metadata_ready =
// Note: This callback does not run in either the control plane
// work serializer or in the data plane mutex.
[locality_stats](grpc_error* error, MetadataInterface* /*metadata*/,
CallState* /*call_state*/) {
const bool call_failed = error != GRPC_ERROR_NONE;
locality_stats->AddCallFinished(call_failed);
locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
};
}
return result;
}
//
// LrsLb
//
LrsLb::LrsLb(RefCountedPtr<XdsClient> xds_client, Args args)
: LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
gpr_log(GPR_INFO, "[lrs_lb %p] created -- using xds client %p", this,
xds_client_.get());
}
}
LrsLb::~LrsLb() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
gpr_log(GPR_INFO, "[lrs_lb %p] destroying xds LB policy", this);
}
}
void LrsLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
gpr_log(GPR_INFO, "[lrs_lb %p] shutting down", this);
}
shutting_down_ = true;
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
if (child_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
interested_parties());
child_policy_.reset();
}
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
picker_.reset();
locality_stats_.reset();
xds_client_.reset();
}
void LrsLb::ExitIdleLocked() {
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
}
void LrsLb::ResetBackoffLocked() {
// The XdsClient will have its backoff reset by the xds resolver, so we
// don't need to do it here.
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}
void LrsLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
gpr_log(GPR_INFO, "[lrs_lb %p] Received update", this);
}
// Update config.
auto old_config = std::move(config_);
config_ = std::move(args.config);
// Update load reporting if needed.
if (old_config == nullptr ||
config_->lrs_load_reporting_server_name() !=
old_config->lrs_load_reporting_server_name() ||
config_->cluster_name() != old_config->cluster_name() ||
config_->eds_service_name() != old_config->eds_service_name() ||
*config_->locality_name() != *old_config->locality_name()) {
locality_stats_ = xds_client_->AddClusterLocalityStats(
config_->lrs_load_reporting_server_name(), config_->cluster_name(),
config_->eds_service_name(), config_->locality_name());
MaybeUpdatePickerLocked();
}
// Update child policy.
UpdateChildPolicyLocked(std::move(args.addresses), args.args);
args.args = nullptr;
}
void LrsLb::MaybeUpdatePickerLocked() {
if (picker_ != nullptr) {
auto lrs_picker =
absl::make_unique<LoadReportingPicker>(picker_, locality_stats_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
gpr_log(
GPR_INFO,
"[lrs_lb %p] updating connectivity: state=%s status=(%s) picker=%p",
this, ConnectivityStateName(state_), status_.ToString().c_str(),
lrs_picker.get());
}
channel_control_helper()->UpdateState(state_, status_,
std::move(lrs_picker));
}
}
OrphanablePtr<LoadBalancingPolicy> LrsLb::CreateChildPolicyLocked(
const grpc_channel_args* args) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer = work_serializer();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_lb_lrs_trace);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
gpr_log(GPR_INFO, "[lrs_lb %p] Created new child policy handler %p", this,
lb_policy.get());
}
// Add our interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
// this policy, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
}
void LrsLb::UpdateChildPolicyLocked(ServerAddressList addresses,
const grpc_channel_args* args) {
// Create policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args);
}
// Construct update args.
UpdateArgs update_args;
update_args.addresses = std::move(addresses);
update_args.config = config_->child_policy();
update_args.args = args;
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
gpr_log(GPR_INFO, "[lrs_lb %p] Updating child policy handler %p", this,
child_policy_.get());
}
child_policy_->UpdateLocked(std::move(update_args));
}
//
// LrsLb::Helper
//
RefCountedPtr<SubchannelInterface> LrsLb::Helper::CreateSubchannel(
ServerAddress address, const grpc_channel_args& args) {
if (lrs_policy_->shutting_down_) return nullptr;
return lrs_policy_->channel_control_helper()->CreateSubchannel(
std::move(address), args);
}
void LrsLb::Helper::UpdateState(grpc_connectivity_state state,
const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
if (lrs_policy_->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
gpr_log(
GPR_INFO,
"[lrs_lb %p] child connectivity state update: state=%s (%s) picker=%p",
lrs_policy_.get(), ConnectivityStateName(state),
status.ToString().c_str(), picker.get());
}
// Save the state and picker.
lrs_policy_->state_ = state;
lrs_policy_->status_ = status;
lrs_policy_->picker_ = MakeRefCounted<RefCountedPicker>(std::move(picker));
// Wrap the picker and return it to the channel.
lrs_policy_->MaybeUpdatePickerLocked();
}
void LrsLb::Helper::RequestReresolution() {
if (lrs_policy_->shutting_down_) return;
lrs_policy_->channel_control_helper()->RequestReresolution();
}
void LrsLb::Helper::AddTraceEvent(TraceSeverity severity,
absl::string_view message) {
if (lrs_policy_->shutting_down_) return;
lrs_policy_->channel_control_helper()->AddTraceEvent(severity, message);
}
//
// factory
//
class LrsLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
grpc_error* error = GRPC_ERROR_NONE;
RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"cannot get XdsClient to instantiate lrs LB policy: %s",
grpc_error_string(error));
GRPC_ERROR_UNREF(error);
return nullptr;
}
return MakeOrphanable<LrsLb>(std::move(xds_client), std::move(args));
}
const char* name() const override { return kLrs; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const Json& json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json.type() == Json::Type::JSON_NULL) {
// lrs was mentioned as a policy in the deprecated loadBalancingPolicy
// field or in the client API.
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:loadBalancingPolicy error:lrs policy requires configuration. "
"Please use loadBalancingConfig field of service config instead.");
return nullptr;
}
std::vector<grpc_error*> error_list;
// Child policy.
RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:childPolicy error:required field missing"));
} else {
grpc_error* parse_error = GRPC_ERROR_NONE;
child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
it->second, &parse_error);
if (child_policy == nullptr) {
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
std::vector<grpc_error*> child_errors;
child_errors.push_back(parse_error);
error_list.push_back(
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
}
}
// Cluster name.
std::string cluster_name;
it = json.object_value().find("clusterName");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:clusterName error:required field missing"));
} else if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:clusterName error:type should be string"));
} else {
cluster_name = it->second.string_value();
}
// EDS service name.
std::string eds_service_name;
it = json.object_value().find("edsServiceName");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:edsServiceName error:type should be string"));
} else {
eds_service_name = it->second.string_value();
}
}
// Locality.
RefCountedPtr<XdsLocalityName> locality_name;
it = json.object_value().find("locality");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:locality error:required field missing"));
} else {
std::vector<grpc_error*> child_errors =
ParseLocality(it->second, &locality_name);
if (!child_errors.empty()) {
error_list.push_back(
GRPC_ERROR_CREATE_FROM_VECTOR("field:locality", &child_errors));
}
}
// LRS load reporting server name.
std::string lrs_load_reporting_server_name;
it = json.object_value().find("lrsLoadReportingServerName");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:lrsLoadReportingServerName error:required field missing"));
} else if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:lrsLoadReportingServerName error:type should be string"));
} else {
lrs_load_reporting_server_name = it->second.string_value();
}
if (!error_list.empty()) {
*error = GRPC_ERROR_CREATE_FROM_VECTOR(
"lrs_experimental LB policy config", &error_list);
return nullptr;
}
return MakeRefCounted<LrsLbConfig>(
std::move(child_policy), std::move(cluster_name),
std::move(eds_service_name), std::move(lrs_load_reporting_server_name),
std::move(locality_name));
}
private:
static std::vector<grpc_error*> ParseLocality(
const Json& json, RefCountedPtr<XdsLocalityName>* name) {
std::vector<grpc_error*> error_list;
if (json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"locality field is not an object"));
return error_list;
}
std::string region;
auto it = json.object_value().find("region");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"region\" field is not a string"));
} else {
region = it->second.string_value();
}
}
std::string zone;
it = json.object_value().find("zone");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"zone\" field is not a string"));
} else {
zone = it->second.string_value();
}
}
std::string subzone;
it = json.object_value().find("subzone");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"subzone\" field is not a string"));
} else {
subzone = it->second.string_value();
}
}
if (region.empty() && zone.empty() && subzone.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"at least one of region, zone, or subzone must be set"));
}
if (error_list.empty()) {
*name = MakeRefCounted<XdsLocalityName>(region, zone, subzone);
}
return error_list;
}
};
} // namespace
} // namespace grpc_core
//
// Plugin registration
//
void grpc_lb_policy_lrs_init() {
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
absl::make_unique<grpc_core::LrsLbFactory>());
}
void grpc_lb_policy_lrs_shutdown() {}

@ -95,6 +95,44 @@ class SubchannelInterface : public RefCounted<SubchannelInterface> {
const char* key) const = 0;
};
// A class that delegates to another subchannel, to be used in cases
// where an LB policy needs to wrap a subchannel.
class DelegatingSubchannel : public SubchannelInterface {
public:
explicit DelegatingSubchannel(RefCountedPtr<SubchannelInterface> subchannel)
: wrapped_subchannel_(std::move(subchannel)) {}
RefCountedPtr<SubchannelInterface> wrapped_subchannel() const {
return wrapped_subchannel_;
}
grpc_connectivity_state CheckConnectivityState() override {
return wrapped_subchannel_->CheckConnectivityState();
}
void WatchConnectivityState(
grpc_connectivity_state initial_state,
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override {
return wrapped_subchannel_->WatchConnectivityState(initial_state,
std::move(watcher));
}
void CancelConnectivityStateWatch(
ConnectivityStateWatcherInterface* watcher) override {
return wrapped_subchannel_->CancelConnectivityStateWatch(watcher);
}
void AttemptToConnect() override { wrapped_subchannel_->AttemptToConnect(); }
void ResetBackoff() override { wrapped_subchannel_->ResetBackoff(); }
const grpc_channel_args* channel_args() override {
return wrapped_subchannel_->channel_args();
}
const ServerAddress::AttributeInterface* GetAttribute(
const char* key) const override {
return wrapped_subchannel_->GetAttribute(key);
}
private:
RefCountedPtr<SubchannelInterface> wrapped_subchannel_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INTERFACE_H */

@ -28,6 +28,7 @@
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/ref_counted.h"
@ -45,6 +46,7 @@ class XdsLocalityName : public RefCounted<XdsLocalityName> {
struct Less {
bool operator()(const XdsLocalityName* lhs,
const XdsLocalityName* rhs) const {
if (lhs == nullptr || rhs == nullptr) return GPR_ICMP(lhs, rhs);
return lhs->Compare(*rhs) < 0;
}

@ -74,8 +74,6 @@ void grpc_lb_policy_eds_init(void);
void grpc_lb_policy_eds_shutdown(void);
void grpc_lb_policy_eds_drop_init(void);
void grpc_lb_policy_eds_drop_shutdown(void);
void grpc_lb_policy_lrs_init(void);
void grpc_lb_policy_lrs_shutdown(void);
void grpc_lb_policy_xds_cluster_manager_init(void);
void grpc_lb_policy_xds_cluster_manager_shutdown(void);
void grpc_resolver_xds_init(void);
@ -134,8 +132,6 @@ void grpc_register_built_in_plugins(void) {
grpc_lb_policy_eds_shutdown);
grpc_register_plugin(grpc_lb_policy_eds_drop_init,
grpc_lb_policy_eds_drop_shutdown);
grpc_register_plugin(grpc_lb_policy_lrs_init,
grpc_lb_policy_lrs_shutdown);
grpc_register_plugin(grpc_lb_policy_xds_cluster_manager_init,
grpc_lb_policy_xds_cluster_manager_shutdown);
grpc_register_plugin(grpc_resolver_xds_init,

@ -44,7 +44,6 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.cc',
'src/core/ext/filters/client_channel/local_subchannel_pool.cc',

@ -130,7 +130,7 @@ constexpr char kDefaultServiceConfig[] =
" \"loadBalancingConfig\":[\n"
" { \"does_not_exist\":{} },\n"
" { \"eds_experimental\":{\n"
" \"clusterName\": \"application_target_name\",\n"
" \"clusterName\": \"server.example.com\",\n"
" \"lrsLoadReportingServerName\": \"\"\n"
" } }\n"
" ]\n"
@ -140,7 +140,7 @@ constexpr char kDefaultServiceConfigWithoutLoadReporting[] =
" \"loadBalancingConfig\":[\n"
" { \"does_not_exist\":{} },\n"
" { \"eds_experimental\":{\n"
" \"clusterName\": \"application_target_name\"\n"
" \"clusterName\": \"server.example.com\"\n"
" } }\n"
" ]\n"
"}";

@ -1090,7 +1090,6 @@ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds.h \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_factory.h \

@ -915,7 +915,6 @@ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds.h \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_factory.h \

Loading…
Cancel
Save