diff --git a/src/core/ext/filters/client_channel/xds/xds_api.cc b/src/core/ext/filters/client_channel/xds/xds_api.cc index d2d10b634c4..93319313ef3 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.cc +++ b/src/core/ext/filters/client_channel/xds/xds_api.cc @@ -273,42 +273,6 @@ void PopulateNode(upb_arena* arena, const XdsBootstrap::Node* node, arena); } -envoy_api_v2_DiscoveryRequest* CreateDiscoveryRequest( - upb_arena* arena, const char* type_url, const std::string& version, - const std::string& nonce, grpc_error* error) { - // Create a request. - envoy_api_v2_DiscoveryRequest* request = - envoy_api_v2_DiscoveryRequest_new(arena); - // Set type_url. - envoy_api_v2_DiscoveryRequest_set_type_url(request, - upb_strview_makez(type_url)); - // Set version_info. - if (!version.empty()) { - envoy_api_v2_DiscoveryRequest_set_version_info( - request, upb_strview_makez(version.c_str())); - } - // Set nonce. - if (!nonce.empty()) { - envoy_api_v2_DiscoveryRequest_set_response_nonce( - request, upb_strview_makez(nonce.c_str())); - } - // Set error_detail if it's a NACK. - if (error != GRPC_ERROR_NONE) { - grpc_slice error_description_slice; - GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, - &error_description_slice)); - upb_strview error_description_strview = - upb_strview_make(reinterpret_cast( - GPR_SLICE_START_PTR(error_description_slice)), - GPR_SLICE_LENGTH(error_description_slice)); - google_rpc_Status* error_detail = - envoy_api_v2_DiscoveryRequest_mutable_error_detail(request, arena); - google_rpc_Status_set_message(error_detail, error_description_strview); - GRPC_ERROR_UNREF(error); - } - return request; -} - inline absl::string_view UpbStringToAbsl(const upb_strview& str) { return absl::string_view(str.data, str.size); } @@ -479,92 +443,43 @@ grpc_slice SerializeDiscoveryRequest(upb_arena* arena, } // namespace -grpc_slice XdsApi::CreateUnsupportedTypeNackRequest(const std::string& type_url, - const std::string& nonce, - grpc_error* error) { - upb::Arena arena; - envoy_api_v2_DiscoveryRequest* request = CreateDiscoveryRequest( - arena.ptr(), type_url.c_str(), /*version=*/"", nonce, error); - MaybeLogDiscoveryRequest(client_, tracer_, request); - return SerializeDiscoveryRequest(arena.ptr(), request); -} - -grpc_slice XdsApi::CreateLdsRequest(const std::string& server_name, - const std::string& version, - const std::string& nonce, grpc_error* error, - bool populate_node) { - upb::Arena arena; - envoy_api_v2_DiscoveryRequest* request = - CreateDiscoveryRequest(arena.ptr(), kLdsTypeUrl, version, nonce, error); - // Populate node. - if (populate_node) { - envoy_api_v2_core_Node* node_msg = - envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr()); - PopulateNode(arena.ptr(), node_, build_version_, user_agent_name_, "", - node_msg); - } - // Add resource_name. - envoy_api_v2_DiscoveryRequest_add_resource_names( - request, upb_strview_make(server_name.data(), server_name.size()), - arena.ptr()); - MaybeLogDiscoveryRequest(client_, tracer_, request); - return SerializeDiscoveryRequest(arena.ptr(), request); -} - -grpc_slice XdsApi::CreateRdsRequest(const std::string& route_config_name, - const std::string& version, - const std::string& nonce, grpc_error* error, - bool populate_node) { - upb::Arena arena; - envoy_api_v2_DiscoveryRequest* request = - CreateDiscoveryRequest(arena.ptr(), kRdsTypeUrl, version, nonce, error); - // Populate node. - if (populate_node) { - envoy_api_v2_core_Node* node_msg = - envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr()); - PopulateNode(arena.ptr(), node_, build_version_, user_agent_name_, "", - node_msg); - } - // Add resource_name. - envoy_api_v2_DiscoveryRequest_add_resource_names( - request, - upb_strview_make(route_config_name.data(), route_config_name.size()), - arena.ptr()); - MaybeLogDiscoveryRequest(client_, tracer_, request); - return SerializeDiscoveryRequest(arena.ptr(), request); -} - -grpc_slice XdsApi::CreateCdsRequest( - const std::set& cluster_names, +grpc_slice XdsApi::CreateAdsRequest( + const std::string& type_url, + const std::set& resource_names, const std::string& version, const std::string& nonce, grpc_error* error, bool populate_node) { upb::Arena arena; + // Create a request. envoy_api_v2_DiscoveryRequest* request = - CreateDiscoveryRequest(arena.ptr(), kCdsTypeUrl, version, nonce, error); - // Populate node. - if (populate_node) { - envoy_api_v2_core_Node* node_msg = - envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr()); - PopulateNode(arena.ptr(), node_, build_version_, user_agent_name_, "", - node_msg); + envoy_api_v2_DiscoveryRequest_new(arena.ptr()); + // Set type_url. + envoy_api_v2_DiscoveryRequest_set_type_url( + request, upb_strview_make(type_url.data(), type_url.size())); + // Set version_info. + if (!version.empty()) { + envoy_api_v2_DiscoveryRequest_set_version_info( + request, upb_strview_make(version.data(), version.size())); } - // Add resource_names. - for (const auto& cluster_name : cluster_names) { - envoy_api_v2_DiscoveryRequest_add_resource_names( - request, upb_strview_make(cluster_name.data(), cluster_name.size()), - arena.ptr()); + // Set nonce. + if (!nonce.empty()) { + envoy_api_v2_DiscoveryRequest_set_response_nonce( + request, upb_strview_make(nonce.data(), nonce.size())); + } + // Set error_detail if it's a NACK. + if (error != GRPC_ERROR_NONE) { + grpc_slice error_description_slice; + GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, + &error_description_slice)); + upb_strview error_description_strview = + upb_strview_make(reinterpret_cast( + GPR_SLICE_START_PTR(error_description_slice)), + GPR_SLICE_LENGTH(error_description_slice)); + google_rpc_Status* error_detail = + envoy_api_v2_DiscoveryRequest_mutable_error_detail(request, + arena.ptr()); + google_rpc_Status_set_message(error_detail, error_description_strview); + GRPC_ERROR_UNREF(error); } - MaybeLogDiscoveryRequest(client_, tracer_, request); - return SerializeDiscoveryRequest(arena.ptr(), request); -} - -grpc_slice XdsApi::CreateEdsRequest( - const std::set& eds_service_names, - const std::string& version, const std::string& nonce, grpc_error* error, - bool populate_node) { - upb::Arena arena; - envoy_api_v2_DiscoveryRequest* request = - CreateDiscoveryRequest(arena.ptr(), kEdsTypeUrl, version, nonce, error); // Populate node. if (populate_node) { envoy_api_v2_core_Node* node_msg = @@ -573,10 +488,9 @@ grpc_slice XdsApi::CreateEdsRequest( node_msg); } // Add resource_names. - for (const auto& eds_service_name : eds_service_names) { + for (const auto& resource_name : resource_names) { envoy_api_v2_DiscoveryRequest_add_resource_names( - request, - upb_strview_make(eds_service_name.data(), eds_service_name.size()), + request, upb_strview_make(resource_name.data(), resource_name.size()), arena.ptr()); } MaybeLogDiscoveryRequest(client_, tracer_, request); @@ -1288,13 +1202,13 @@ grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer, return GRPC_ERROR_NONE; } -grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer, - const envoy_api_v2_DiscoveryResponse* response, - const std::string& expected_server_name, - const std::string& expected_route_config_name, - const bool xds_routing_enabled, - absl::optional* rds_update, - upb_arena* arena) { +grpc_error* RdsResponseParse( + XdsClient* client, TraceFlag* tracer, + const envoy_api_v2_DiscoveryResponse* response, + const std::string& expected_server_name, + const std::set& expected_route_configuration_names, + const bool xds_routing_enabled, + absl::optional* rds_update, upb_arena* arena) { // Get the resources from the response. size_t size; const google_protobuf_Any* const* resources = @@ -1315,10 +1229,14 @@ grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer, return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode route_config."); } // Check route_config_name. Ignore unexpected route_config. - const upb_strview name = envoy_api_v2_RouteConfiguration_name(route_config); - const upb_strview expected_name = - upb_strview_makez(expected_route_config_name.c_str()); - if (!upb_strview_eql(name, expected_name)) continue; + const upb_strview route_config_name = + envoy_api_v2_RouteConfiguration_name(route_config); + absl::string_view route_config_name_strview(route_config_name.data, + route_config_name.size); + if (expected_route_configuration_names.find(route_config_name_strview) == + expected_route_configuration_names.end()) { + continue; + } // Parse the route_config. XdsApi::RdsUpdate local_rds_update; grpc_error* error = @@ -1603,7 +1521,7 @@ grpc_error* EdsResponseParse( grpc_error* XdsApi::ParseAdsResponse( const grpc_slice& encoded_response, const std::string& expected_server_name, - const std::string& expected_route_config_name, + const std::set& expected_route_configuration_names, const std::set& expected_cluster_names, const std::set& expected_eds_service_names, absl::optional* lds_update, @@ -1638,8 +1556,8 @@ grpc_error* XdsApi::ParseAdsResponse( xds_routing_enabled_, lds_update, arena.ptr()); } else if (*type_url == kRdsTypeUrl) { return RdsResponseParse(client_, tracer_, response, expected_server_name, - expected_route_config_name, xds_routing_enabled_, - rds_update, arena.ptr()); + expected_route_configuration_names, + xds_routing_enabled_, rds_update, arena.ptr()); } else if (*type_url == kCdsTypeUrl) { return CdsResponseParse(client_, tracer_, response, expected_cluster_names, cds_update_map, arena.ptr()); diff --git a/src/core/ext/filters/client_channel/xds/xds_api.h b/src/core/ext/filters/client_channel/xds/xds_api.h index 4fa7b2bed31..8971faf79ab 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.h +++ b/src/core/ext/filters/client_channel/xds/xds_api.h @@ -230,47 +230,21 @@ class XdsApi { XdsApi(XdsClient* client, TraceFlag* tracer, const XdsBootstrap::Node* node); - // Creates a request to nack an unsupported resource type. + // Creates an ADS request. // Takes ownership of \a error. - grpc_slice CreateUnsupportedTypeNackRequest(const std::string& type_url, - const std::string& nonce, - grpc_error* error); - - // Creates an LDS request querying \a server_name. - // Takes ownership of \a error. - grpc_slice CreateLdsRequest(const std::string& server_name, - const std::string& version, - const std::string& nonce, grpc_error* error, - bool populate_node); - - // Creates an RDS request querying \a route_config_name. - // Takes ownership of \a error. - grpc_slice CreateRdsRequest(const std::string& route_config_name, + grpc_slice CreateAdsRequest(const std::string& type_url, + const std::set& resource_names, const std::string& version, const std::string& nonce, grpc_error* error, bool populate_node); - // Creates a CDS request querying \a cluster_names. - // Takes ownership of \a error. - grpc_slice CreateCdsRequest(const std::set& cluster_names, - const std::string& version, - const std::string& nonce, grpc_error* error, - bool populate_node); - - // Creates an EDS request querying \a eds_service_names. - // Takes ownership of \a error. - grpc_slice CreateEdsRequest( - const std::set& eds_service_names, - const std::string& version, const std::string& nonce, grpc_error* error, - bool populate_node); - // Parses the ADS response and outputs the validated update for either CDS or // EDS. If the response can't be parsed at the top level, \a type_url will // point to an empty string; otherwise, it will point to the received data. grpc_error* ParseAdsResponse( const grpc_slice& encoded_response, const std::string& expected_server_name, - const std::string& expected_route_config_name, + const std::set& expected_route_configuration_names, const std::set& expected_cluster_names, const std::set& expected_eds_service_names, absl::optional* lds_update, diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 7a407da627f..db48084297f 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -255,8 +255,8 @@ class XdsClient::ChannelState::AdsCallState bool IsCurrentCallOnChannel() const; - std::set ClusterNamesForRequest(); - std::set EdsServiceNamesForRequest(); + std::set ResourceNamesForRequest( + const std::string& type_url); // The owning RetryableCall<>. RefCountedPtr> parent_; @@ -804,33 +804,13 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( } auto& state = state_map_[type_url]; grpc_slice request_payload_slice; - std::set resource_names; - if (type_url == XdsApi::kLdsTypeUrl) { - resource_names.insert(xds_client()->server_name_); - request_payload_slice = xds_client()->api_.CreateLdsRequest( - xds_client()->server_name_, state.version, state.nonce, - GRPC_ERROR_REF(state.error), !sent_initial_message_); - state.subscribed_resources[xds_client()->server_name_]->Start(Ref()); - } else if (type_url == XdsApi::kRdsTypeUrl) { - resource_names.insert(xds_client()->lds_result_->route_config_name); - request_payload_slice = xds_client()->api_.CreateRdsRequest( - xds_client()->lds_result_->route_config_name, state.version, - state.nonce, GRPC_ERROR_REF(state.error), !sent_initial_message_); - state.subscribed_resources[xds_client()->lds_result_->route_config_name] - ->Start(Ref()); - } else if (type_url == XdsApi::kCdsTypeUrl) { - resource_names = ClusterNamesForRequest(); - request_payload_slice = xds_client()->api_.CreateCdsRequest( - resource_names, state.version, state.nonce, GRPC_ERROR_REF(state.error), - !sent_initial_message_); - } else if (type_url == XdsApi::kEdsTypeUrl) { - resource_names = EdsServiceNamesForRequest(); - request_payload_slice = xds_client()->api_.CreateEdsRequest( - resource_names, state.version, state.nonce, GRPC_ERROR_REF(state.error), - !sent_initial_message_); - } else { - request_payload_slice = xds_client()->api_.CreateUnsupportedTypeNackRequest( - type_url, state.nonce, GRPC_ERROR_REF(state.error)); + std::set resource_names = + ResourceNamesForRequest(type_url); + request_payload_slice = xds_client()->api_.CreateAdsRequest( + type_url, resource_names, state.version, state.nonce, + GRPC_ERROR_REF(state.error), !sent_initial_message_); + if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl && + type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) { state_map_.erase(type_url); } sent_initial_message_ = true; @@ -1242,12 +1222,10 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { // Note that ParseAdsResponse() also validates the response. grpc_error* parse_error = xds_client()->api_.ParseAdsResponse( response_slice, xds_client()->server_name_, - (xds_client()->lds_result_.has_value() - ? xds_client()->lds_result_->route_config_name - : ""), - ClusterNamesForRequest(), EdsServiceNamesForRequest(), &lds_update, - &rds_update, &cds_update_map, &eds_update_map, &version, &nonce, - &type_url); + ResourceNamesForRequest(XdsApi::kRdsTypeUrl), + ResourceNamesForRequest(XdsApi::kCdsTypeUrl), + ResourceNamesForRequest(XdsApi::kEdsTypeUrl), &lds_update, &rds_update, + &cds_update_map, &eds_update_map, &version, &nonce, &type_url); grpc_slice_unref_internal(response_slice); if (type_url.empty()) { // Ignore unparsable response. @@ -1351,25 +1329,18 @@ bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const { } std::set -XdsClient::ChannelState::AdsCallState::ClusterNamesForRequest() { - std::set cluster_names; - for (auto& p : state_map_[XdsApi::kCdsTypeUrl].subscribed_resources) { - cluster_names.insert(p.first); - OrphanablePtr& state = p.second; - state->Start(Ref()); +XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest( + const std::string& type_url) { + std::set resource_names; + auto it = state_map_.find(type_url); + if (it != state_map_.end()) { + for (auto& p : it->second.subscribed_resources) { + resource_names.insert(p.first); + OrphanablePtr& state = p.second; + state->Start(Ref()); + } } - return cluster_names; -} - -std::set -XdsClient::ChannelState::AdsCallState::EdsServiceNamesForRequest() { - std::set eds_names; - for (auto& p : state_map_[XdsApi::kEdsTypeUrl].subscribed_resources) { - eds_names.insert(p.first); - OrphanablePtr& state = p.second; - state->Start(Ref()); - } - return eds_names; + return resource_names; } // diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 271ddae52f3..a163c49c24a 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -1916,30 +1916,6 @@ TEST_P(XdsResolverOnlyTest, ChangeClusters) { EXPECT_EQ(0, std::get<1>(counts)); } -// Tests that we go into TRANSIENT_FAILURE if the Listener is removed. -TEST_P(XdsResolverOnlyTest, ListenerRemoved) { - SetNextResolution({}); - SetNextResolutionForLbChannelAllBalancers(); - AdsServiceImpl::EdsResourceArgs args({ - {"locality0", GetBackendPorts()}, - }); - balancers_[0]->ads_service()->SetEdsResource( - AdsServiceImpl::BuildEdsResource(args)); - // We need to wait for all backends to come online. - WaitForAllBackends(); - // Unset LDS resource. - balancers_[0]->ads_service()->UnsetResource(kLdsTypeUrl, - kDefaultResourceName); - // Wait for RPCs to start failing. - do { - } while (SendRpc(RpcOptions(), nullptr).ok()); - // Make sure RPCs are still failing. - CheckRpcSendFailure(1000); - // Make sure we ACK'ed the update. - EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state().state, - AdsServiceImpl::ResponseState::ACKED); -} - // Tests that we go into TRANSIENT_FAILURE if the Cluster disappears. TEST_P(XdsResolverOnlyTest, ClusterRemoved) { SetNextResolution({}); @@ -2294,6 +2270,30 @@ TEST_P(LdsRdsTest, Vanilla) { AdsServiceImpl::ResponseState::ACKED); } +// Tests that we go into TRANSIENT_FAILURE if the Listener is removed. +TEST_P(LdsRdsTest, ListenerRemoved) { + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", GetBackendPorts()}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args)); + // We need to wait for all backends to come online. + WaitForAllBackends(); + // Unset LDS resource. + balancers_[0]->ads_service()->UnsetResource(kLdsTypeUrl, + kDefaultResourceName); + // Wait for RPCs to start failing. + do { + } while (SendRpc(RpcOptions(), nullptr).ok()); + // Make sure RPCs are still failing. + CheckRpcSendFailure(1000); + // Make sure we ACK'ed the update. + EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state().state, + AdsServiceImpl::ResponseState::ACKED); +} + // Tests that LDS client should send a NACK if matching domain can't be found in // the LDS response. TEST_P(LdsRdsTest, NoMatchedDomain) {