Addressing code review comments.

reviewable/pr22280/r3
Donna Dionne 5 years ago
parent 70ac4b6418
commit f487d1be61
  1. 281
      src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc
  2. 34
      src/core/ext/filters/client_channel/xds/xds_api.cc
  3. 3
      src/core/ext/filters/client_channel/xds/xds_api.h
  4. 165
      src/core/ext/filters/client_channel/xds/xds_client.cc
  5. 8
      src/core/ext/filters/client_channel/xds/xds_client.h
  6. 4
      src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
  7. 23
      test/cpp/end2end/xds_end2end_test.cc

@ -22,6 +22,7 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include <grpc/grpc.h>
@ -52,24 +53,26 @@ class XdsRoutingLbConfig : public LoadBalancingPolicy::Config {
struct ChildConfig {
RefCountedPtr<LoadBalancingPolicy::Config> config;
};
using Matcher = std::pair<std::string, std::string>;
using RouteVector = std::vector<std::pair<Matcher, std::string>>;
struct Matcher {
std::string service;
std::string method;
};
using RouteTable = std::vector<std::pair<Matcher, std::string>>;
using ActionMap = std::map<std::string, ChildConfig>;
explicit XdsRoutingLbConfig(ActionMap action_map, RouteVector route_vector)
XdsRoutingLbConfig(ActionMap action_map, RouteTable route_table)
: action_map_(std::move(action_map)),
route_vector_(std::move(route_vector)) {}
route_table_(std::move(route_table)) {}
const char* name() const override { return kXdsRouting; }
const ActionMap& action_map() const { return action_map_; }
const RouteVector& route_vector() const { return route_vector_; }
const RouteTable& route_table() const { return route_table_; }
private:
ActionMap action_map_;
RouteVector route_vector_;
RouteTable route_table_;
};
// xds_routing LB policy.
@ -87,39 +90,37 @@ class XdsRoutingLb : public LoadBalancingPolicy {
// A simple wrapper for ref-counting a picker from the child policy.
class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> {
public:
explicit ChildPickerWrapper(const std::string& name,
ChildPickerWrapper(std::string name,
std::unique_ptr<SubchannelPicker> picker)
: name_(name), picker_(std::move(picker)) {}
: name_(std::move(name)), picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { return picker_->Pick(std::move(args)); }
std::string name() { return name_; }
const std::string& name() { return name_; }
private:
std::string name_;
std::unique_ptr<SubchannelPicker> picker_;
};
// Picks a child using stateless WRR and then delegates to that
// Picks a child using prefix or path matching and then delegates to that
// child's picker.
class XdsRoutingPicker : public SubchannelPicker {
class RoutePicker : public SubchannelPicker {
public:
// Maintains a xds_routing list of pickers from each child that is in
// ready state. The first element in the pair represents the end of a
// range proportional to the child's weight. The start of the range
// is the previous value in the vector and is 0 for the first element.
using PickerList = InlinedVector<RefCountedPtr<ChildPickerWrapper>, 1>;
struct Route {
XdsRoutingLbConfig::Matcher matcher;
RefCountedPtr<ChildPickerWrapper> picker;
};
using PickerMap = std::map<std::string, RefCountedPtr<ChildPickerWrapper>>;
// Maintains an ordered xds route table as provided by RDS response.
using RouteTable = std::vector<Route>;
XdsRoutingPicker(RefCountedPtr<XdsRoutingLb> parent, PickerMap pickers)
: parent_(std::move(parent)), pickers_(std::move(pickers)) {}
~XdsRoutingPicker() { parent_.reset(DEBUG_LOCATION, "XdsRoutingPicker"); }
RoutePicker(RouteTable route_table)
: route_table_(std::move(route_table)) {}
PickResult Pick(PickArgs args) override;
private:
RefCountedPtr<XdsRoutingLb> parent_;
PickerMap pickers_;
RouteTable route_table_;
};
// Each XdsRoutingChild holds a ref to its parent XdsRoutingLb.
@ -145,8 +146,6 @@ class XdsRoutingLb : public LoadBalancingPolicy {
return picker_wrapper_;
}
std::string name() const { return name_; }
private:
class Helper : public ChannelControlHelper {
public:
@ -211,35 +210,29 @@ class XdsRoutingLb : public LoadBalancingPolicy {
};
//
// XdsRoutingLb::XdsRoutingPicker
// XdsRoutingLb::RoutePicker
//
XdsRoutingLb::PickResult XdsRoutingLb::XdsRoutingPicker::Pick(PickArgs args) {
std::string path;
XdsRoutingLb::PickResult XdsRoutingLb::RoutePicker::Pick(PickArgs args) {
absl::string_view path;
for (const auto& p : *(args.initial_metadata)) {
if (memcmp(p.first.data(), ":path", static_cast<int>(p.first.size())) ==
0) {
path = std::string(p.second.data(), static_cast<int>(p.second.size()));
if (p.first == ":path") {
path = p.second;
break;
}
}
std::vector<std::string> v = absl::StrSplit(path, '/');
GPR_DEBUG_ASSERT(v.size() == 3);
std::string service = v[1];
std::string method = v[2];
for (int i = 0; i < parent_->config_->route_vector().size(); ++i) {
if (service == parent_->config_->route_vector()[i].first.first &&
("" == parent_->config_->route_vector()[i].first.second ||
method == parent_->config_->route_vector()[i].first.second)) {
auto picker = pickers_.find(parent_->config_->route_vector()[i].second);
if (picker != pickers_.end()) {
gpr_log(GPR_INFO, "XdsRouting Picked: %s for path %s",
picker->first.c_str(), path.c_str());
return picker->second.get()->Pick(args);
std::vector<absl::string_view> v = absl::StrSplit(path.substr(1), '/');
for (int i = 0; i < route_table_.size(); ++i) {
if (v[0] == route_table_[i].matcher.service &&
("" == route_table_[i].matcher.method ||
v[1] == route_table_[i].matcher.method)) {
auto picker = route_table_[i].picker;
if (picker != nullptr) {
return picker.get()->Pick(args);
}
}
}
return pickers_.begin()->second.get()->Pick(args);
return route_table_[route_table_.size() - 1].picker.get()->Pick(args);
}
//
@ -313,11 +306,7 @@ void XdsRoutingLb::UpdateLocked(UpdateArgs args) {
}
void XdsRoutingLb::UpdateStateLocked() {
// Construct a new picker which maintains a map of all child pickers
// that are ready. Each child is represented by a portion of the range
// proportional to its weight, such that the total range is the sum of the
// weights of all children.
XdsRoutingPicker::PickerMap picker_map;
std::map<std::string, RefCountedPtr<ChildPickerWrapper>> picker_map;
// Also count the number of children in each state, to determine the
// overall state.
size_t num_connecting = 0;
@ -367,10 +356,19 @@ void XdsRoutingLb::UpdateStateLocked() {
ConnectivityStateName(connectivity_state));
}
std::unique_ptr<SubchannelPicker> picker;
RoutePicker::RouteTable route_table;
switch (connectivity_state) {
case GRPC_CHANNEL_READY:
picker = absl::make_unique<XdsRoutingPicker>(
Ref(DEBUG_LOCATION, "XdsRoutingPicker"), std::move(picker_map));
for (int i = 0; i < config_->route_table().size(); ++i) {
RoutePicker::Route route;
route.matcher = config_->route_table()[i].first;
auto child_picker = picker_map.find(config_->route_table()[i].second);
if (child_picker != picker_map.end()) {
route.picker = child_picker->second;
}
route_table.push_back(std::move(route));
}
picker = absl::make_unique<RoutePicker>(std::move(route_table));
break;
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
@ -460,6 +458,7 @@ void XdsRoutingLb::XdsRoutingChild::UpdateLocked(
// Update child weight.
// Reactivate if needed.
if (delayed_removal_timer_callback_pending_) {
delayed_removal_timer_callback_pending_ = false;
grpc_timer_cancel(&delayed_removal_timer_);
}
// Create child policy if needed.
@ -530,7 +529,6 @@ void XdsRoutingLb::XdsRoutingChild::OnDelayedRemovalTimerLocked(
RefCountedPtr<SubchannelInterface>
XdsRoutingLb::XdsRoutingChild::Helper::CreateSubchannel(
const grpc_channel_args& args) {
gpr_log(GPR_INFO, "XdsRoutingChild::Helper::CreateSubchannel");
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return nullptr;
return xds_routing_child_->xds_routing_policy_->channel_control_helper()
->CreateSubchannel(args);
@ -538,12 +536,15 @@ XdsRoutingLb::XdsRoutingChild::Helper::CreateSubchannel(
void XdsRoutingLb::XdsRoutingChild::Helper::UpdateState(
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) {
gpr_log(GPR_INFO, "XdsRoutingChild::Helper::UpdateState %s",
xds_routing_child_->name().c_str());
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO,
"XdsRoutingChild::Helper::UpdateState child %s, state %d, piker %p",
xds_routing_child_->name_.c_str(), state, picker.get());
}
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return;
// Cache the picker in the XdsRoutingChild.
xds_routing_child_->picker_wrapper_ = MakeRefCounted<ChildPickerWrapper>(
xds_routing_child_->name(), std::move(picker));
xds_routing_child_->name_, std::move(picker));
// Decide what state to report for aggregation purposes.
// If we haven't seen a failure since the last time we were in state
// READY, then we report the state change as-is. However, once we do see
@ -607,87 +608,67 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory {
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:actions error:required field not present"));
} else if (it->second.type() != Json::Type::ARRAY) {
} else if (it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:actions error:type should be array"));
"field:actions error:type should be object"));
} else {
for (const auto& p : it->second.array_value()) {
auto it_cds = p.object_value().find("cds");
auto it_weighted_target = p.object_value().find("weighted_target");
if (it_cds == p.object_value().end() &&
it_weighted_target == p.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:actions error: each action needs to be either cds or "
"weighted target"));
}
auto it_name =
(it_cds == p.object_value().end() ? it_weighted_target : it_cds);
auto it_child_policy = p.object_value().find("child_policy");
if (it_child_policy == p.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:actions error: each action needs child policies"));
}
for (const auto& p : it->second.object_value()) {
XdsRoutingLbConfig::ChildConfig child_config;
std::vector<grpc_error*> child_errors =
ParseChildConfig(it_child_policy->second, &child_config);
ParseChildConfig(p.second, &child_config);
if (!child_errors.empty()) {
// Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
// string is not static in this case.
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("field:actions name:",
it_name->second.string_value())
.c_str());
absl::StrCat("field:actions name:", p.first).c_str());
for (grpc_error* child_error : child_errors) {
error = grpc_error_add_child(error, child_error);
}
error_list.push_back(error);
} else {
action_map[it_name->second.string_value()] = std::move(child_config);
action_map[p.first] = std::move(child_config);
}
}
}
XdsRoutingLbConfig::RouteVector route_vector;
auto route_iter = json.object_value().find("routes");
if (route_iter == json.object_value().end()) {
gpr_log(GPR_INFO, "No routes specified");
} else if (route_iter->second.type() != Json::Type::ARRAY) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:routes error:type should be array"));
} else {
for (const auto& p : route_iter->second.array_value()) {
auto method_name = p.object_value().find("methodName");
if (method_name == p.object_value().end()) {
XdsRoutingLbConfig::RouteTable route_table;
it = json.object_value().find("routes");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:routes error:methodName is required"));
} else {
auto action_name = p.object_value().find("action");
if (action_name == p.object_value().end()) {
"field:routes error:required field not present"));
} else if (it->second.type() != Json::Type::ARRAY) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:routes error:action is required"));
"field:routes error:type should be array"));
} else {
for (const auto& route : it->second.array_value()) {
// Parse methodName.
XdsRoutingLbConfig::Matcher matcher;
auto service = method_name->second.object_value().find("service");
auto method = method_name->second.object_value().find("method");
if (service == method_name->second.object_value().end() &&
method != method_name->second.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:methodName error: service is empty when method is "
"not"));
}
if (service != method_name->second.object_value().end()) {
matcher.first = service->second.string_value();
} else {
matcher.first = "";
std::vector<grpc_error*> route_errors =
ParseRouteConfig(route.object_value(), &matcher);
if (!route_errors.empty()) {
// Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
// string is not static in this case.
grpc_error* error =
GRPC_ERROR_CREATE_FROM_COPIED_STRING("field:routes error");
for (grpc_error* route_error : route_errors) {
error = grpc_error_add_child(error, route_error);
}
if (method != method_name->second.object_value().end()) {
matcher.second = method->second.string_value();
} else {
matcher.first = "";
error_list.push_back(error);
}
route_vector.emplace_back(matcher,
action_name->second.string_value());
// Parse action.
std::string cluster_name;
std::vector<grpc_error*> action_errors =
ParseActionConfig(route.object_value(), &cluster_name);
if (!action_errors.empty()) {
// Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
// string is not static in this case.
grpc_error* error =
GRPC_ERROR_CREATE_FROM_COPIED_STRING("field:actions error:");
for (grpc_error* action_error : action_errors) {
error = grpc_error_add_child(error, action_error);
}
error_list.push_back(error);
}
route_table.emplace_back(std::move(matcher), std::move(cluster_name));
}
}
if (!error_list.empty()) {
@ -696,22 +677,24 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory {
return nullptr;
}
return MakeRefCounted<XdsRoutingLbConfig>(std::move(action_map),
std::move(route_vector));
std::move(route_table));
}
private:
static std::vector<grpc_error*> ParseChildConfig(
const Json& json, XdsRoutingLbConfig::ChildConfig* child_config) {
std::vector<grpc_error*> error_list;
if (json.type() != Json::Type::ARRAY) {
if (json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"value should be of type array"));
"value should be of type object"));
return error_list;
}
auto it = json.object_value().find("child_policy");
if (it != json.object_value().end()) {
grpc_error* parse_error = GRPC_ERROR_NONE;
child_config->config =
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
json.array_value(), &parse_error);
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(it->second,
&parse_error);
if (child_config->config == nullptr) {
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
std::vector<grpc_error*> child_errors;
@ -719,6 +702,68 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory {
error_list.push_back(
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
}
} else {
error_list.push_back(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("did not find childPolicy"));
}
return error_list;
}
static std::vector<grpc_error*> ParseRouteConfig(
const Json& json, XdsRoutingLbConfig::Matcher* route_config) {
std::vector<grpc_error*> error_list;
if (json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"value should be of type object"));
return error_list;
}
auto method_name = json.object_value().find("methodName");
if (method_name == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:routes error:methodName is required"));
} else if (method_name->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:routes error:methodName error: type should be object"));
} else {
auto service = method_name->second.object_value().find("service");
auto method = method_name->second.object_value().find("method");
if (service != method_name->second.object_value().end()) {
route_config->service = service->second.string_value();
} else {
route_config->service = "";
}
if (method != method_name->second.object_value().end()) {
route_config->method = method->second.string_value();
} else {
route_config->method = "";
}
if ((route_config->service == "") && (route_config->method != "")) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:methodName error: service is empty when method is "
"not"));
}
}
return error_list;
}
static std::vector<grpc_error*> ParseActionConfig(const Json& json,
std::string* cluster_name) {
std::vector<grpc_error*> error_list;
if (json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"value should be of type object"));
return error_list;
}
auto action_name = json.object_value().find("action");
if (action_name == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:routes error:action is required"));
} else if (action_name->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:methodName error:type should be string"));
} else {
*cluster_name = action_name->second.string_value();
}
return error_list;
}
};

@ -1012,7 +1012,6 @@ grpc_error* RouteConfigParse(
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No route found in the virtual host.");
}
for (size_t i = 0; i < size; ++i) {
const envoy_api_v2_route_Route* route = routes[i];
const envoy_api_v2_route_RouteMatch* match =
@ -1020,28 +1019,28 @@ grpc_error* RouteConfigParse(
XdsApi::RdsRoute rds_route;
const upb_strview prefix = envoy_api_v2_route_RouteMatch_prefix(match);
const upb_strview path = envoy_api_v2_route_RouteMatch_path(match);
if (!upb_strview_eql(prefix, upb_strview_makez(""))) {
std::string prefix_string = std::string(prefix.data, prefix.size);
std::vector<std::string> v = absl::StrSplit(prefix_string, '/');
if (v.size() != 2) {
if (prefix.size > 0) {
std::vector<absl::string_view> v = absl::StrSplit(
absl::string_view(prefix.data, prefix.size).substr(1), '/');
if (v.size() != 1) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Prefix not in the required format of /service/");
}
rds_route.service = v[1];
if (!upb_strview_eql(path, upb_strview_makez(""))) {
rds_route.service = std::string(v[0].data(), v[0].size());
if (path.size > 0) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Prefix is not empty string, path cannot also be non-empty.");
}
} else if (!upb_strview_eql(path, upb_strview_makez(""))) {
std::string path_string = std::string(path.data, path.size);
std::vector<std::string> v = absl::StrSplit(path_string, '/');
if (v.size() != 3) {
} else if (path.size > 0) {
std::vector<absl::string_view> v = absl::StrSplit(
absl::string_view(path.data, path.size).substr(1), '/');
if (v.size() != 2) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Path not in the required format of /service/method");
}
rds_route.service = v[1];
rds_route.method = v[2];
if (!upb_strview_eql(prefix, upb_strview_makez(""))) {
rds_route.service = std::string(v[0].data(), v[0].size());
rds_route.method = std::string(v[1].data(), v[1].size());
if (prefix.size > 0) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Path is not empty string, prefix cannot also be non-empty.");
}
@ -1059,13 +1058,12 @@ grpc_error* RouteConfigParse(
}
const upb_strview action =
envoy_api_v2_route_RouteAction_cluster(route_action);
rds_route.action_name = std::string(action.data, action.size);
rds_route.cluster_name = std::string(action.data, action.size);
rds_update->routes.emplace_back(std::move(rds_route));
gpr_log(GPR_INFO, "RouteConfigParse a route %s %s %s %s",
gpr_log(GPR_INFO, "RouteConfigParse a route %s %s %s",
rds_update->routes[i].service.c_str(),
rds_update->routes[i].method.c_str(),
rds_update->routes[i].action_type.c_str(),
rds_update->routes[i].action_name.c_str());
rds_update->routes[i].cluster_name.c_str());
}
return GRPC_ERROR_NONE;
}

@ -46,8 +46,7 @@ class XdsApi {
struct RdsRoute {
std::string service;
std::string method;
std::string action_type;
std::string action_name;
std::string cluster_name;
};
struct RdsUpdate {

@ -22,6 +22,7 @@
#include <limits.h>
#include <string.h>
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include <grpc/byte_buffer_reader.h>
@ -70,6 +71,35 @@ namespace grpc_core {
TraceFlag grpc_xds_client_trace(false, "xds_client");
namespace {
std::string CreateServiceConfigActionCluster(const std::string& cluster_name) {
std::string json = absl::StrFormat(
" \"cds:%s\":{\n"
" \"child_policy\":[\n"
" { \"cds_experimental\":{\n"
" \"cluster\": \"%s\"\n"
" } }\n"
" ]\n"
" }",
cluster_name.c_str(), cluster_name.c_str());
return json;
}
std::string CreateServiceConfigRoute(const std::string& cluster_name,
const std::string& service,
const std::string& method) {
std::string json = absl::StrFormat(
" { \"methodName\":\n"
" { \"service\": \"%s\",\n"
" \"method\": \"%s\"},\n"
" \"action\": \"cds:%s\"\n"
" }",
service.c_str(), method.c_str(), cluster_name.c_str());
return json;
}
} // namespace
//
// Internal class declarations
//
@ -888,35 +918,23 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
"LDS update does not include requested resource"));
return;
}
const std::string& cluster_name =
lds_update->rds_update.has_value()
? lds_update->rds_update.value().routes[0].action_name
: "";
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] LDS update received: route_config_name=%s, "
"cluster_name=%s (empty if RDS is needed to obtain it)",
xds_client(), lds_update->route_config_name.c_str(),
cluster_name.c_str());
for (auto route : lds_update->rds_update.value().routes) {
gpr_log(GPR_INFO, "Create service config using %s %s %s %s",
"[xds_client %p] LDS update received: route_config_name=%s",
xds_client(), lds_update->route_config_name.c_str());
if (lds_update->rds_update.has_value()) {
for (const auto& route : lds_update->rds_update.value().routes) {
gpr_log(GPR_INFO,
"Create service config using route: { service=\"%s\", "
"method=\"%s\" }, cluster=\"%s\" }",
route.service.c_str(), route.method.c_str(),
route.action_type.c_str(), route.action_name.c_str());
route.cluster_name.c_str());
}
}
}
auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
auto& state = lds_state.subscribed_resources[xds_client()->server_name_];
if (state != nullptr) state->Finish();
// Ignore identical update.
if (xds_client()->route_config_name_ == lds_update->route_config_name &&
xds_client()->cluster_name_ == cluster_name) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] LDS update identical to current, ignoring.",
xds_client());
}
return;
}
if (!xds_client()->route_config_name_.empty()) {
Unsubscribe(
XdsApi::kRdsTypeUrl, xds_client()->route_config_name_,
@ -924,10 +942,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
}
xds_client()->route_config_name_ = lds_update->route_config_name;
if (lds_update->rds_update.has_value()) {
// If cluster_name was found inlined in LDS response, notify the watcher
// immediately.
xds_client()->cluster_name_ =
lds_update->rds_update.value().routes[0].action_name;
// If the RouteConfiguration was found inlined in LDS response, notify the
// watcher immediately.
RefCountedPtr<ServiceConfig> service_config;
grpc_error* error = xds_client()->CreateServiceConfig(
lds_update->rds_update.value(), &service_config);
@ -954,24 +970,10 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
"RDS update does not include requested resource"));
return;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] RDS update received: cluster_name=%s",
xds_client(), rds_update->routes[0].action_name.c_str());
}
auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
auto& state =
rds_state.subscribed_resources[xds_client()->route_config_name_];
if (state != nullptr) state->Finish();
// Ignore identical update.
if (xds_client()->cluster_name_ == rds_update->routes[0].action_name) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] RDS update identical to current, ignoring.",
xds_client());
}
return;
}
xds_client()->cluster_name_ = rds_update->routes[0].action_name;
// Notify the watcher.
RefCountedPtr<ServiceConfig> service_config;
grpc_error* error =
@ -2043,85 +2045,46 @@ void XdsClient::ResetBackoff() {
}
}
char* XdsClient::CreateServiceConfigActionCluster(
const std::string& cluster_name, const bool without_comma) const {
const char* last_line = without_comma ? "}" : "},";
char* json;
gpr_asprintf(&json,
" { \"cds\": \"%s\",\n"
" \"child_policy\":[\n"
" { \"cds_experimental\":{\n"
" \"cluster\": \"%s\"\n"
" } }\n"
" ]\n"
" %s\n",
cluster_name.c_str(), cluster_name.c_str(), last_line);
return json;
}
char* XdsClient::CreateServiceConfigRoute(const std::string& cluster_name,
const std::string& service,
const std::string& method,
const bool without_comma) const {
const char* last_line = without_comma ? "}" : "},";
char* json;
gpr_asprintf(&json,
" { \"methodName\":\n"
" { \"service\": \"%s\",\n"
" \"method\": \"%s\"},\n"
" \"action\": \"%s\"\n"
" %s\n",
service.c_str(), method.c_str(), cluster_name.c_str(),
last_line);
return json;
}
grpc_error* XdsClient::CreateServiceConfig(
const XdsApi::RdsUpdate& rds_update,
RefCountedPtr<ServiceConfig>* service_config) const {
gpr_strvec v;
gpr_strvec_init(&v);
char* json_start;
gpr_asprintf(&json_start,
"{\n"
std::vector<std::string> v;
std::string json_start =
("{\n"
" \"loadBalancingConfig\":[\n"
" { \"xds_routing_experimental\":{\n"
" \"actions\":[\n");
gpr_strvec_add(&v, json_start);
" \"actions\":{\n");
v.push_back(std::move(json_start));
std::vector<std::string> actions_vector;
for (size_t i = 0; i < rds_update.routes.size(); ++i) {
auto route = rds_update.routes[i];
// TODO: (donnadionne) CreateServiceConfigActionWeightedTarget
char* action = CreateServiceConfigActionCluster(
route.action_name.c_str(), i == (rds_update.routes.size() - 1));
gpr_strvec_add(&v, action);
}
char* json_transition;
gpr_asprintf(&json_transition,
" ],\n"
actions_vector.push_back(
CreateServiceConfigActionCluster(route.cluster_name.c_str()));
}
v.push_back(absl::StrJoin(actions_vector, ","));
std::string json_transition =
(" },\n"
" \"routes\":[\n");
gpr_strvec_add(&v, json_transition);
v.push_back(std::move(json_transition));
std::vector<std::string> routes_vector;
for (size_t i = 0; i < rds_update.routes.size(); ++i) {
auto route_info = rds_update.routes[i];
char* route = CreateServiceConfigRoute(
route_info.action_name.c_str(), route_info.service.c_str(),
route_info.method.c_str(), i == (rds_update.routes.size() - 1));
gpr_strvec_add(&v, route);
routes_vector.push_back(CreateServiceConfigRoute(
route_info.cluster_name.c_str(), route_info.service.c_str(),
route_info.method.c_str()));
}
char* json_end;
gpr_asprintf(&json_end,
" ]\n"
v.push_back(absl::StrJoin(routes_vector, ","));
std::string json_end =
(" ]\n"
" } }\n"
" ]\n"
"}");
gpr_strvec_add(&v, json_end);
size_t len;
char* json = gpr_strvec_flatten(&v, &len);
gpr_strvec_destroy(&v);
v.push_back(std::move(json_end));
std::string json = absl::StrJoin(v, "");
grpc_error* error = GRPC_ERROR_NONE;
*service_config = ServiceConfig::Create(json, &error);
*service_config = ServiceConfig::Create(json.c_str(), &error);
gpr_log(GPR_INFO, "Built service config: \"%s\"",
service_config->get()->json_string().c_str());
gpr_free(json);
return error;
}

@ -218,13 +218,6 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
// Sends an error notification to all watchers.
void NotifyOnError(grpc_error* error);
char* CreateServiceConfigActionCluster(
const std::string& cluster_name, const bool without_comma = false) const;
char* CreateServiceConfigRoute(const std::string& prefix,
const std::string& service,
const std::string& method,
const bool without_comma = false) const;
grpc_error* CreateServiceConfig(
const XdsApi::RdsUpdate& rds_update,
RefCountedPtr<ServiceConfig>* service_config) const;
@ -254,7 +247,6 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
OrphanablePtr<ChannelState> chand_;
std::string route_config_name_;
std::string cluster_name_;
// One entry for each watched CDS resource.
std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
// One entry for each watched EDS resource.

@ -46,6 +46,8 @@ void grpc_lb_policy_cds_init(void);
void grpc_lb_policy_cds_shutdown(void);
void grpc_lb_policy_xds_init(void);
void grpc_lb_policy_xds_shutdown(void);
void grpc_lb_policy_xds_routing_init(void);
void grpc_lb_policy_xds_routing_shutdown(void);
void grpc_lb_policy_pick_first_init(void);
void grpc_lb_policy_pick_first_shutdown(void);
void grpc_lb_policy_round_robin_init(void);
@ -88,6 +90,8 @@ void grpc_register_built_in_plugins(void) {
grpc_lb_policy_cds_shutdown);
grpc_register_plugin(grpc_lb_policy_xds_init,
grpc_lb_policy_xds_shutdown);
grpc_register_plugin(grpc_lb_policy_xds_routing_init,
grpc_lb_policy_xds_routing_shutdown);
grpc_register_plugin(grpc_lb_policy_pick_first_init,
grpc_lb_policy_pick_first_shutdown);
grpc_register_plugin(grpc_lb_policy_round_robin_init,

@ -1946,15 +1946,9 @@ TEST_P(LdsTest, XdsRoutingPathMatching) {
// Change RDS resource to point to new cluster.
RouteConfiguration new_route_config =
balancers_[0]->ads_service()->default_route_config();
new_route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_match()
->set_path("/grpc.testing.EchoTestService/Echo");
//->set_prefix("/dgrpc.testing.EchoTestService");
new_route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->set_cluster(kNewClusterName);
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route->mutable_match()->set_path("/grpc.testing.EchoTestService/Echo");
route->mutable_route()->set_cluster(kNewClusterName);
Listener listener =
balancers_[0]->ads_service()->BuildListener(new_route_config);
balancers_[0]->ads_service()->SetLdsResource(listener, kDefaultResourceName);
@ -1989,14 +1983,9 @@ TEST_P(LdsTest, XdsRoutingPrefixMatching) {
// Change RDS resource to point to new cluster.
RouteConfiguration new_route_config =
balancers_[0]->ads_service()->default_route_config();
new_route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_match()
->set_prefix("/grpc.testing.EchoTestService");
new_route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->set_cluster(kNewClusterName);
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route->mutable_match()->set_prefix("/grpc.testing.EchoTestService");
route->mutable_route()->set_cluster(kNewClusterName);
Listener listener =
balancers_[0]->ads_service()->BuildListener(new_route_config);
balancers_[0]->ads_service()->SetLdsResource(listener, kDefaultResourceName);

Loading…
Cancel
Save