Added grpc arg to enable xds routing and restore old tests.

reviewable/pr22280/r10
Donna Dionne 5 years ago
parent c4d4541af5
commit 8a8ca5436b
  1. 2
      include/grpc/impl/codegen/grpc_types.h
  2. 33
      src/core/ext/filters/client_channel/xds/xds_api.cc
  3. 1
      src/core/ext/filters/client_channel/xds/xds_api.h
  4. 8
      src/core/ext/filters/client_channel/xds/xds_client.cc
  5. 2
      src/core/ext/filters/client_channel/xds/xds_client.h
  6. 7
      test/cpp/end2end/test_multiple_service_impl.h
  7. 29
      test/cpp/end2end/xds_end2end_test.cc

@ -358,6 +358,8 @@ typedef struct {
* The default is 15 seconds. */
#define GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS \
"grpc.xds_resource_does_not_exist_timeout_ms"
/* if set, enable xds routing policy */
#define GRPC_ARG_XDS_ROUTING_ENABLED "grpc.xds_routing_enabled"
/** If non-zero, grpc server's cronet compression workaround will be enabled */
#define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \
"grpc.workaround.cronet_compression"

@ -952,7 +952,8 @@ MatchType DomainPatternMatchType(const std::string& domain_pattern) {
grpc_error* RouteConfigParse(
XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_RouteConfiguration* route_config,
const std::string& expected_server_name, XdsApi::RdsUpdate* rds_update) {
const std::string& expected_server_name, const bool xds_routing_enabled,
XdsApi::RdsUpdate* rds_update) {
MaybeLogRouteConfiguration(client, tracer, route_config);
// Get the virtual hosts.
size_t size;
@ -1012,7 +1013,12 @@ grpc_error* RouteConfigParse(
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No route found in the virtual host.");
}
for (size_t i = 0; i < size; ++i) {
// If xds_routing is not configured, only look at the last one in the route
// list (the default route)
size_t start_index = size - 1;
if (xds_routing_enabled) start_index = 0;
for (size_t i = start_index; i < size; ++i) {
const envoy_api_v2_route_Route* route = routes[i];
const envoy_api_v2_route_RouteMatch* match =
envoy_api_v2_route_Route_match(route);
@ -1066,10 +1072,6 @@ grpc_error* RouteConfigParse(
envoy_api_v2_route_RouteAction_cluster(route_action);
rds_route.cluster_name = std::string(action.data, action.size);
rds_update->routes.emplace_back(std::move(rds_route));
gpr_log(GPR_INFO, "RouteConfigParse a route %s %s %s",
rds_update->routes[i].service.c_str(),
rds_update->routes[i].method.c_str(),
rds_update->routes[i].cluster_name.c_str());
}
return GRPC_ERROR_NONE;
}
@ -1077,6 +1079,7 @@ grpc_error* RouteConfigParse(
grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_DiscoveryResponse* response,
const std::string& expected_server_name,
const bool xds_routing_enabled,
absl::optional<XdsApi::LdsUpdate>* lds_update,
upb_arena* arena) {
// Get the resources from the response.
@ -1122,8 +1125,9 @@ grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer,
envoy_config_filter_network_http_connection_manager_v2_HttpConnectionManager_route_config(
http_connection_manager);
XdsApi::RdsUpdate rds_update;
grpc_error* error = RouteConfigParse(client, tracer, route_config,
expected_server_name, &rds_update);
grpc_error* error =
RouteConfigParse(client, tracer, route_config, expected_server_name,
xds_routing_enabled, &rds_update);
if (error != GRPC_ERROR_NONE) return error;
lds_update->emplace();
(*lds_update)->rds_update.emplace(std::move(rds_update));
@ -1154,6 +1158,7 @@ grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_DiscoveryResponse* response,
const std::string& expected_server_name,
const std::string& expected_route_config_name,
const bool xds_routing_enabled,
absl::optional<XdsApi::RdsUpdate>* rds_update,
upb_arena* arena) {
// Get the resources from the response.
@ -1182,8 +1187,9 @@ grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer,
if (!upb_strview_eql(name, expected_name)) continue;
// Parse the route_config.
XdsApi::RdsUpdate local_rds_update;
grpc_error* error = RouteConfigParse(
client, tracer, route_config, expected_server_name, &local_rds_update);
grpc_error* error =
RouteConfigParse(client, tracer, route_config, expected_server_name,
xds_routing_enabled, &local_rds_update);
if (error != GRPC_ERROR_NONE) return error;
rds_update->emplace(std::move(local_rds_update));
return GRPC_ERROR_NONE;
@ -1463,6 +1469,7 @@ grpc_error* EdsResponseParse(
grpc_error* XdsApi::ParseAdsResponse(
const grpc_slice& encoded_response, const std::string& expected_server_name,
const std::string& expected_route_config_name,
const bool xds_routing_enabled,
const std::set<StringView>& expected_cluster_names,
const std::set<StringView>& expected_eds_service_names,
absl::optional<LdsUpdate>* lds_update,
@ -1494,11 +1501,11 @@ grpc_error* XdsApi::ParseAdsResponse(
// Parse the response according to the resource type.
if (*type_url == kLdsTypeUrl) {
return LdsResponseParse(client_, tracer_, response, expected_server_name,
lds_update, arena.ptr());
xds_routing_enabled, lds_update, arena.ptr());
} else if (*type_url == kRdsTypeUrl) {
return RdsResponseParse(client_, tracer_, response, expected_server_name,
expected_route_config_name, rds_update,
arena.ptr());
expected_route_config_name, xds_routing_enabled,
rds_update, arena.ptr());
} else if (*type_url == kCdsTypeUrl) {
return CdsResponseParse(client_, tracer_, response, expected_cluster_names,
cds_update_map, arena.ptr());

@ -257,6 +257,7 @@ class XdsApi {
const grpc_slice& encoded_response,
const std::string& expected_server_name,
const std::string& expected_route_config_name,
const bool xds_routing_enabled,
const std::set<StringView>& expected_cluster_names,
const std::set<StringView>& expected_eds_service_names,
absl::optional<LdsUpdate>* lds_update,

@ -1245,7 +1245,7 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
(xds_client->lds_result_.has_value()
? xds_client->lds_result_->route_config_name
: ""),
ads_calld->ClusterNamesForRequest(),
xds_client->xds_routing_enabled_, ads_calld->ClusterNamesForRequest(),
ads_calld->EdsServiceNamesForRequest(), &lds_update, &rds_update,
&cds_update_map, &eds_update_map, &version, &nonce, &type_url);
grpc_slice_unref_internal(response_slice);
@ -1820,6 +1820,11 @@ grpc_millis GetRequestTimeout(const grpc_channel_args& args) {
{15000, 0, INT_MAX});
}
bool GetXdsRoutingEnabled(const grpc_channel_args& args) {
return grpc_channel_args_find_integer(&args, GRPC_ARG_XDS_ROUTING_ENABLED,
{0, 0, 1});
}
} // namespace
XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
@ -1828,6 +1833,7 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
const grpc_channel_args& channel_args, grpc_error** error)
: InternallyRefCounted<XdsClient>(&grpc_xds_client_trace),
request_timeout_(GetRequestTimeout(channel_args)),
xds_routing_enabled_(GetXdsRoutingEnabled(channel_args)),
combiner_(GRPC_COMBINER_REF(combiner, "xds_client")),
interested_parties_(interested_parties),
bootstrap_(

@ -241,6 +241,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
const grpc_millis request_timeout_;
const bool xds_routing_enabled_;
Combiner* combiner_;
grpc_pollset_set* interested_parties_;

@ -141,13 +141,6 @@ void ServerTryCancel(ServerContext* context) {
}
}
void ServerTryCancelNonblocking(experimental::CallbackServerContext* context) {
EXPECT_FALSE(context->IsCancelled());
context->TryCancel();
gpr_log(GPR_INFO,
"Server called TryCancelNonblocking() to cancel the request");
}
} // namespace
class TestMultipleServiceSignaller {

@ -1187,7 +1187,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
void ResetStub(int fallback_timeout = 0, int failover_timeout = 0,
const grpc::string& expected_targets = "",
int xds_resource_does_not_exist_timeout = 0) {
int xds_resource_does_not_exist_timeout = 0,
int xds_routing_enabled = false) {
ChannelArguments args;
// TODO(juanlishen): Add setter to ChannelArguments.
if (fallback_timeout > 0) {
@ -1200,6 +1201,9 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
args.SetInt(GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
xds_resource_does_not_exist_timeout);
}
if (xds_routing_enabled) {
args.SetInt(GRPC_ARG_XDS_ROUTING_ENABLED, 1);
}
// If the parent channel is using the fake resolver, we inject the
// response generator for the parent here, and then SetNextResolution()
// will inject the xds channel's response generator via the parent's
@ -2178,11 +2182,13 @@ TEST_P(LdsTest, ChooseMatchedDomain) {
AdsServiceImpl::ACKED);
}
// Tests that the LDS client should NACK when the last route is not a default
// route.
TEST_P(LdsTest, DefaultRouteInvalid) {
// Tests that LDS client should choose the last route in the virtual host if
// multiple routes exist in the LDS response.
TEST_P(LdsTest, ChooseLastRoute) {
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
*(route_config.mutable_virtual_hosts(0)->add_routes()) =
route_config.virtual_hosts(0).routes(0);
route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
@ -2193,7 +2199,7 @@ TEST_P(LdsTest, DefaultRouteInvalid) {
SetNextResolutionForLbChannelAllBalancers();
(void)SendRpc();
EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state(),
AdsServiceImpl::NACKED);
AdsServiceImpl::ACKED);
}
// Tests that LDS client should send a NACK if route match has non-empty prefix
@ -2259,6 +2265,7 @@ TEST_P(LdsTest, Timeout) {
// Tests that LDS client should choose the default route (with no matching
// specified) after unable to find a match with previous routes.
TEST_P(LdsTest, XdsRoutingPathMatching) {
ResetStub(0, 0, "", 0, 1);
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewCluster2Name = "new_cluster_2";
const size_t kNumRpcs = 10;
@ -2331,6 +2338,7 @@ TEST_P(LdsTest, XdsRoutingPathMatching) {
}
TEST_P(LdsTest, XdsRoutingPrefixMatching) {
ResetStub(0, 0, "", 0, 1);
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewCluster2Name = "new_cluster_2";
const size_t kNumRpcs = 10;
@ -2393,6 +2401,7 @@ TEST_P(LdsTest, XdsRoutingPrefixMatching) {
// Tests that LDS client should choose the default route (with no matching
// specified) after unable to find a match with previous routes.
TEST_P(LdsTest, XdsRoutingDefaultRoute) {
ResetStub(0, 0, "", 0, 1);
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewCluster2Name = "new_cluster_2";
const size_t kNumRpcs = 10;
@ -2507,12 +2516,14 @@ TEST_P(RdsTest, ChooseMatchedDomain) {
AdsServiceImpl::ACKED);
}
// Tests that the RDS client should NACK when the last route is not a default
// route.
TEST_P(RdsTest, DefaultRouteInvalid) {
// Tests that RDS client should choose the last route in the virtual host if
// multiple routes exist in the RDS response.
TEST_P(RdsTest, ChooseLastRoute) {
balancers_[0]->ads_service()->SetLdsToUseDynamicRds();
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
*(route_config.mutable_virtual_hosts(0)->add_routes()) =
route_config.virtual_hosts(0).routes(0);
route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
@ -2523,7 +2534,7 @@ TEST_P(RdsTest, DefaultRouteInvalid) {
SetNextResolutionForLbChannelAllBalancers();
(void)SendRpc();
EXPECT_EQ(balancers_[0]->ads_service()->rds_response_state(),
AdsServiceImpl::NACKED);
AdsServiceImpl::ACKED);
}
// Tests that RDS client should send a NACK if route match has non-empty prefix

Loading…
Cancel
Save