Merge pull request #23117 from markdroth/xds_resource_removal_crash_fix

Fix use-after-free bug for ResourceState of unsubscribed RDS resource.
pull/23121/head
Mark D. Roth 5 years ago committed by GitHub
commit 3f0eb8ae77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 184
      src/core/ext/filters/client_channel/xds/xds_api.cc
  2. 34
      src/core/ext/filters/client_channel/xds/xds_api.h
  3. 77
      src/core/ext/filters/client_channel/xds/xds_client.cc
  4. 48
      test/cpp/end2end/xds_end2end_test.cc

@ -273,42 +273,6 @@ void PopulateNode(upb_arena* arena, const XdsBootstrap::Node* node,
arena);
}
// Builds a DiscoveryRequest for \a type_url, allocated from \a arena.
//
// version_info and response_nonce are filled in only when \a version and
// \a nonce are non-empty. If \a error is not GRPC_ERROR_NONE, the request is
// a NACK: the error's description becomes the error_detail message and
// \a error is unreffed before returning.
envoy_api_v2_DiscoveryRequest* CreateDiscoveryRequest(
    upb_arena* arena, const char* type_url, const std::string& version,
    const std::string& nonce, grpc_error* error) {
  envoy_api_v2_DiscoveryRequest* const req =
      envoy_api_v2_DiscoveryRequest_new(arena);
  envoy_api_v2_DiscoveryRequest_set_type_url(req, upb_strview_makez(type_url));
  // Optional fields: leave them unset rather than writing empty strings.
  if (!version.empty()) {
    envoy_api_v2_DiscoveryRequest_set_version_info(
        req, upb_strview_makez(version.c_str()));
  }
  if (!nonce.empty()) {
    envoy_api_v2_DiscoveryRequest_set_response_nonce(
        req, upb_strview_makez(nonce.c_str()));
  }
  // No error to report, so there is no error_detail to attach.
  if (error == GRPC_ERROR_NONE) return req;
  // NACK: report the error description in error_detail.
  grpc_slice description;
  GPR_ASSERT(
      grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &description));
  const upb_strview message = upb_strview_make(
      reinterpret_cast<const char*>(GPR_SLICE_START_PTR(description)),
      GPR_SLICE_LENGTH(description));
  google_rpc_Status* status =
      envoy_api_v2_DiscoveryRequest_mutable_error_detail(req, arena);
  google_rpc_Status_set_message(status, message);
  GRPC_ERROR_UNREF(error);
  return req;
}
// Returns an absl::string_view built from the data pointer and size of
// \a str.
inline absl::string_view UpbStringToAbsl(const upb_strview& str) {
  return {str.data, str.size};
}
@ -479,92 +443,43 @@ grpc_slice SerializeDiscoveryRequest(upb_arena* arena,
} // namespace
// Serializes a DiscoveryRequest for \a type_url that carries \a nonce and
// \a error but no version and no resource names.
// \a error is handed to CreateDiscoveryRequest().
grpc_slice XdsApi::CreateUnsupportedTypeNackRequest(const std::string& type_url,
                                                    const std::string& nonce,
                                                    grpc_error* error) {
  upb::Arena arena;
  upb_arena* const raw_arena = arena.ptr();
  // An empty version leaves version_info unset in the request.
  const std::string no_version;
  envoy_api_v2_DiscoveryRequest* nack = CreateDiscoveryRequest(
      raw_arena, type_url.c_str(), no_version, nonce, error);
  MaybeLogDiscoveryRequest(client_, tracer_, nack);
  return SerializeDiscoveryRequest(raw_arena, nack);
}
// Serializes an LDS DiscoveryRequest whose only resource name is
// \a server_name.
//
// \a version, \a nonce and \a error are forwarded to
// CreateDiscoveryRequest(). The node message is filled in only when
// \a populate_node is true.
grpc_slice XdsApi::CreateLdsRequest(const std::string& server_name,
                                    const std::string& version,
                                    const std::string& nonce, grpc_error* error,
                                    bool populate_node) {
  upb::Arena arena;
  upb_arena* const raw_arena = arena.ptr();
  envoy_api_v2_DiscoveryRequest* lds_request =
      CreateDiscoveryRequest(raw_arena, kLdsTypeUrl, version, nonce, error);
  if (populate_node) {
    envoy_api_v2_core_Node* node =
        envoy_api_v2_DiscoveryRequest_mutable_node(lds_request, raw_arena);
    PopulateNode(raw_arena, node_, build_version_, user_agent_name_, "", node);
  }
  // The single requested resource is the server name.
  const upb_strview resource_name =
      upb_strview_make(server_name.data(), server_name.size());
  envoy_api_v2_DiscoveryRequest_add_resource_names(lds_request, resource_name,
                                                   raw_arena);
  MaybeLogDiscoveryRequest(client_, tracer_, lds_request);
  return SerializeDiscoveryRequest(raw_arena, lds_request);
}
// Serializes an RDS DiscoveryRequest whose only resource name is
// \a route_config_name.
//
// \a version, \a nonce and \a error are forwarded to
// CreateDiscoveryRequest(). The node message is filled in only when
// \a populate_node is true.
grpc_slice XdsApi::CreateRdsRequest(const std::string& route_config_name,
                                    const std::string& version,
                                    const std::string& nonce, grpc_error* error,
                                    bool populate_node) {
  upb::Arena arena;
  upb_arena* const raw_arena = arena.ptr();
  envoy_api_v2_DiscoveryRequest* rds_request =
      CreateDiscoveryRequest(raw_arena, kRdsTypeUrl, version, nonce, error);
  if (populate_node) {
    envoy_api_v2_core_Node* node =
        envoy_api_v2_DiscoveryRequest_mutable_node(rds_request, raw_arena);
    PopulateNode(raw_arena, node_, build_version_, user_agent_name_, "", node);
  }
  // The single requested resource is the route configuration name.
  const upb_strview resource_name =
      upb_strview_make(route_config_name.data(), route_config_name.size());
  envoy_api_v2_DiscoveryRequest_add_resource_names(rds_request, resource_name,
                                                   raw_arena);
  MaybeLogDiscoveryRequest(client_, tracer_, rds_request);
  return SerializeDiscoveryRequest(raw_arena, rds_request);
}
grpc_slice XdsApi::CreateCdsRequest(
const std::set<absl::string_view>& cluster_names,
grpc_slice XdsApi::CreateAdsRequest(
const std::string& type_url,
const std::set<absl::string_view>& resource_names,
const std::string& version, const std::string& nonce, grpc_error* error,
bool populate_node) {
upb::Arena arena;
// Create a request.
envoy_api_v2_DiscoveryRequest* request =
CreateDiscoveryRequest(arena.ptr(), kCdsTypeUrl, version, nonce, error);
// Populate node.
if (populate_node) {
envoy_api_v2_core_Node* node_msg =
envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr());
PopulateNode(arena.ptr(), node_, build_version_, user_agent_name_, "",
node_msg);
envoy_api_v2_DiscoveryRequest_new(arena.ptr());
// Set type_url.
envoy_api_v2_DiscoveryRequest_set_type_url(
request, upb_strview_make(type_url.data(), type_url.size()));
// Set version_info.
if (!version.empty()) {
envoy_api_v2_DiscoveryRequest_set_version_info(
request, upb_strview_make(version.data(), version.size()));
}
// Add resource_names.
for (const auto& cluster_name : cluster_names) {
envoy_api_v2_DiscoveryRequest_add_resource_names(
request, upb_strview_make(cluster_name.data(), cluster_name.size()),
arena.ptr());
// Set nonce.
if (!nonce.empty()) {
envoy_api_v2_DiscoveryRequest_set_response_nonce(
request, upb_strview_make(nonce.data(), nonce.size()));
}
// Set error_detail if it's a NACK.
if (error != GRPC_ERROR_NONE) {
grpc_slice error_description_slice;
GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION,
&error_description_slice));
upb_strview error_description_strview =
upb_strview_make(reinterpret_cast<const char*>(
GPR_SLICE_START_PTR(error_description_slice)),
GPR_SLICE_LENGTH(error_description_slice));
google_rpc_Status* error_detail =
envoy_api_v2_DiscoveryRequest_mutable_error_detail(request,
arena.ptr());
google_rpc_Status_set_message(error_detail, error_description_strview);
GRPC_ERROR_UNREF(error);
}
MaybeLogDiscoveryRequest(client_, tracer_, request);
return SerializeDiscoveryRequest(arena.ptr(), request);
}
grpc_slice XdsApi::CreateEdsRequest(
const std::set<absl::string_view>& eds_service_names,
const std::string& version, const std::string& nonce, grpc_error* error,
bool populate_node) {
upb::Arena arena;
envoy_api_v2_DiscoveryRequest* request =
CreateDiscoveryRequest(arena.ptr(), kEdsTypeUrl, version, nonce, error);
// Populate node.
if (populate_node) {
envoy_api_v2_core_Node* node_msg =
@ -573,10 +488,9 @@ grpc_slice XdsApi::CreateEdsRequest(
node_msg);
}
// Add resource_names.
for (const auto& eds_service_name : eds_service_names) {
for (const auto& resource_name : resource_names) {
envoy_api_v2_DiscoveryRequest_add_resource_names(
request,
upb_strview_make(eds_service_name.data(), eds_service_name.size()),
request, upb_strview_make(resource_name.data(), resource_name.size()),
arena.ptr());
}
MaybeLogDiscoveryRequest(client_, tracer_, request);
@ -1288,13 +1202,13 @@ grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer,
return GRPC_ERROR_NONE;
}
grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_DiscoveryResponse* response,
const std::string& expected_server_name,
const std::string& expected_route_config_name,
const bool xds_routing_enabled,
absl::optional<XdsApi::RdsUpdate>* rds_update,
upb_arena* arena) {
grpc_error* RdsResponseParse(
XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_DiscoveryResponse* response,
const std::string& expected_server_name,
const std::set<absl::string_view>& expected_route_configuration_names,
const bool xds_routing_enabled,
absl::optional<XdsApi::RdsUpdate>* rds_update, upb_arena* arena) {
// Get the resources from the response.
size_t size;
const google_protobuf_Any* const* resources =
@ -1315,10 +1229,14 @@ grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer,
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode route_config.");
}
// Check route_config_name. Ignore unexpected route_config.
const upb_strview name = envoy_api_v2_RouteConfiguration_name(route_config);
const upb_strview expected_name =
upb_strview_makez(expected_route_config_name.c_str());
if (!upb_strview_eql(name, expected_name)) continue;
const upb_strview route_config_name =
envoy_api_v2_RouteConfiguration_name(route_config);
absl::string_view route_config_name_strview(route_config_name.data,
route_config_name.size);
if (expected_route_configuration_names.find(route_config_name_strview) ==
expected_route_configuration_names.end()) {
continue;
}
// Parse the route_config.
XdsApi::RdsUpdate local_rds_update;
grpc_error* error =
@ -1603,7 +1521,7 @@ grpc_error* EdsResponseParse(
grpc_error* XdsApi::ParseAdsResponse(
const grpc_slice& encoded_response, const std::string& expected_server_name,
const std::string& expected_route_config_name,
const std::set<absl::string_view>& expected_route_configuration_names,
const std::set<absl::string_view>& expected_cluster_names,
const std::set<absl::string_view>& expected_eds_service_names,
absl::optional<LdsUpdate>* lds_update,
@ -1638,8 +1556,8 @@ grpc_error* XdsApi::ParseAdsResponse(
xds_routing_enabled_, lds_update, arena.ptr());
} else if (*type_url == kRdsTypeUrl) {
return RdsResponseParse(client_, tracer_, response, expected_server_name,
expected_route_config_name, xds_routing_enabled_,
rds_update, arena.ptr());
expected_route_configuration_names,
xds_routing_enabled_, rds_update, arena.ptr());
} else if (*type_url == kCdsTypeUrl) {
return CdsResponseParse(client_, tracer_, response, expected_cluster_names,
cds_update_map, arena.ptr());

@ -230,47 +230,21 @@ class XdsApi {
XdsApi(XdsClient* client, TraceFlag* tracer, const XdsBootstrap::Node* node);
// Creates a request to nack an unsupported resource type.
// Creates an ADS request.
// Takes ownership of \a error.
grpc_slice CreateUnsupportedTypeNackRequest(const std::string& type_url,
const std::string& nonce,
grpc_error* error);
// Creates an LDS request querying \a server_name.
// Takes ownership of \a error.
grpc_slice CreateLdsRequest(const std::string& server_name,
const std::string& version,
const std::string& nonce, grpc_error* error,
bool populate_node);
// Creates an RDS request querying \a route_config_name.
// Takes ownership of \a error.
grpc_slice CreateRdsRequest(const std::string& route_config_name,
grpc_slice CreateAdsRequest(const std::string& type_url,
const std::set<absl::string_view>& resource_names,
const std::string& version,
const std::string& nonce, grpc_error* error,
bool populate_node);
// Creates a CDS request querying \a cluster_names.
// Takes ownership of \a error.
grpc_slice CreateCdsRequest(const std::set<absl::string_view>& cluster_names,
const std::string& version,
const std::string& nonce, grpc_error* error,
bool populate_node);
// Creates an EDS request querying \a eds_service_names.
// Takes ownership of \a error.
grpc_slice CreateEdsRequest(
const std::set<absl::string_view>& eds_service_names,
const std::string& version, const std::string& nonce, grpc_error* error,
bool populate_node);
// Parses the ADS response and outputs the validated update for whichever
// resource type it carries (LDS, RDS, CDS, or EDS). If the response can't be
// parsed at the top level, \a type_url will point to an empty string;
// otherwise, it will point to the received data.
grpc_error* ParseAdsResponse(
const grpc_slice& encoded_response,
const std::string& expected_server_name,
const std::string& expected_route_config_name,
const std::set<absl::string_view>& expected_route_configuration_names,
const std::set<absl::string_view>& expected_cluster_names,
const std::set<absl::string_view>& expected_eds_service_names,
absl::optional<LdsUpdate>* lds_update,

@ -255,8 +255,8 @@ class XdsClient::ChannelState::AdsCallState
bool IsCurrentCallOnChannel() const;
std::set<absl::string_view> ClusterNamesForRequest();
std::set<absl::string_view> EdsServiceNamesForRequest();
std::set<absl::string_view> ResourceNamesForRequest(
const std::string& type_url);
// The owning RetryableCall<>.
RefCountedPtr<RetryableCall<AdsCallState>> parent_;
@ -804,33 +804,13 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
}
auto& state = state_map_[type_url];
grpc_slice request_payload_slice;
std::set<absl::string_view> resource_names;
if (type_url == XdsApi::kLdsTypeUrl) {
resource_names.insert(xds_client()->server_name_);
request_payload_slice = xds_client()->api_.CreateLdsRequest(
xds_client()->server_name_, state.version, state.nonce,
GRPC_ERROR_REF(state.error), !sent_initial_message_);
state.subscribed_resources[xds_client()->server_name_]->Start(Ref());
} else if (type_url == XdsApi::kRdsTypeUrl) {
resource_names.insert(xds_client()->lds_result_->route_config_name);
request_payload_slice = xds_client()->api_.CreateRdsRequest(
xds_client()->lds_result_->route_config_name, state.version,
state.nonce, GRPC_ERROR_REF(state.error), !sent_initial_message_);
state.subscribed_resources[xds_client()->lds_result_->route_config_name]
->Start(Ref());
} else if (type_url == XdsApi::kCdsTypeUrl) {
resource_names = ClusterNamesForRequest();
request_payload_slice = xds_client()->api_.CreateCdsRequest(
resource_names, state.version, state.nonce, GRPC_ERROR_REF(state.error),
!sent_initial_message_);
} else if (type_url == XdsApi::kEdsTypeUrl) {
resource_names = EdsServiceNamesForRequest();
request_payload_slice = xds_client()->api_.CreateEdsRequest(
resource_names, state.version, state.nonce, GRPC_ERROR_REF(state.error),
!sent_initial_message_);
} else {
request_payload_slice = xds_client()->api_.CreateUnsupportedTypeNackRequest(
type_url, state.nonce, GRPC_ERROR_REF(state.error));
std::set<absl::string_view> resource_names =
ResourceNamesForRequest(type_url);
request_payload_slice = xds_client()->api_.CreateAdsRequest(
type_url, resource_names, state.version, state.nonce,
GRPC_ERROR_REF(state.error), !sent_initial_message_);
if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
state_map_.erase(type_url);
}
sent_initial_message_ = true;
@ -1242,12 +1222,10 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
// Note that ParseAdsResponse() also validates the response.
grpc_error* parse_error = xds_client()->api_.ParseAdsResponse(
response_slice, xds_client()->server_name_,
(xds_client()->lds_result_.has_value()
? xds_client()->lds_result_->route_config_name
: ""),
ClusterNamesForRequest(), EdsServiceNamesForRequest(), &lds_update,
&rds_update, &cds_update_map, &eds_update_map, &version, &nonce,
&type_url);
ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
ResourceNamesForRequest(XdsApi::kEdsTypeUrl), &lds_update, &rds_update,
&cds_update_map, &eds_update_map, &version, &nonce, &type_url);
grpc_slice_unref_internal(response_slice);
if (type_url.empty()) {
// Ignore unparsable response.
@ -1351,25 +1329,18 @@ bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
}
std::set<absl::string_view>
XdsClient::ChannelState::AdsCallState::ClusterNamesForRequest() {
std::set<absl::string_view> cluster_names;
for (auto& p : state_map_[XdsApi::kCdsTypeUrl].subscribed_resources) {
cluster_names.insert(p.first);
OrphanablePtr<ResourceState>& state = p.second;
state->Start(Ref());
XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
const std::string& type_url) {
std::set<absl::string_view> resource_names;
auto it = state_map_.find(type_url);
if (it != state_map_.end()) {
for (auto& p : it->second.subscribed_resources) {
resource_names.insert(p.first);
OrphanablePtr<ResourceState>& state = p.second;
state->Start(Ref());
}
}
return cluster_names;
}
std::set<absl::string_view>
XdsClient::ChannelState::AdsCallState::EdsServiceNamesForRequest() {
std::set<absl::string_view> eds_names;
for (auto& p : state_map_[XdsApi::kEdsTypeUrl].subscribed_resources) {
eds_names.insert(p.first);
OrphanablePtr<ResourceState>& state = p.second;
state->Start(Ref());
}
return eds_names;
return resource_names;
}
//

@ -1916,30 +1916,6 @@ TEST_P(XdsResolverOnlyTest, ChangeClusters) {
EXPECT_EQ(0, std::get<1>(counts));
}
// Tests that we go into TRANSIENT_FAILURE if the Listener is removed.
TEST_P(XdsResolverOnlyTest, ListenerRemoved) {
  SetNextResolution({});
  SetNextResolutionForLbChannelAllBalancers();
  // Publish one locality containing every backend.
  AdsServiceImpl::EdsResourceArgs eds_args({
      {"locality0", GetBackendPorts()},
  });
  balancers_[0]->ads_service()->SetEdsResource(
      AdsServiceImpl::BuildEdsResource(eds_args));
  // All backends must be online before the Listener is removed.
  WaitForAllBackends();
  // Remove the LDS resource on the balancer.
  balancers_[0]->ads_service()->UnsetResource(kLdsTypeUrl,
                                              kDefaultResourceName);
  // Keep sending RPCs until the first one fails.
  while (SendRpc(RpcOptions(), nullptr).ok()) {
  }
  // RPCs must keep failing from here on.
  CheckRpcSendFailure(1000);
  // The LDS update must have been ACKed.
  EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state().state,
            AdsServiceImpl::ResponseState::ACKED);
}
// Tests that we go into TRANSIENT_FAILURE if the Cluster disappears.
TEST_P(XdsResolverOnlyTest, ClusterRemoved) {
SetNextResolution({});
@ -2294,6 +2270,30 @@ TEST_P(LdsRdsTest, Vanilla) {
AdsServiceImpl::ResponseState::ACKED);
}
// Tests that we go into TRANSIENT_FAILURE if the Listener is removed.
TEST_P(LdsRdsTest, ListenerRemoved) {
  SetNextResolution({});
  SetNextResolutionForLbChannelAllBalancers();
  // Publish one locality containing every backend.
  AdsServiceImpl::EdsResourceArgs eds_args({
      {"locality0", GetBackendPorts()},
  });
  balancers_[0]->ads_service()->SetEdsResource(
      AdsServiceImpl::BuildEdsResource(eds_args));
  // All backends must be online before the Listener is removed.
  WaitForAllBackends();
  // Remove the LDS resource on the balancer.
  balancers_[0]->ads_service()->UnsetResource(kLdsTypeUrl,
                                              kDefaultResourceName);
  // Keep sending RPCs until the first one fails.
  while (SendRpc(RpcOptions(), nullptr).ok()) {
  }
  // RPCs must keep failing from here on.
  CheckRpcSendFailure(1000);
  // The LDS update must have been ACKed.
  EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state().state,
            AdsServiceImpl::ResponseState::ACKED);
}
// Tests that LDS client should send a NACK if matching domain can't be found in
// the LDS response.
TEST_P(LdsRdsTest, NoMatchedDomain) {

Loading…
Cancel
Save