Fix google c2p resolver shutdown during metadata server queries (#28519)

* Fix c2p resolver shutdown during metadata server queries

* handle lame channels in XDS client
pull/28803/head
apolcyn 3 years ago committed by GitHub
parent 9ffd1a7b0a
commit 64082940a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 37
      CMakeLists.txt
  2. 11
      build_autogenerated.yaml
  3. 84
      src/core/ext/filters/client_channel/resolver/google_c2p/google_c2p_resolver.cc
  4. 19
      src/core/ext/xds/xds_client.cc
  5. 3
      src/core/ext/xds/xds_client.h
  6. 18
      test/core/client_channel/resolvers/BUILD
  7. 114
      test/core/client_channel/resolvers/google_c2p_resolver_test.cc
  8. 15
      test/core/util/fake_udp_and_tcp_server.cc
  9. 24
      tools/run_tests/generated/tests.json

37
CMakeLists.txt generated

@ -867,6 +867,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx global_config_env_test)
endif()
add_dependencies(buildtests_cxx global_config_test)
add_dependencies(buildtests_cxx google_c2p_resolver_test)
add_dependencies(buildtests_cxx google_mesh_ca_certificate_provider_factory_test)
add_dependencies(buildtests_cxx grpc_authorization_engine_test)
add_dependencies(buildtests_cxx grpc_authorization_policy_provider_test)
@ -10894,6 +10895,42 @@ target_link_libraries(global_config_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(google_c2p_resolver_test
test/core/client_channel/resolvers/google_c2p_resolver_test.cc
test/core/util/fake_udp_and_tcp_server.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(google_c2p_resolver_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(google_c2p_resolver_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc++_test_util
)
endif()
if(gRPC_BUILD_TESTS)

@ -6060,6 +6060,17 @@ targets:
deps:
- grpc_test_util
uses_polling: false
- name: google_c2p_resolver_test
gtest: true
build: test
language: c++
headers:
- test/core/util/fake_udp_and_tcp_server.h
src:
- test/core/client_channel/resolvers/google_c2p_resolver_test.cc
- test/core/util/fake_udp_and_tcp_server.cc
deps:
- grpc++_test_util
- name: google_mesh_ca_certificate_provider_factory_test
gtest: true
build: test

@ -53,9 +53,6 @@ class GoogleCloud2ProdResolver : public Resolver {
private:
static void OnHttpRequestDone(void* arg, grpc_error_handle error);
// Calls OnDone() if not already called. Releases a ref.
void MaybeCallOnDone(grpc_error_handle error);
// If error is not GRPC_ERROR_NONE, then it's not safe to look at response.
virtual void OnDone(GoogleCloud2ProdResolver* resolver,
const grpc_http_response* response,
@ -65,7 +62,6 @@ class GoogleCloud2ProdResolver : public Resolver {
OrphanablePtr<HttpRequest> http_request_;
grpc_http_response response_;
grpc_closure on_done_;
std::atomic<bool> on_done_called_{false};
};
// A metadata server query to get the zone.
@ -101,6 +97,8 @@ class GoogleCloud2ProdResolver : public Resolver {
grpc_polling_entity pollent_;
bool using_dns_ = false;
OrphanablePtr<Resolver> child_resolver_;
std::string metadata_server_name_ = "metadata.google.internal.";
bool shutdown_ = false;
OrphanablePtr<ZoneQuery> zone_query_;
absl::optional<std::string> zone_;
@ -126,7 +124,7 @@ GoogleCloud2ProdResolver::MetadataQuery::MetadataQuery(
const_cast<char*>("Google")};
request.hdr_count = 1;
request.hdrs = &header;
auto uri = URI::Create("http", "metadata.google.internal.", path,
auto uri = URI::Create("http", resolver_->metadata_server_name_, path,
{} /* query params */, "" /* fragment */);
GPR_ASSERT(uri.ok()); // params are hardcoded
grpc_arg resource_quota_arg = grpc_channel_arg_pointer_create(
@ -147,34 +145,20 @@ GoogleCloud2ProdResolver::MetadataQuery::~MetadataQuery() {
}
void GoogleCloud2ProdResolver::MetadataQuery::Orphan() {
// TODO(roth): Once the HTTP client library supports cancellation,
// use that here.
MaybeCallOnDone(GRPC_ERROR_CANCELLED);
http_request_.reset();
Unref();
}
void GoogleCloud2ProdResolver::MetadataQuery::OnHttpRequestDone(
void* arg, grpc_error_handle error) {
auto* self = static_cast<MetadataQuery*>(arg);
self->MaybeCallOnDone(GRPC_ERROR_REF(error));
}
void GoogleCloud2ProdResolver::MetadataQuery::MaybeCallOnDone(
grpc_error_handle error) {
bool expected = false;
if (!on_done_called_.compare_exchange_strong(expected, true,
std::memory_order_relaxed,
std::memory_order_relaxed)) {
// We've already called OnDone(), so just clean up.
GRPC_ERROR_UNREF(error);
Unref();
return;
}
// Hop back into WorkSerializer to call OnDone().
// Note: We implicitly pass our ref to the callback here.
resolver_->work_serializer_->Run(
[this, error]() {
OnDone(resolver_.get(), &response_, error);
Unref();
GRPC_ERROR_REF(error);
self->resolver_->work_serializer_->Run(
[self, error]() {
self->OnDone(self->resolver_.get(), &self->response_, error);
self->Unref();
},
DEBUG_LOCATION);
}
@ -192,22 +176,31 @@ GoogleCloud2ProdResolver::ZoneQuery::ZoneQuery(
void GoogleCloud2ProdResolver::ZoneQuery::OnDone(
GoogleCloud2ProdResolver* resolver, const grpc_http_response* response,
grpc_error_handle error) {
absl::StatusOr<std::string> zone;
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "error fetching zone from metadata server: %s",
grpc_error_std_string(error).c_str());
}
std::string zone;
if (error == GRPC_ERROR_NONE && response->status == 200) {
zone = absl::UnknownError(
absl::StrCat("error fetching zone from metadata server: ",
grpc_error_std_string(error)));
} else if (response->status != 200) {
zone = absl::UnknownError(absl::StrFormat(
"zone query received non-200 status: %d", response->status));
} else {
absl::string_view body(response->body, response->body_length);
size_t i = body.find_last_of('/');
if (i == body.npos) {
gpr_log(GPR_ERROR, "could not parse zone from metadata server: %s",
std::string(body).c_str());
zone = absl::UnknownError(
absl::StrCat("could not parse zone from metadata server: ", body));
} else {
zone = std::string(body.substr(i + 1));
}
}
resolver->ZoneQueryDone(std::move(zone));
if (!zone.ok()) {
gpr_log(GPR_ERROR, "zone query failed: %s",
zone.status().ToString().c_str());
resolver->ZoneQueryDone("");
} else {
resolver->ZoneQueryDone(std::move(*zone));
}
GRPC_ERROR_UNREF(error);
}
@ -244,7 +237,12 @@ GoogleCloud2ProdResolver::GoogleCloud2ProdResolver(ResolverArgs args)
absl::string_view name_to_resolve = absl::StripPrefix(args.uri.path(), "/");
// If we're not running on GCP, we can't use DirectPath, so delegate
// to the DNS resolver.
if (!grpc_alts_is_running_on_gcp() ||
bool test_only_pretend_running_on_gcp = grpc_channel_args_find_bool(
args.args, "grpc.testing.google_c2p_resolver_pretend_running_on_gcp",
false);
bool running_on_gcp =
test_only_pretend_running_on_gcp || grpc_alts_is_running_on_gcp();
if (!running_on_gcp ||
// If the client is already using xDS, we can't use it here, because
// they may be talking to a completely different xDS server than we
// want to.
@ -258,6 +256,15 @@ GoogleCloud2ProdResolver::GoogleCloud2ProdResolver(ResolverArgs args)
GPR_ASSERT(child_resolver_ != nullptr);
return;
}
// Maybe override metadata server name for testing
const char* test_only_metadata_server_override =
grpc_channel_args_find_string(
args.args,
"grpc.testing.google_c2p_resolver_metadata_server_override");
if (test_only_metadata_server_override != nullptr &&
strlen(test_only_metadata_server_override) > 0) {
metadata_server_name_ = std::string(test_only_metadata_server_override);
}
// Create xds resolver.
child_resolver_ = ResolverRegistry::CreateResolver(
absl::StrCat("xds:", name_to_resolve).c_str(), args.args,
@ -288,6 +295,7 @@ void GoogleCloud2ProdResolver::ResetBackoffLocked() {
}
void GoogleCloud2ProdResolver::ShutdownLocked() {
shutdown_ = true;
zone_query_.reset();
ipv6_query_.reset();
child_resolver_.reset();
@ -306,6 +314,9 @@ void GoogleCloud2ProdResolver::IPv6QueryDone(bool ipv6_supported) {
}
void GoogleCloud2ProdResolver::StartXdsResolver() {
if (shutdown_) {
return;
}
// Construct bootstrap JSON.
std::random_device rd;
std::mt19937 mt(rd());
@ -371,7 +382,8 @@ class GoogleCloud2ProdResolverFactory : public ResolverFactory {
return MakeOrphanable<GoogleCloud2ProdResolver>(std::move(args));
}
// TODO(roth): Remove experimental suffix once this code is proven stable.
// TODO(roth): Remove experimental suffix once this code is proven stable,
// and update the scheme in google_c2p_resolver_test.cc when doing so.
const char* scheme() const override { return "google-c2p-experimental"; }
};

@ -58,6 +58,7 @@
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/lame_client.h"
#include "src/core/lib/uri/uri_parser.h"
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
@ -586,7 +587,22 @@ void XdsClient::ChannelState::StopLrsCallLocked() {
lrs_calld_.reset();
}
namespace {
bool IsLameChannel(grpc_channel* channel) {
grpc_channel_element* elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
return elem->filter == &grpc_lame_filter;
}
} // namespace
void XdsClient::ChannelState::StartConnectivityWatchLocked() {
if (IsLameChannel(channel_)) {
xds_client()->NotifyOnErrorLocked(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds client has a lame channel"));
return;
}
ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
GPR_ASSERT(client_channel != nullptr);
watcher_ = new StateWatcher(WeakRef(DEBUG_LOCATION, "ChannelState+watch"));
@ -596,6 +612,9 @@ void XdsClient::ChannelState::StartConnectivityWatchLocked() {
}
void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
if (IsLameChannel(channel_)) {
return;
}
ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
GPR_ASSERT(client_channel != nullptr);
client_channel->RemoveConnectivityWatcher(watcher_);

@ -194,7 +194,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
bool HasAdsCall() const;
bool HasActiveAdsCall() const;
void StartConnectivityWatchLocked();
void StartConnectivityWatchLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void CancelConnectivityWatchLocked();
void SubscribeLocked(const XdsResourceType* type,

@ -104,3 +104,21 @@ grpc_cc_test(
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "google_c2p_resolver_test",
srcs = ["google_c2p_resolver_test.cc"],
external_deps = [
"absl/strings:str_format",
"gtest",
],
language = "C++",
deps = [
"//:gpr",
"//:grpc",
"//:grpc++",
"//test/core/util:fake_udp_and_tcp_server",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",
],
)

@ -0,0 +1,114 @@
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <atomic>
#include <memory>
#include <mutex>
#include <random>
#include <sstream>
#include <thread>
#include <gmock/gmock.h>
#include "absl/strings/str_format.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gprpp/thd.h"
#include "test/core/util/fake_udp_and_tcp_server.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
namespace {
void TryConnectAndDestroy(const char* fake_metadata_server_address) {
grpc::ChannelArguments args;
std::string target = "google-c2p-experimental:///servername_not_used";
args.SetInt("grpc.testing.google_c2p_resolver_pretend_running_on_gcp", 1);
args.SetString("grpc.testing.google_c2p_resolver_metadata_server_override",
fake_metadata_server_address);
auto channel = ::grpc::CreateCustomChannel(
target, grpc::InsecureChannelCredentials(), args);
// Start connecting, and give some time for the google-c2p resolver to begin
// resolution and start trying to contact the metadata server.
channel->GetState(true /* try_to_connect */);
ASSERT_FALSE(
channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(100)));
channel.reset();
};
// Exercise the machinery involved with shutting down the C2P resolver while
// it's waiting for its initial metadata server queries to finish.
TEST(DestroyGoogleC2pChannelWithActiveConnectStressTest,
LoopTryConnectAndDestroyWithHangingMetadataServer) {
// Create a fake metadata server which hangs.
grpc_core::testing::FakeUdpAndTcpServer fake_metadata_server(
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
kWaitForClientToSendFirstBytes,
grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
std::vector<std::unique_ptr<std::thread>> threads;
const int kNumThreads = 100;
threads.reserve(kNumThreads);
for (int i = 0; i < kNumThreads; i++) {
threads.emplace_back(
new std::thread(TryConnectAndDestroy, fake_metadata_server.address()));
}
for (size_t i = 0; i < threads.size(); i++) {
threads[i]->join();
}
}
// Exercise the machinery involved with shutting down the C2P resolver while
// it's waiting for its initial metadata server queries to finish.
TEST(DestroyGoogleC2pChannelWithActiveConnectStressTest,
LoopTryConnectAndDestroyWithFastFailingMetadataServer) {
// Create a fake metadata server address which rejects connections
int port = grpc_pick_unused_port_or_die();
std::string address = absl::StrFormat("[::1]:%d", port);
std::vector<std::unique_ptr<std::thread>> threads;
const int kNumThreads = 100;
threads.reserve(kNumThreads);
for (int i = 0; i < kNumThreads; i++) {
threads.emplace_back(
new std::thread(TryConnectAndDestroy, address.c_str()));
}
for (size_t i = 0; i < threads.size(); i++) {
threads[i]->join();
}
}
} // namespace
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
gpr_setenv("GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER", "true");
::testing::InitGoogleTest(&argc, argv);
grpc_init();
auto result = RUN_ALL_TESTS();
grpc_shutdown();
return result;
}

@ -21,6 +21,7 @@
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/event_engine/sockaddr.h"
#include "test/core/util/port.h"
@ -63,7 +64,7 @@ FakeUdpAndTcpServer::FakeUdpAndTcpServer(
port_ = grpc_pick_unused_port_or_die();
udp_socket_ = socket(AF_INET6, SOCK_DGRAM, 0);
if (udp_socket_ == BAD_SOCKET_RETURN_VAL) {
gpr_log(GPR_DEBUG, "Failed to create UDP ipv6 socket: %d", ERRNO);
gpr_log(GPR_ERROR, "Failed to create UDP ipv6 socket: %d", ERRNO);
GPR_ASSERT(0);
}
accept_socket_ = socket(AF_INET6, SOCK_STREAM, 0);
@ -76,7 +77,7 @@ FakeUdpAndTcpServer::FakeUdpAndTcpServer(
char val = 1;
if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) ==
SOCKET_ERROR) {
gpr_log(GPR_DEBUG,
gpr_log(GPR_ERROR,
"Failed to set SO_REUSEADDR on TCP ipv6 socket to [::1]:%d, "
"errno: %d",
port_, ERRNO);
@ -99,7 +100,7 @@ FakeUdpAndTcpServer::FakeUdpAndTcpServer(
int val = 1;
if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) !=
0) {
gpr_log(GPR_DEBUG, "Failed to set SO_REUSEADDR on socket [::1]:%d", port_);
gpr_log(GPR_ERROR, "Failed to set SO_REUSEADDR on socket [::1]:%d", port_);
GPR_ASSERT(0);
}
if (fcntl(udp_socket_, F_SETFL, O_NONBLOCK) != 0) {
@ -116,9 +117,15 @@ FakeUdpAndTcpServer::FakeUdpAndTcpServer(
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(port_);
(reinterpret_cast<char*>(&addr.sin6_addr))[15] = 1;
grpc_resolved_address resolved_addr;
memcpy(resolved_addr.addr, &addr, sizeof(addr));
resolved_addr.len = sizeof(addr);
std::string addr_str = grpc_sockaddr_to_string(&resolved_addr, false);
gpr_log(GPR_INFO, "Fake UDP and TCP server listening on %s",
addr_str.c_str());
if (bind(udp_socket_, reinterpret_cast<const sockaddr*>(&addr),
sizeof(addr)) != 0) {
gpr_log(GPR_DEBUG, "Failed to bind UDP socket to [::1]:%d", port_);
gpr_log(GPR_ERROR, "Failed to bind UDP socket to [::1]:%d", port_);
GPR_ASSERT(0);
}
if (bind(accept_socket_, reinterpret_cast<const sockaddr*>(&addr),

@ -4429,6 +4429,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "google_c2p_resolver_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save