diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc index 2f409d6746b..f23db70bdb4 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc @@ -22,6 +22,7 @@ #include "absl/strings/str_cat.h" #include "absl/strings/str_split.h" +#include "absl/strings/string_view.h" #include @@ -52,24 +53,26 @@ class XdsRoutingLbConfig : public LoadBalancingPolicy::Config { struct ChildConfig { RefCountedPtr config; }; - - using Matcher = std::pair; - using RouteVector = std::vector>; + struct Matcher { + std::string service; + std::string method; + }; + using RouteTable = std::vector>; using ActionMap = std::map; - explicit XdsRoutingLbConfig(ActionMap action_map, RouteVector route_vector) + XdsRoutingLbConfig(ActionMap action_map, RouteTable route_table) : action_map_(std::move(action_map)), - route_vector_(std::move(route_vector)) {} + route_table_(std::move(route_table)) {} const char* name() const override { return kXdsRouting; } const ActionMap& action_map() const { return action_map_; } - const RouteVector& route_vector() const { return route_vector_; } + const RouteTable& route_table() const { return route_table_; } private: ActionMap action_map_; - RouteVector route_vector_; + RouteTable route_table_; }; // xds_routing LB policy. @@ -87,39 +90,37 @@ class XdsRoutingLb : public LoadBalancingPolicy { // A simple wrapper for ref-counting a picker from the child policy. class ChildPickerWrapper : public RefCounted { public: - explicit ChildPickerWrapper(const std::string& name, - std::unique_ptr picker) - : name_(name), picker_(std::move(picker)) {} + ChildPickerWrapper(std::string name, + std::unique_ptr picker) + : name_(std::move(name)), picker_(std::move(picker)) {} PickResult Pick(PickArgs args) { return picker_->Pick(std::move(args)); } - std::string name() { return name_; } + const std::string& name() { return name_; } private: std::string name_; std::unique_ptr picker_; }; - // Picks a child using stateless WRR and then delegates to that + // Picks a child using prefix or path matching and then delegates to that // child's picker. - class XdsRoutingPicker : public SubchannelPicker { + class RoutePicker : public SubchannelPicker { public: - // Maintains a xds_routing list of pickers from each child that is in - // ready state. The first element in the pair represents the end of a - // range proportional to the child's weight. The start of the range - // is the previous value in the vector and is 0 for the first element. - using PickerList = InlinedVector, 1>; + struct Route { + XdsRoutingLbConfig::Matcher matcher; + RefCountedPtr picker; + }; - using PickerMap = std::map>; + // Maintains an ordered xds route table as provided by RDS response. + using RouteTable = std::vector; - XdsRoutingPicker(RefCountedPtr parent, PickerMap pickers) - : parent_(std::move(parent)), pickers_(std::move(pickers)) {} - ~XdsRoutingPicker() { parent_.reset(DEBUG_LOCATION, "XdsRoutingPicker"); } + RoutePicker(RouteTable route_table) + : route_table_(std::move(route_table)) {} PickResult Pick(PickArgs args) override; private: - RefCountedPtr parent_; - PickerMap pickers_; + RouteTable route_table_; }; // Each XdsRoutingChild holds a ref to its parent XdsRoutingLb. @@ -145,8 +146,6 @@ class XdsRoutingLb : public LoadBalancingPolicy { return picker_wrapper_; } - std::string name() const { return name_; } - private: class Helper : public ChannelControlHelper { public: @@ -211,35 +210,29 @@ class XdsRoutingLb : public LoadBalancingPolicy { }; // -// XdsRoutingLb::XdsRoutingPicker +// XdsRoutingLb::RoutePicker // -XdsRoutingLb::PickResult XdsRoutingLb::XdsRoutingPicker::Pick(PickArgs args) { - std::string path; +XdsRoutingLb::PickResult XdsRoutingLb::RoutePicker::Pick(PickArgs args) { + absl::string_view path; for (const auto& p : *(args.initial_metadata)) { - if (memcmp(p.first.data(), ":path", static_cast(p.first.size())) == - 0) { - path = std::string(p.second.data(), static_cast(p.second.size())); + if (p.first == ":path") { + path = p.second; break; } } - std::vector v = absl::StrSplit(path, '/'); - GPR_DEBUG_ASSERT(v.size() == 3); - std::string service = v[1]; - std::string method = v[2]; - for (int i = 0; i < parent_->config_->route_vector().size(); ++i) { - if (service == parent_->config_->route_vector()[i].first.first && - ("" == parent_->config_->route_vector()[i].first.second || - method == parent_->config_->route_vector()[i].first.second)) { - auto picker = pickers_.find(parent_->config_->route_vector()[i].second); - if (picker != pickers_.end()) { - gpr_log(GPR_INFO, "XdsRouting Picked: %s for path %s", - picker->first.c_str(), path.c_str()); - return picker->second.get()->Pick(args); + std::vector v = absl::StrSplit(path.substr(1), '/'); + for (int i = 0; i < route_table_.size(); ++i) { + if (v[0] == route_table_[i].matcher.service && + ("" == route_table_[i].matcher.method || + v[1] == route_table_[i].matcher.method)) { + auto picker = route_table_[i].picker; + if (picker != nullptr) { + return picker.get()->Pick(args); } } } - return pickers_.begin()->second.get()->Pick(args); + return route_table_[route_table_.size() - 1].picker.get()->Pick(args); } // @@ -313,11 +306,7 @@ void XdsRoutingLb::UpdateLocked(UpdateArgs args) { } void XdsRoutingLb::UpdateStateLocked() { - // Construct a new picker which maintains a map of all child pickers - // that are ready. Each child is represented by a portion of the range - // proportional to its weight, such that the total range is the sum of the - // weights of all children. - XdsRoutingPicker::PickerMap picker_map; + std::map> picker_map; // Also count the number of children in each state, to determine the // overall state. size_t num_connecting = 0; @@ -367,10 +356,19 @@ void XdsRoutingLb::UpdateStateLocked() { ConnectivityStateName(connectivity_state)); } std::unique_ptr picker; + RoutePicker::RouteTable route_table; switch (connectivity_state) { case GRPC_CHANNEL_READY: - picker = absl::make_unique( - Ref(DEBUG_LOCATION, "XdsRoutingPicker"), std::move(picker_map)); + for (int i = 0; i < config_->route_table().size(); ++i) { + RoutePicker::Route route; + route.matcher = config_->route_table()[i].first; + auto child_picker = picker_map.find(config_->route_table()[i].second); + if (child_picker != picker_map.end()) { + route.picker = child_picker->second; + } + route_table.push_back(std::move(route)); + } + picker = absl::make_unique(std::move(route_table)); break; case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: @@ -460,6 +458,7 @@ void XdsRoutingLb::XdsRoutingChild::UpdateLocked( // Update child weight. // Reactivate if needed. if (delayed_removal_timer_callback_pending_) { + delayed_removal_timer_callback_pending_ = false; grpc_timer_cancel(&delayed_removal_timer_); } // Create child policy if needed. @@ -530,7 +529,6 @@ void XdsRoutingLb::XdsRoutingChild::OnDelayedRemovalTimerLocked( RefCountedPtr XdsRoutingLb::XdsRoutingChild::Helper::CreateSubchannel( const grpc_channel_args& args) { - gpr_log(GPR_INFO, "XdsRoutingChild::Helper::CreateSubchannel"); if (xds_routing_child_->xds_routing_policy_->shutting_down_) return nullptr; return xds_routing_child_->xds_routing_policy_->channel_control_helper() ->CreateSubchannel(args); @@ -538,12 +536,15 @@ XdsRoutingLb::XdsRoutingChild::Helper::CreateSubchannel( void XdsRoutingLb::XdsRoutingChild::Helper::UpdateState( grpc_connectivity_state state, std::unique_ptr picker) { - gpr_log(GPR_INFO, "XdsRoutingChild::Helper::UpdateState %s", - xds_routing_child_->name().c_str()); + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { + gpr_log(GPR_INFO, + "XdsRoutingChild::Helper::UpdateState child %s, state %d, piker %p", + xds_routing_child_->name_.c_str(), state, picker.get()); + } if (xds_routing_child_->xds_routing_policy_->shutting_down_) return; // Cache the picker in the XdsRoutingChild. xds_routing_child_->picker_wrapper_ = MakeRefCounted( - xds_routing_child_->name(), std::move(picker)); + xds_routing_child_->name_, std::move(picker)); // Decide what state to report for aggregation purposes. // If we haven't seen a failure since the last time we were in state // READY, then we report the state change as-is. However, once we do see @@ -607,87 +608,67 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory { if (it == json.object_value().end()) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:actions error:required field not present")); - } else if (it->second.type() != Json::Type::ARRAY) { + } else if (it->second.type() != Json::Type::OBJECT) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:actions error:type should be array")); + "field:actions error:type should be object")); } else { - for (const auto& p : it->second.array_value()) { - auto it_cds = p.object_value().find("cds"); - auto it_weighted_target = p.object_value().find("weighted_target"); - if (it_cds == p.object_value().end() && - it_weighted_target == p.object_value().end()) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:actions error: each action needs to be either cds or " - "weighted target")); - } - auto it_name = - (it_cds == p.object_value().end() ? it_weighted_target : it_cds); - auto it_child_policy = p.object_value().find("child_policy"); - if (it_child_policy == p.object_value().end()) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:actions error: each action needs child policies")); - } + for (const auto& p : it->second.object_value()) { XdsRoutingLbConfig::ChildConfig child_config; std::vector child_errors = - ParseChildConfig(it_child_policy->second, &child_config); + ParseChildConfig(p.second, &child_config); if (!child_errors.empty()) { // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error // string is not static in this case. grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("field:actions name:", - it_name->second.string_value()) - .c_str()); + absl::StrCat("field:actions name:", p.first).c_str()); for (grpc_error* child_error : child_errors) { error = grpc_error_add_child(error, child_error); } error_list.push_back(error); } else { - action_map[it_name->second.string_value()] = std::move(child_config); + action_map[p.first] = std::move(child_config); } } } - XdsRoutingLbConfig::RouteVector route_vector; - auto route_iter = json.object_value().find("routes"); - if (route_iter == json.object_value().end()) { - gpr_log(GPR_INFO, "No routes specified"); - } else if (route_iter->second.type() != Json::Type::ARRAY) { + XdsRoutingLbConfig::RouteTable route_table; + it = json.object_value().find("routes"); + if (it == json.object_value().end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:routes error:required field not present")); + } else if (it->second.type() != Json::Type::ARRAY) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:routes error:type should be array")); } else { - for (const auto& p : route_iter->second.array_value()) { - auto method_name = p.object_value().find("methodName"); - if (method_name == p.object_value().end()) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:routes error:methodName is required")); - } else { - auto action_name = p.object_value().find("action"); - if (action_name == p.object_value().end()) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:routes error:action is required")); - } else { - XdsRoutingLbConfig::Matcher matcher; - auto service = method_name->second.object_value().find("service"); - auto method = method_name->second.object_value().find("method"); - if (service == method_name->second.object_value().end() && - method != method_name->second.object_value().end()) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:methodName error: service is empty when method is " - "not")); - } - if (service != method_name->second.object_value().end()) { - matcher.first = service->second.string_value(); - } else { - matcher.first = ""; - } - if (method != method_name->second.object_value().end()) { - matcher.second = method->second.string_value(); - } else { - matcher.first = ""; - } - route_vector.emplace_back(matcher, - action_name->second.string_value()); + for (const auto& route : it->second.array_value()) { + // Parse methodName. + XdsRoutingLbConfig::Matcher matcher; + std::vector route_errors = + ParseRouteConfig(route.object_value(), &matcher); + if (!route_errors.empty()) { + // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error + // string is not static in this case. + grpc_error* error = + GRPC_ERROR_CREATE_FROM_COPIED_STRING("field:routes error"); + for (grpc_error* route_error : route_errors) { + error = grpc_error_add_child(error, route_error); } + error_list.push_back(error); } + // Parse action. + std::string cluster_name; + std::vector action_errors = + ParseActionConfig(route.object_value(), &cluster_name); + if (!action_errors.empty()) { + // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error + // string is not static in this case. + grpc_error* error = + GRPC_ERROR_CREATE_FROM_COPIED_STRING("field:actions error:"); + for (grpc_error* action_error : action_errors) { + error = grpc_error_add_child(error, action_error); + } + error_list.push_back(error); + } + route_table.emplace_back(std::move(matcher), std::move(cluster_name)); } } if (!error_list.empty()) { @@ -696,28 +677,92 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory { return nullptr; } return MakeRefCounted(std::move(action_map), - std::move(route_vector)); + std::move(route_table)); } private: static std::vector ParseChildConfig( const Json& json, XdsRoutingLbConfig::ChildConfig* child_config) { std::vector error_list; - if (json.type() != Json::Type::ARRAY) { + if (json.type() != Json::Type::OBJECT) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "value should be of type array")); + "value should be of type object")); return error_list; } - grpc_error* parse_error = GRPC_ERROR_NONE; - child_config->config = - LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( - json.array_value(), &parse_error); - if (child_config->config == nullptr) { - GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); - std::vector child_errors; - child_errors.push_back(parse_error); + auto it = json.object_value().find("child_policy"); + if (it != json.object_value().end()) { + grpc_error* parse_error = GRPC_ERROR_NONE; + child_config->config = + LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(it->second, + &parse_error); + if (child_config->config == nullptr) { + GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); + std::vector child_errors; + child_errors.push_back(parse_error); + error_list.push_back( + GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); + } + } else { error_list.push_back( - GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); + GRPC_ERROR_CREATE_FROM_STATIC_STRING("did not find childPolicy")); + } + return error_list; + } + + static std::vector ParseRouteConfig( + const Json& json, XdsRoutingLbConfig::Matcher* route_config) { + std::vector error_list; + if (json.type() != Json::Type::OBJECT) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "value should be of type object")); + return error_list; + } + auto method_name = json.object_value().find("methodName"); + if (method_name == json.object_value().end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:routes error:methodName is required")); + } else if (method_name->second.type() != Json::Type::OBJECT) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:routes error:methodName error: type should be object")); + } else { + auto service = method_name->second.object_value().find("service"); + auto method = method_name->second.object_value().find("method"); + if (service != method_name->second.object_value().end()) { + route_config->service = service->second.string_value(); + } else { + route_config->service = ""; + } + if (method != method_name->second.object_value().end()) { + route_config->method = method->second.string_value(); + } else { + route_config->method = ""; + } + if ((route_config->service == "") && (route_config->method != "")) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:methodName error: service is empty when method is " + "not")); + } + } + return error_list; + } + + static std::vector ParseActionConfig(const Json& json, + std::string* cluster_name) { + std::vector error_list; + if (json.type() != Json::Type::OBJECT) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "value should be of type object")); + return error_list; + } + auto action_name = json.object_value().find("action"); + if (action_name == json.object_value().end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:routes error:action is required")); + } else if (action_name->second.type() != Json::Type::STRING) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:methodName error:type should be string")); + } else { + *cluster_name = action_name->second.string_value(); } return error_list; } diff --git a/src/core/ext/filters/client_channel/xds/xds_api.cc b/src/core/ext/filters/client_channel/xds/xds_api.cc index b8a07c95bf0..76558647ee2 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.cc +++ b/src/core/ext/filters/client_channel/xds/xds_api.cc @@ -1012,7 +1012,6 @@ grpc_error* RouteConfigParse( return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "No route found in the virtual host."); } - for (size_t i = 0; i < size; ++i) { const envoy_api_v2_route_Route* route = routes[i]; const envoy_api_v2_route_RouteMatch* match = @@ -1020,28 +1019,28 @@ grpc_error* RouteConfigParse( XdsApi::RdsRoute rds_route; const upb_strview prefix = envoy_api_v2_route_RouteMatch_prefix(match); const upb_strview path = envoy_api_v2_route_RouteMatch_path(match); - if (!upb_strview_eql(prefix, upb_strview_makez(""))) { - std::string prefix_string = std::string(prefix.data, prefix.size); - std::vector v = absl::StrSplit(prefix_string, '/'); - if (v.size() != 2) { + if (prefix.size > 0) { + std::vector v = absl::StrSplit( + absl::string_view(prefix.data, prefix.size).substr(1), '/'); + if (v.size() != 1) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Prefix not in the required format of /service/"); } - rds_route.service = v[1]; - if (!upb_strview_eql(path, upb_strview_makez(""))) { + rds_route.service = std::string(v[0].data(), v[0].size()); + if (path.size > 0) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Prefix is not empty string, path cannot also be non-empty."); } - } else if (!upb_strview_eql(path, upb_strview_makez(""))) { - std::string path_string = std::string(path.data, path.size); - std::vector v = absl::StrSplit(path_string, '/'); - if (v.size() != 3) { + } else if (path.size > 0) { + std::vector v = absl::StrSplit( + absl::string_view(path.data, path.size).substr(1), '/'); + if (v.size() != 2) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Path not in the required format of /service/method"); } - rds_route.service = v[1]; - rds_route.method = v[2]; - if (!upb_strview_eql(prefix, upb_strview_makez(""))) { + rds_route.service = std::string(v[0].data(), v[0].size()); + rds_route.method = std::string(v[1].data(), v[1].size()); + if (prefix.size > 0) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Path is not empty string, prefix cannot also be non-empty."); } @@ -1059,13 +1058,12 @@ grpc_error* RouteConfigParse( } const upb_strview action = envoy_api_v2_route_RouteAction_cluster(route_action); - rds_route.action_name = std::string(action.data, action.size); + rds_route.cluster_name = std::string(action.data, action.size); rds_update->routes.emplace_back(std::move(rds_route)); - gpr_log(GPR_INFO, "RouteConfigParse a route %s %s %s %s", + gpr_log(GPR_INFO, "RouteConfigParse a route %s %s %s", rds_update->routes[i].service.c_str(), rds_update->routes[i].method.c_str(), - rds_update->routes[i].action_type.c_str(), - rds_update->routes[i].action_name.c_str()); + rds_update->routes[i].cluster_name.c_str()); } return GRPC_ERROR_NONE; } diff --git a/src/core/ext/filters/client_channel/xds/xds_api.h b/src/core/ext/filters/client_channel/xds/xds_api.h index 175f2d3a28a..caa2c748a97 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.h +++ b/src/core/ext/filters/client_channel/xds/xds_api.h @@ -46,8 +46,7 @@ class XdsApi { struct RdsRoute { std::string service; std::string method; - std::string action_type; - std::string action_name; + std::string cluster_name; }; struct RdsUpdate { diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 0e9cd5c0c29..c9679da7b42 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -22,6 +22,7 @@ #include #include +#include "absl/strings/str_format.h" #include "absl/strings/str_join.h" #include @@ -70,6 +71,35 @@ namespace grpc_core { TraceFlag grpc_xds_client_trace(false, "xds_client"); +namespace { + +std::string CreateServiceConfigActionCluster(const std::string& cluster_name) { + std::string json = absl::StrFormat( + " \"cds:%s\":{\n" + " \"child_policy\":[\n" + " { \"cds_experimental\":{\n" + " \"cluster\": \"%s\"\n" + " } }\n" + " ]\n" + " }", + cluster_name.c_str(), cluster_name.c_str()); + return json; +} + +std::string CreateServiceConfigRoute(const std::string& cluster_name, + const std::string& service, + const std::string& method) { + std::string json = absl::StrFormat( + " { \"methodName\":\n" + " { \"service\": \"%s\",\n" + " \"method\": \"%s\"},\n" + " \"action\": \"cds:%s\"\n" + " }", + service.c_str(), method.c_str(), cluster_name.c_str()); + return json; +} + +} // namespace // // Internal class declarations // @@ -888,35 +918,23 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( "LDS update does not include requested resource")); return; } - const std::string& cluster_name = - lds_update->rds_update.has_value() - ? lds_update->rds_update.value().routes[0].action_name - : ""; if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, - "[xds_client %p] LDS update received: route_config_name=%s, " - "cluster_name=%s (empty if RDS is needed to obtain it)", - xds_client(), lds_update->route_config_name.c_str(), - cluster_name.c_str()); - for (auto route : lds_update->rds_update.value().routes) { - gpr_log(GPR_INFO, "Create service config using %s %s %s %s", - route.service.c_str(), route.method.c_str(), - route.action_type.c_str(), route.action_name.c_str()); + "[xds_client %p] LDS update received: route_config_name=%s", + xds_client(), lds_update->route_config_name.c_str()); + if (lds_update->rds_update.has_value()) { + for (const auto& route : lds_update->rds_update.value().routes) { + gpr_log(GPR_INFO, + "Create service config using route: { service=\"%s\", " + "method=\"%s\" }, cluster=\"%s\" }", + route.service.c_str(), route.method.c_str(), + route.cluster_name.c_str()); + } } } auto& lds_state = state_map_[XdsApi::kLdsTypeUrl]; auto& state = lds_state.subscribed_resources[xds_client()->server_name_]; if (state != nullptr) state->Finish(); - // Ignore identical update. - if (xds_client()->route_config_name_ == lds_update->route_config_name && - xds_client()->cluster_name_ == cluster_name) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] LDS update identical to current, ignoring.", - xds_client()); - } - return; - } if (!xds_client()->route_config_name_.empty()) { Unsubscribe( XdsApi::kRdsTypeUrl, xds_client()->route_config_name_, @@ -924,10 +942,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( } xds_client()->route_config_name_ = lds_update->route_config_name; if (lds_update->rds_update.has_value()) { - // If cluster_name was found inlined in LDS response, notify the watcher - // immediately. - xds_client()->cluster_name_ = - lds_update->rds_update.value().routes[0].action_name; + // If the RouteConfiguration was found inlined in LDS response, notify the + // watcher immediately. RefCountedPtr service_config; grpc_error* error = xds_client()->CreateServiceConfig( lds_update->rds_update.value(), &service_config); @@ -954,24 +970,10 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate( "RDS update does not include requested resource")); return; } - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, "[xds_client %p] RDS update received: cluster_name=%s", - xds_client(), rds_update->routes[0].action_name.c_str()); - } auto& rds_state = state_map_[XdsApi::kRdsTypeUrl]; auto& state = rds_state.subscribed_resources[xds_client()->route_config_name_]; if (state != nullptr) state->Finish(); - // Ignore identical update. - if (xds_client()->cluster_name_ == rds_update->routes[0].action_name) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] RDS update identical to current, ignoring.", - xds_client()); - } - return; - } - xds_client()->cluster_name_ = rds_update->routes[0].action_name; // Notify the watcher. RefCountedPtr service_config; grpc_error* error = @@ -2043,85 +2045,46 @@ void XdsClient::ResetBackoff() { } } -char* XdsClient::CreateServiceConfigActionCluster( - const std::string& cluster_name, const bool without_comma) const { - const char* last_line = without_comma ? "}" : "},"; - char* json; - gpr_asprintf(&json, - " { \"cds\": \"%s\",\n" - " \"child_policy\":[\n" - " { \"cds_experimental\":{\n" - " \"cluster\": \"%s\"\n" - " } }\n" - " ]\n" - " %s\n", - cluster_name.c_str(), cluster_name.c_str(), last_line); - return json; -} - -char* XdsClient::CreateServiceConfigRoute(const std::string& cluster_name, - const std::string& service, - const std::string& method, - const bool without_comma) const { - const char* last_line = without_comma ? "}" : "},"; - char* json; - gpr_asprintf(&json, - " { \"methodName\":\n" - " { \"service\": \"%s\",\n" - " \"method\": \"%s\"},\n" - " \"action\": \"%s\"\n" - " %s\n", - service.c_str(), method.c_str(), cluster_name.c_str(), - last_line); - return json; -} - grpc_error* XdsClient::CreateServiceConfig( const XdsApi::RdsUpdate& rds_update, RefCountedPtr* service_config) const { - gpr_strvec v; - gpr_strvec_init(&v); - char* json_start; - gpr_asprintf(&json_start, - "{\n" - " \"loadBalancingConfig\":[\n" - " { \"xds_routing_experimental\":{\n" - " \"actions\":[\n"); - gpr_strvec_add(&v, json_start); + std::vector v; + std::string json_start = + ("{\n" + " \"loadBalancingConfig\":[\n" + " { \"xds_routing_experimental\":{\n" + " \"actions\":{\n"); + v.push_back(std::move(json_start)); + std::vector actions_vector; for (size_t i = 0; i < rds_update.routes.size(); ++i) { auto route = rds_update.routes[i]; - // TODO: (donnadionne) CreateServiceConfigActionWeightedTarget - char* action = CreateServiceConfigActionCluster( - route.action_name.c_str(), i == (rds_update.routes.size() - 1)); - gpr_strvec_add(&v, action); - } - char* json_transition; - gpr_asprintf(&json_transition, - " ],\n" - " \"routes\":[\n"); - gpr_strvec_add(&v, json_transition); + actions_vector.push_back( + CreateServiceConfigActionCluster(route.cluster_name.c_str())); + } + v.push_back(absl::StrJoin(actions_vector, ",")); + std::string json_transition = + (" },\n" + " \"routes\":[\n"); + v.push_back(std::move(json_transition)); + std::vector routes_vector; for (size_t i = 0; i < rds_update.routes.size(); ++i) { auto route_info = rds_update.routes[i]; - char* route = CreateServiceConfigRoute( - route_info.action_name.c_str(), route_info.service.c_str(), - route_info.method.c_str(), i == (rds_update.routes.size() - 1)); - gpr_strvec_add(&v, route); - } - char* json_end; - gpr_asprintf(&json_end, - " ]\n" - " } }\n" - " ]\n" - "}"); - gpr_strvec_add(&v, json_end); - size_t len; - char* json = gpr_strvec_flatten(&v, &len); - gpr_strvec_destroy(&v); + routes_vector.push_back(CreateServiceConfigRoute( + route_info.cluster_name.c_str(), route_info.service.c_str(), + route_info.method.c_str())); + } + v.push_back(absl::StrJoin(routes_vector, ",")); + std::string json_end = + (" ]\n" + " } }\n" + " ]\n" + "}"); + v.push_back(std::move(json_end)); + std::string json = absl::StrJoin(v, ""); grpc_error* error = GRPC_ERROR_NONE; - *service_config = ServiceConfig::Create(json, &error); + *service_config = ServiceConfig::Create(json.c_str(), &error); gpr_log(GPR_INFO, "Built service config: \"%s\"", service_config->get()->json_string().c_str()); - gpr_free(json); return error; } diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h index d29ac12e659..76a45c9b09f 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.h +++ b/src/core/ext/filters/client_channel/xds/xds_client.h @@ -218,13 +218,6 @@ class XdsClient : public InternallyRefCounted { // Sends an error notification to all watchers. void NotifyOnError(grpc_error* error); - char* CreateServiceConfigActionCluster( - const std::string& cluster_name, const bool without_comma = false) const; - char* CreateServiceConfigRoute(const std::string& prefix, - const std::string& service, - const std::string& method, - const bool without_comma = false) const; - grpc_error* CreateServiceConfig( const XdsApi::RdsUpdate& rds_update, RefCountedPtr* service_config) const; @@ -254,7 +247,6 @@ class XdsClient : public InternallyRefCounted { OrphanablePtr chand_; std::string route_config_name_; - std::string cluster_name_; // One entry for each watched CDS resource. std::map cluster_map_; // One entry for each watched EDS resource. diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc index bfed2e22ddd..bdc658b76d2 100644 --- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc @@ -46,6 +46,8 @@ void grpc_lb_policy_cds_init(void); void grpc_lb_policy_cds_shutdown(void); void grpc_lb_policy_xds_init(void); void grpc_lb_policy_xds_shutdown(void); +void grpc_lb_policy_xds_routing_init(void); +void grpc_lb_policy_xds_routing_shutdown(void); void grpc_lb_policy_pick_first_init(void); void grpc_lb_policy_pick_first_shutdown(void); void grpc_lb_policy_round_robin_init(void); @@ -88,6 +90,8 @@ void grpc_register_built_in_plugins(void) { grpc_lb_policy_cds_shutdown); grpc_register_plugin(grpc_lb_policy_xds_init, grpc_lb_policy_xds_shutdown); + grpc_register_plugin(grpc_lb_policy_xds_routing_init, + grpc_lb_policy_xds_routing_shutdown); grpc_register_plugin(grpc_lb_policy_pick_first_init, grpc_lb_policy_pick_first_shutdown); grpc_register_plugin(grpc_lb_policy_round_robin_init, diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 15c594cdf8d..a710f24510e 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -1946,15 +1946,9 @@ TEST_P(LdsTest, XdsRoutingPathMatching) { // Change RDS resource to point to new cluster. RouteConfiguration new_route_config = balancers_[0]->ads_service()->default_route_config(); - new_route_config.mutable_virtual_hosts(0) - ->mutable_routes(0) - ->mutable_match() - ->set_path("/grpc.testing.EchoTestService/Echo"); - //->set_prefix("/dgrpc.testing.EchoTestService"); - new_route_config.mutable_virtual_hosts(0) - ->mutable_routes(0) - ->mutable_route() - ->set_cluster(kNewClusterName); + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + route->mutable_match()->set_path("/grpc.testing.EchoTestService/Echo"); + route->mutable_route()->set_cluster(kNewClusterName); Listener listener = balancers_[0]->ads_service()->BuildListener(new_route_config); balancers_[0]->ads_service()->SetLdsResource(listener, kDefaultResourceName); @@ -1989,14 +1983,9 @@ TEST_P(LdsTest, XdsRoutingPrefixMatching) { // Change RDS resource to point to new cluster. RouteConfiguration new_route_config = balancers_[0]->ads_service()->default_route_config(); - new_route_config.mutable_virtual_hosts(0) - ->mutable_routes(0) - ->mutable_match() - ->set_prefix("/grpc.testing.EchoTestService"); - new_route_config.mutable_virtual_hosts(0) - ->mutable_routes(0) - ->mutable_route() - ->set_cluster(kNewClusterName); + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + route->mutable_match()->set_prefix("/grpc.testing.EchoTestService"); + route->mutable_route()->set_cluster(kNewClusterName); Listener listener = balancers_[0]->ads_service()->BuildListener(new_route_config); balancers_[0]->ads_service()->SetLdsResource(listener, kDefaultResourceName);