Fixing code review comments

reviewable/pr22280/r9
Donna Dionne 5 years ago
parent f3f11cc21c
commit 0c2f9565f4
  1. 198
      src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc
  2. 6
      src/core/ext/filters/client_channel/xds/xds_api.cc
  3. 65
      src/core/ext/filters/client_channel/xds/xds_client.cc
  4. 16
      test/cpp/end2end/xds_end2end_test.cc

@ -228,8 +228,8 @@ XdsRoutingLb::PickResult XdsRoutingLb::RoutePicker::Pick(PickArgs args) {
for (const Route& route : route_table_) { for (const Route& route : route_table_) {
if ((path_elements[0] == route.matcher.service && if ((path_elements[0] == route.matcher.service &&
(path_elements[1] == route.matcher.method || (path_elements[1] == route.matcher.method ||
"" == route.matcher.method)) || route.matcher.method.empty())) ||
("" == route.matcher.service && "" == route.matcher.method)) { (route.matcher.service.empty() && route.matcher.method.empty())) {
return route.picker.get()->Pick(args); return route.picker.get()->Pick(args);
} }
} }
@ -237,8 +237,7 @@ XdsRoutingLb::PickResult XdsRoutingLb::RoutePicker::Pick(PickArgs args) {
result.type = PickResult::PICK_FAILED; result.type = PickResult::PICK_FAILED;
result.error = result.error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"xds routing picker not given any picker; default " "xds routing picker: no matching route"),
"route not configured"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
return result; return result;
} }
@ -302,7 +301,7 @@ void XdsRoutingLb::UpdateLocked(UpdateArgs args) {
// Add or update the actions in the new config. // Add or update the actions in the new config.
for (const auto& p : config_->action_map()) { for (const auto& p : config_->action_map()) {
const std::string& name = p.first; const std::string& name = p.first;
RefCountedPtr<LoadBalancingPolicy::Config> config = p.second; const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second;
auto it = actions_.find(name); auto it = actions_.find(name);
if (it == actions_.end()) { if (it == actions_.end()) {
it = actions_.emplace(std::make_pair(name, nullptr)).first; it = actions_.emplace(std::make_pair(name, nullptr)).first;
@ -367,21 +366,20 @@ void XdsRoutingLb::UpdateStateLocked() {
switch (connectivity_state) { switch (connectivity_state) {
case GRPC_CHANNEL_READY: { case GRPC_CHANNEL_READY: {
RoutePicker::RouteTable route_table; RoutePicker::RouteTable route_table;
for (int i = 0; i < config_->route_table().size(); ++i) { for (const auto& config_route : config_->route_table()) {
RoutePicker::Route route; RoutePicker::Route route;
route.matcher = config_->route_table()[i].matcher; route.matcher = config_route.matcher;
auto it = actions_.find(config_->route_table()[i].action); route.picker = actions_[config_route.action]->picker_wrapper();
if (it != actions_.end()) { if (route.picker == nullptr) {
route.picker = it->second->picker_wrapper(); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
} else { gpr_log(GPR_INFO,
gpr_log(GPR_INFO, "[xds_routing_lb %p] child %s has not yet returned a "
"[xds_routing_lb %p] child policy may have mis-behaved and " "picker; creating a QueuePicker.",
"did not return a picker, creating a QueuePicker for %s", this, config_route.action.c_str());
this, config_->route_table()[i].action.c_str()); }
route.picker = MakeRefCounted<ChildPickerWrapper>( route.picker = MakeRefCounted<ChildPickerWrapper>(
config_->route_table()[i].action, config_route.action, absl::make_unique<QueuePicker>(
absl::make_unique<QueuePicker>( Ref(DEBUG_LOCATION, "QueuePicker")));
Ref(DEBUG_LOCATION, "QueuePicker")));
} }
route_table.push_back(std::move(route)); route_table.push_back(std::move(route));
} }
@ -625,7 +623,7 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory {
std::vector<grpc_error*> error_list; std::vector<grpc_error*> error_list;
// action map. // action map.
XdsRoutingLbConfig::ActionMap action_map; XdsRoutingLbConfig::ActionMap action_map;
std::set<std::string /*action_name*/> action_in_use_set; std::set<std::string /*action_name*/> action_in_use;
auto it = json.object_value().find("actions"); auto it = json.object_value().find("actions");
if (it == json.object_value().end()) { if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
@ -649,6 +647,7 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory {
error_list.push_back(error); error_list.push_back(error);
} else { } else {
action_map[p.first] = std::move(child_config); action_map[p.first] = std::move(child_config);
action_in_use.insert(p.first);
} }
} }
} }
@ -667,63 +666,26 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory {
} else { } else {
const Json::Array& array = it->second.array_value(); const Json::Array& array = it->second.array_value();
for (size_t i = 0; i < array.size(); ++i) { for (size_t i = 0; i < array.size(); ++i) {
const Json& element = array[i]; XdsRoutingLbConfig::Route route;
if (element.type() != Json::Type::OBJECT) { std::vector<grpc_error*> route_errors = ParseRoute(array[i], &route);
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( if (!route_errors.empty()) {
absl::StrCat("filed: routes element: ", i, grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
" should be of type object") absl::StrCat("field:routes element: ", i, " error").c_str());
.c_str())); for (grpc_error* route_error : route_errors) {
} else { error = grpc_error_add_child(error, route_error);
XdsRoutingLbConfig::Route route;
// Parse MethodName.
auto it = element.object_value().find("methodName");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
absl::StrCat("field:routes element: ", i,
" methodName is required")
.c_str()));
} else if (it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
absl::StrCat("field:routes element: ", i,
" methodName type should be object")
.c_str()));
} else {
std::vector<grpc_error*> route_errors =
ParseRouteConfig(it->second, &route.matcher);
if (!route_errors.empty()) {
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("field:routes element: ", i, " error").c_str());
for (grpc_error* route_error : route_errors) {
error = grpc_error_add_child(error, route_error);
}
error_list.push_back(error);
}
}
// Parse action.
it = element.object_value().find("action");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
absl::StrCat("field:routes element: ", i, " action is required")
.c_str()));
} else if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
absl::StrCat("field:routes element: ", i,
" action type should be string")
.c_str()));
} else {
route.action = it->second.string_value();
}
// Validate action exists and mark it as used.
if (action_map.find(route.action) == action_map.end()) {
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
absl::StrCat("action ", route.action, " does not exist")
.c_str());
error_list.push_back(error);
} else {
action_in_use_set.insert(route.action);
} }
route_table.emplace_back(std::move(route)); error_list.push_back(error);
}
// Validate action exists and mark it as used.
if (action_map.find(route.action) == action_map.end()) {
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
absl::StrCat("field: routes element: ", i, " error: action ",
route.action, " does not exist")
.c_str());
error_list.push_back(error);
} }
action_in_use.erase(route.action);
route_table.emplace_back(std::move(route));
} }
} }
if (route_table.size() == 0) { if (route_table.size() == 0) {
@ -731,11 +693,15 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory {
GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid routes configured"); GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid routes configured");
error_list.push_back(error); error_list.push_back(error);
} }
for (const auto& action : action_map) { if (!(route_table[route_table.size() - 1].matcher.service.empty() &&
if (action_in_use_set.find(action.first) == action_in_use_set.end()) { route_table[route_table.size() - 1].matcher.method.empty())) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
absl::StrCat("action ", action.first, " is never used").c_str())); "default route must not contain service or method");
} error_list.push_back(error);
}
if (!action_in_use.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"some actions were not referenced by any route"));
} }
if (!error_list.empty()) { if (!error_list.empty()) {
*error = GRPC_ERROR_CREATE_FROM_VECTOR( *error = GRPC_ERROR_CREATE_FROM_VECTOR(
@ -775,35 +741,77 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory {
return error_list; return error_list;
} }
static std::vector<grpc_error*> ParseRouteConfig( static std::vector<grpc_error*> ParseMethodName(
const Json& json, XdsRoutingLbConfig::Matcher* route_config) { const Json& json, XdsRoutingLbConfig::Matcher* route_config) {
std::vector<grpc_error*> error_list; std::vector<grpc_error*> error_list;
if (json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:methodName should be of type object"));
return error_list;
}
// Parse service // Parse service
auto it = json.object_value().find("service"); auto it = json.object_value().find("service");
if (it == json.object_value().end()) { if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:service error: should be string"));
} else {
route_config->service = it->second.string_value();
}
}
// Parse method
it = json.object_value().find("method");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:method error: should be string"));
} else {
route_config->method = it->second.string_value();
}
}
if (route_config->service.empty() && !route_config->method.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:service error: required field not present")); "field:methodName error: service is empty when method is "
} else if (it->second.type() != Json::Type::STRING) { "not"));
}
return error_list;
}
static std::vector<grpc_error*> ParseRoute(const Json& json,
XdsRoutingLbConfig::Route* route) {
std::vector<grpc_error*> error_list;
if (json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:service error: should be string")); "field:route element should be of type object"));
return error_list;
}
// Parse MethodName.
auto it = json.object_value().find("methodName");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:routes element: methodName is required"));
} else { } else {
route_config->service = it->second.string_value(); std::vector<grpc_error*> route_errors =
ParseMethodName(it->second, &route->matcher);
if (!route_errors.empty()) {
grpc_error* error =
GRPC_ERROR_CREATE_FROM_COPIED_STRING("field:route element error");
for (grpc_error* route_error : route_errors) {
error = grpc_error_add_child(error, route_error);
}
error_list.push_back(error);
}
} }
// Parse method // Parse action.
it = json.object_value().find("method"); it = json.object_value().find("action");
if (it == json.object_value().end()) { if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:method error: required field not present")); "field:route element: action is required"));
} else if (it->second.type() != Json::Type::STRING) { } else if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:method error: should be string")); "field:route element error action should be of type string"));
} else { } else {
route_config->method = it->second.string_value(); route->action = it->second.string_value();
}
if (route_config->service == "" && route_config->method != "") {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:methodName error: service is empty when method is "
"not"));
} }
return error_list; return error_list;
} }

@ -1045,6 +1045,12 @@ grpc_error* RouteConfigParse(
"Path is not empty string, prefix cannot also be non-empty."); "Path is not empty string, prefix cannot also be non-empty.");
} }
} }
if (i == (size - 1)) {
if (!(rds_route.service.empty() && rds_route.method.empty())) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Default route must have empty service and method");
}
}
if (!envoy_api_v2_route_Route_has_route(route)) { if (!envoy_api_v2_route_Route_has_route(route)) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING( return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No RouteAction found in route."); "No RouteAction found in route.");

@ -903,6 +903,10 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
? lds_update->route_config_name.c_str() ? lds_update->route_config_name.c_str()
: "<inlined>")); : "<inlined>"));
if (lds_update->rds_update.has_value()) { if (lds_update->rds_update.has_value()) {
gpr_log(GPR_INFO,
" [xds_client %p] LDS update received; LDS RouteConfiguration "
"contains %lu routes",
this, lds_update->rds_update.value().routes.size());
for (const auto& route : lds_update->rds_update.value().routes) { for (const auto& route : lds_update->rds_update.value().routes) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
" route: { service=\"%s\", " " route: { service=\"%s\", "
@ -960,15 +964,18 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
GRPC_ERROR_CREATE_FROM_STATIC_STRING( GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RDS update does not include requested resource")); "RDS update does not include requested resource"));
return; return;
} else { }
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
for (const auto& route : rds_update.value().routes) { gpr_log(GPR_INFO,
gpr_log(GPR_INFO, "[xds_client %p] RDS update received; RouteConfiguration contains "
" route: { service=\"%s\", " "%lu routes",
"method=\"%s\" }, cluster=\"%s\" }", this, rds_update.value().routes.size());
route.service.c_str(), route.method.c_str(), for (const auto& route : rds_update.value().routes) {
route.cluster_name.c_str()); gpr_log(GPR_INFO,
} " route: { service=\"%s\", "
"method=\"%s\" }, cluster=\"%s\" }",
route.service.c_str(), route.method.c_str(),
route.cluster_name.c_str());
} }
} }
auto& rds_state = state_map_[XdsApi::kRdsTypeUrl]; auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
@ -2048,31 +2055,29 @@ void XdsClient::ResetBackoff() {
namespace { namespace {
std::string CreateServiceConfigActionCluster(const std::string& cluster_name) { std::string CreateServiceConfigActionCluster(const std::string& cluster_name) {
std::string json = absl::StrFormat( return (
" \"cds:%s\":{\n" absl::StrFormat(" \"cds:%s\":{\n"
" \"child_policy\":[ {\n" " \"child_policy\":[ {\n"
" \"cds_experimental\":{\n" " \"cds_experimental\":{\n"
" \"cluster\": \"%s\"\n" " \"cluster\": \"%s\"\n"
" }\n" " }\n"
" } ]\n" " } ]\n"
" }", " }",
cluster_name.c_str(), cluster_name.c_str()); cluster_name.c_str(), cluster_name.c_str()));
return json;
} }
std::string CreateServiceConfigRoute(const std::string& cluster_name, std::string CreateServiceConfigRoute(const std::string& cluster_name,
const std::string& service, const std::string& service,
const std::string& method) { const std::string& method) {
std::string json = absl::StrFormat( return (
" { \n" absl::StrFormat(" { \n"
" \"methodName\": {\n" " \"methodName\": {\n"
" \"service\": \"%s\",\n" " \"service\": \"%s\",\n"
" \"method\": \"%s\"\n" " \"method\": \"%s\"\n"
" },\n" " },\n"
" \"action\": \"cds:%s\"\n" " \"action\": \"cds:%s\"\n"
" }", " }",
service.c_str(), method.c_str(), cluster_name.c_str()); service.c_str(), method.c_str(), cluster_name.c_str()));
return json;
} }
} // namespace } // namespace
@ -2111,8 +2116,6 @@ grpc_error* XdsClient::CreateServiceConfig(
std::string json = absl::StrJoin(config_parts, ""); std::string json = absl::StrJoin(config_parts, "");
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
*service_config = ServiceConfig::Create(json.c_str(), &error); *service_config = ServiceConfig::Create(json.c_str(), &error);
gpr_log(GPR_INFO, "Built service config: \"%s\"",
service_config->get()->json_string().c_str());
return error; return error;
} }

@ -2155,8 +2155,8 @@ TEST_P(LdsTest, DefaultRouteInvalid) {
AdsServiceImpl::NACKED); AdsServiceImpl::NACKED);
} }
// Tests that LDS client should send a ACK if route match has non-empty prefix // Tests that LDS client should send a NACK if route match has non-empty prefix
// in the LDS response. // as the only route (default) in the LDS response.
TEST_P(LdsTest, RouteMatchHasNonemptyPrefix) { TEST_P(LdsTest, RouteMatchHasNonemptyPrefix) {
RouteConfiguration route_config = RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config(); balancers_[0]->ads_service()->default_route_config();
@ -2170,7 +2170,7 @@ TEST_P(LdsTest, RouteMatchHasNonemptyPrefix) {
SetNextResolutionForLbChannelAllBalancers(); SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure(); CheckRpcSendFailure();
EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state(), EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state(),
AdsServiceImpl::ACKED); AdsServiceImpl::NACKED);
} }
// Tests that LDS client should send a NACK if route has an action other than // Tests that LDS client should send a NACK if route has an action other than
@ -2321,6 +2321,10 @@ TEST_P(LdsTest, XdsRoutingPrefixMatching) {
auto* matched_route = new_route_config.mutable_virtual_hosts(0)->add_routes(); auto* matched_route = new_route_config.mutable_virtual_hosts(0)->add_routes();
matched_route->mutable_match()->set_prefix("/grpc.testing.EchoTestService"); matched_route->mutable_match()->set_prefix("/grpc.testing.EchoTestService");
matched_route->mutable_route()->set_cluster(kNewCluster2Name); matched_route->mutable_route()->set_cluster(kNewCluster2Name);
auto* default_route = new_route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_match()->set_path("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
Listener listener = Listener listener =
balancers_[0]->ads_service()->BuildListener(new_route_config); balancers_[0]->ads_service()->BuildListener(new_route_config);
balancers_[0]->ads_service()->SetLdsResource(listener, kDefaultResourceName); balancers_[0]->ads_service()->SetLdsResource(listener, kDefaultResourceName);
@ -2471,8 +2475,8 @@ TEST_P(RdsTest, DefaultRouteInvalid) {
AdsServiceImpl::NACKED); AdsServiceImpl::NACKED);
} }
// Tests that RDS client should send a ACK if route match has non-empty prefix // Tests that RDS client should send a NACK if route match has non-empty prefix
// in the RDS response. // as the only route (default) in the RDS response.
TEST_P(RdsTest, RouteMatchHasNonemptyPrefix) { TEST_P(RdsTest, RouteMatchHasNonemptyPrefix) {
balancers_[0]->ads_service()->SetLdsToUseDynamicRds(); balancers_[0]->ads_service()->SetLdsToUseDynamicRds();
RouteConfiguration route_config = RouteConfiguration route_config =
@ -2487,7 +2491,7 @@ TEST_P(RdsTest, RouteMatchHasNonemptyPrefix) {
SetNextResolutionForLbChannelAllBalancers(); SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure(); CheckRpcSendFailure();
EXPECT_EQ(balancers_[0]->ads_service()->rds_response_state(), EXPECT_EQ(balancers_[0]->ads_service()->rds_response_state(),
AdsServiceImpl::ACKED); AdsServiceImpl::NACKED);
} }
// Tests that RDS client should send a NACK if route has an action other than // Tests that RDS client should send a NACK if route has an action other than

Loading…
Cancel
Save