Merge remote-tracking branch 'origin/master' into typos

# Conflicts:
#	test/core/end2end/connection_refused_test.cc
pull/37541/head
Nathan Baulch 2 months ago
commit 7cbfc121ed
No known key found for this signature in database
  1. 4
      CMakeLists.txt
  2. 1
      build_autogenerated.yaml
  3. 22
      src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc
  4. 2
      src/core/ext/transport/chaotic_good/server/chaotic_good_server.h
  5. 5
      src/core/ext/transport/chaotic_good/server_transport.cc
  6. 2
      src/core/lib/event_engine/posix_engine/posix_endpoint.cc
  7. 8
      src/core/lib/gprpp/time.h
  8. 8
      src/core/lib/promise/party.cc
  9. 2
      src/core/lib/resource_quota/periodic_update.cc
  10. 17
      src/core/server/xds_server_config_fetcher.cc
  11. 29
      src/core/xds/xds_client/xds_client.cc
  12. 12
      src/objective-c/BoringSSL-GRPC.podspec
  13. 2
      src/objective-c/tests/Podfile
  14. 12
      templates/src/objective-c/BoringSSL-GRPC.podspec.template
  15. 3
      test/core/end2end/connection_refused_test.cc
  16. 5
      test/core/test_util/BUILD
  17. 65
      test/core/test_util/socket_use_after_close_detector.cc
  18. 2
      test/cpp/end2end/client_lb_end2end_test.cc
  19. 42
      test/cpp/end2end/grpclb_end2end_test.cc
  20. 13
      test/cpp/end2end/port_sharing_end2end_test.cc
  21. 9
      test/cpp/end2end/rls_end2end_test.cc
  22. 1
      test/cpp/end2end/xds/BUILD
  23. 9
      test/cpp/end2end/xds/xds_core_end2end_test.cc
  24. 506
      test/cpp/end2end/xds/xds_end2end_test.cc
  25. 15
      test/cpp/end2end/xds/xds_end2end_test_lib.cc
  26. 4
      test/cpp/end2end/xds/xds_end2end_test_lib.h
  27. 2
      tools/interop_matrix/client_matrix.py

4
CMakeLists.txt generated

@ -1591,7 +1591,7 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx xds_fallback_end2end_test)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx xds_fault_injection_end2end_test)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@ -35482,7 +35482,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
endif()
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_executable(xds_fault_injection_end2end_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc

@ -21871,7 +21871,6 @@ targets:
platforms:
- linux
- posix
- mac
- name: xds_gcp_authn_end2end_test
gtest: true
build: test

@ -187,11 +187,7 @@ void ChaoticGoodServerListener::ActiveConnection::NewConnectionID() {
connection_id_, std::make_shared<InterActivityLatch<PromiseEndpoint>>());
}
void ChaoticGoodServerListener::ActiveConnection::Done(
absl::optional<absl::string_view> error) {
if (error.has_value()) {
LOG(ERROR) << "ActiveConnection::Done:" << this << " " << *error;
}
void ChaoticGoodServerListener::ActiveConnection::Done() {
// Can easily be holding various locks here: bounce through EE to ensure no
// deadlocks.
listener_->event_engine_->Run([self = Ref()]() {
@ -387,13 +383,15 @@ auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
void ChaoticGoodServerListener::ActiveConnection::HandshakingState::
OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result) {
if (!result.ok()) {
connection_->Done(
absl::StrCat("Handshake failed: ", result.status().ToString()));
LOG_EVERY_N_SEC(ERROR, 5) << "Handshake failed: ", result.status();
connection_->Done();
return;
}
CHECK_NE(*result, nullptr);
if ((*result)->endpoint == nullptr) {
connection_->Done("Server handshake done but has empty endpoint.");
LOG_EVERY_N_SEC(ERROR, 5)
<< "Server handshake done but has empty endpoint.";
connection_->Done();
return;
}
CHECK(grpc_event_engine::experimental::grpc_is_event_engine_endpoint(
@ -429,12 +427,10 @@ void ChaoticGoodServerListener::ActiveConnection::HandshakingState::
EventEngineWakeupScheduler(connection_->listener_->event_engine_),
[self = Ref()](absl::Status status) {
if (!status.ok()) {
self->connection_->Done(
absl::StrCat("Server setting frame handling failed: ",
StatusToString(status)));
} else {
self->connection_->Done();
GRPC_TRACE_LOG(chaotic_good, ERROR)
<< "Server setting frame handling failed: " << status;
}
self->connection_->Done();
},
connection_->arena_.get());
MutexLock lock(&connection_->mu_);

@ -111,7 +111,7 @@ class ChaoticGoodServerListener final : public Server::ListenerInterface {
};
private:
void Done(absl::optional<absl::string_view> error = absl::nullopt);
void Done();
void NewConnectionID();
RefCountedPtr<Arena> arena_ = SimpleArenaAllocator()->MakeArena();
const RefCountedPtr<ChaoticGoodServerListener> listener_;

@ -237,10 +237,11 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall(
call_initiator.emplace(std::move(call.initiator));
auto add_result = NewStream(frame_header.stream_id, *call_initiator);
if (add_result.ok()) {
call_destination_->StartCall(std::move(call.handler));
call_initiator->SpawnGuarded(
"server-write", [this, stream_id = frame_header.stream_id,
call_initiator = *call_initiator]() {
call_initiator = *call_initiator,
call_handler = std::move(call.handler)]() mutable {
call_destination_->StartCall(std::move(call_handler));
return CallOutboundLoop(stream_id, call_initiator);
});
} else {

@ -236,7 +236,7 @@ msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx,
iov_size++) {
MutableSlice& slice = internal::SliceCast<MutableSlice>(
buf_.MutableSliceAt(out_offset_.slice_idx));
iov[iov_size].iov_base = slice.begin();
iov[iov_size].iov_base = slice.begin() + out_offset_.byte_idx;
iov[iov_size].iov_len = slice.length() - out_offset_.byte_idx;
*sending_length += iov[iov_size].iov_len;
++(out_offset_.slice_idx);

@ -296,6 +296,14 @@ class Duration {
int64_t millis_;
};
inline std::ostream& operator<<(std::ostream& out, const Duration& d) {
return out << d.ToString();
}
inline std::ostream& operator<<(std::ostream& out, const Timestamp& d) {
return out << d.ToString();
}
inline Duration operator+(Duration lhs, Duration rhs) {
return Duration::Milliseconds(
time_detail::MillisAdd(lhs.millis(), rhs.millis()));

@ -41,18 +41,18 @@ namespace grpc_core {
// PartySyncUsingAtomics
GRPC_MUST_USE_RESULT bool Party::RefIfNonZero() {
auto count = state_.load(std::memory_order_relaxed);
auto state = state_.load(std::memory_order_relaxed);
do {
// If zero, we are done (without an increment). If not, we must do a CAS
// to maintain the contract: do not increment the counter if it is already
// zero
if (count == 0) {
if ((state & kRefMask) == 0) {
return false;
}
} while (!state_.compare_exchange_weak(count, count + kOneRef,
} while (!state_.compare_exchange_weak(state, state + kOneRef,
std::memory_order_acq_rel,
std::memory_order_relaxed));
LogStateChange("RefIfNonZero", count, count + kOneRef);
LogStateChange("RefIfNonZero", state, state + kOneRef);
return true;
}

@ -68,8 +68,8 @@ bool PeriodicUpdate::MaybeEndPeriod(absl::FunctionRef<void(Duration)> f) {
expected_updates_per_period_ =
period_.seconds() * expected_updates_per_period_ / time_so_far.seconds();
if (expected_updates_per_period_ < 1) expected_updates_per_period_ = 1;
period_start_ = now;
f(time_so_far);
period_start_ = Timestamp::Now();
updates_remaining_.store(expected_updates_per_period_,
std::memory_order_release);
return true;

@ -677,8 +677,16 @@ void XdsServerConfigFetcher::ListenerWatcher::
// It should get cleaned up eventually. Ignore this update.
return;
}
bool first_good_update = filter_chain_match_manager_ == nullptr;
// Promote the pending FilterChainMatchManager
filter_chain_match_manager_ = std::move(pending_filter_chain_match_manager_);
// TODO(yashykt): Right now, the server_config_watcher_ does not invoke
// XdsServerConfigFetcher while holding a lock, but that might change in the
// future in which case we would want to execute this update outside the
// critical region through a WorkSerializer similar to XdsClient.
server_config_watcher_->UpdateConnectionManager(filter_chain_match_manager_);
// Let the logger know about the update if there was no previous good update.
if (filter_chain_match_manager_ == nullptr) {
if (first_good_update) {
if (serving_status_notifier_.on_serving_status_update != nullptr) {
serving_status_notifier_.on_serving_status_update(
serving_status_notifier_.user_data, listening_address_.c_str(),
@ -688,13 +696,6 @@ void XdsServerConfigFetcher::ListenerWatcher::
<< listening_address_;
}
}
// Promote the pending FilterChainMatchManager
filter_chain_match_manager_ = std::move(pending_filter_chain_match_manager_);
// TODO(yashykt): Right now, the server_config_watcher_ does not invoke
// XdsServerConfigFetcher while holding a lock, but that might change in the
// future in which case we would want to execute this update outside the
// critical region through a WorkSerializer similar to XdsClient.
server_config_watcher_->UpdateConnectionManager(filter_chain_match_manager_);
}
//

@ -210,6 +210,7 @@ class XdsClient::XdsChannel::AdsCall final
if (timer_handle_.has_value() &&
ads_call_->xds_client()->engine()->Cancel(*timer_handle_)) {
timer_handle_.reset();
ads_call_.reset();
}
}
@ -250,24 +251,28 @@ class XdsClient::XdsChannel::AdsCall final
}
void OnTimer() {
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << ads_call_->xds_client() << "] xds server "
<< ads_call_->xds_channel()->server_.server_uri()
<< ": timeout obtaining resource {type=" << type_->type_url()
<< " name="
<< XdsClient::ConstructFullXdsResourceName(
name_.authority, type_->type_url(), name_.key)
<< "} from xds server";
{
MutexLock lock(&ads_call_->xds_client()->mu_);
timer_handle_.reset();
resource_seen_ = true;
auto& authority_state =
ads_call_->xds_client()->authority_state_map_[name_.authority];
ResourceState& state = authority_state.resource_map[type_][name_.key];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
ads_call_->xds_client()->NotifyWatchersOnResourceDoesNotExist(
state.watchers, ReadDelayHandle::NoWait());
// We might have received the resource after the timer fired but before
// the callback ran.
if (state.resource == nullptr) {
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << ads_call_->xds_client() << "] xds server "
<< ads_call_->xds_channel()->server_.server_uri()
<< ": timeout obtaining resource {type=" << type_->type_url()
<< " name="
<< XdsClient::ConstructFullXdsResourceName(
name_.authority, type_->type_url(), name_.key)
<< "} from xds server";
resource_seen_ = true;
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
ads_call_->xds_client()->NotifyWatchersOnResourceDoesNotExist(
state.watchers, ReadDelayHandle::NoWait());
}
}
ads_call_->xds_client()->work_serializer_.DrainQueue();
ads_call_.reset();

@ -132,7 +132,7 @@ Pod::Spec.new do |s|
ss.source_files = 'src/ssl/*.{h,c,cc}',
'src/ssl/**/*.{h,c,cc}',
'src/crypto/*.{h,c,cc}',
'src/crypto/**/*.{h,c,cc}',
'src/crypto/**/*.{h,c,cc,inc}',
# We have to include fiat because spake25519 depends on it
'src/third_party/fiat/*.{h,c,cc}',
# Include the err_data.c pre-generated in boringssl's master-with-bazel branch
@ -143,11 +143,7 @@ Pod::Spec.new do |s|
'src/crypto/*.h',
'src/crypto/**/*.h',
'src/third_party/fiat/*.h'
# bcm.c includes other source files, creating duplicated symbols. Since it is not used, we
# explicitly exclude it from the pod.
# TODO (mxyan): Work with BoringSSL team to remove this hack.
ss.exclude_files = 'src/crypto/fipsmodule/bcm.c',
'src/**/*_test.*',
ss.exclude_files = 'src/**/*_test.*',
'src/**/test_*.*',
'src/**/test/*.*'
@ -713,10 +709,10 @@ Pod::Spec.new do |s|
EOF
# We are renaming openssl to openssl_grpc so that there is no conflict with openssl if it exists
find . -type f \\( -path '*.h' -or -path '*.cc' -or -path '*.c' \\) -print0 | xargs -0 -L1 sed -E -i'.grpc_back' 's;#include <openssl/;#include <openssl_grpc/;g'
find . -type f \\( -path '*.h' -or -path '*.cc' -or -path '*.c' -or -path '*.inc' \\) -print0 | xargs -0 -L1 sed -E -i'.grpc_back' 's;#include <openssl/;#include <openssl_grpc/;g'
# Include of boringssl_prefix_symbols.h does not follow Xcode import style. We add the package
# name here so that Xcode knows where to find it.
find . -type f \\( -path '*.h' -or -path '*.cc' -or -path '*.c' \\) -print0 | xargs -0 -L1 sed -E -i'.grpc_back' 's;#include <boringssl_prefix_symbols.h>;#include <openssl_grpc/boringssl_prefix_symbols.h>;g'
find . -type f \\( -path '*.h' -or -path '*.cc' -or -path '*.c' -or -path '*.inc' \\) -print0 | xargs -0 -L1 sed -E -i'.grpc_back' 's;#include <boringssl_prefix_symbols.h>;#include <openssl_grpc/boringssl_prefix_symbols.h>;g'
END_OF_COMMAND
end

@ -22,7 +22,7 @@ def grpc_deps
end
target 'TvTests' do
platform :tvos, '10.0'
platform :tvos, '12.0'
grpc_deps
end

@ -163,7 +163,7 @@
ss.source_files = 'src/ssl/*.{h,c,cc}',
'src/ssl/**/*.{h,c,cc}',
'src/crypto/*.{h,c,cc}',
'src/crypto/**/*.{h,c,cc}',
'src/crypto/**/*.{h,c,cc,inc}',
# We have to include fiat because spake25519 depends on it
'src/third_party/fiat/*.{h,c,cc}',
# Include the err_data.c pre-generated in boringssl's master-with-bazel branch
@ -174,11 +174,7 @@
'src/crypto/*.h',
'src/crypto/**/*.h',
'src/third_party/fiat/*.h'
# bcm.c includes other source files, creating duplicated symbols. Since it is not used, we
# explicitly exclude it from the pod.
# TODO (mxyan): Work with BoringSSL team to remove this hack.
ss.exclude_files = 'src/crypto/fipsmodule/bcm.c',
'src/**/*_test.*',
ss.exclude_files = 'src/**/*_test.*',
'src/**/test_*.*',
'src/**/test/*.*'
@ -221,10 +217,10 @@
EOF
# We are renaming openssl to openssl_grpc so that there is no conflict with openssl if it exists
find . -type f \\( -path '*.h' -or -path '*.cc' -or -path '*.c' \\) -print0 | xargs -0 -L1 sed -E -i'.grpc_back' 's;#include <openssl/;#include <openssl_grpc/;g'
find . -type f \\( -path '*.h' -or -path '*.cc' -or -path '*.c' -or -path '*.inc' \\) -print0 | xargs -0 -L1 sed -E -i'.grpc_back' 's;#include <openssl/;#include <openssl_grpc/;g'
# Include of boringssl_prefix_symbols.h does not follow Xcode import style. We add the package
# name here so that Xcode knows where to find it.
find . -type f \\( -path '*.h' -or -path '*.cc' -or -path '*.c' \\) -print0 | xargs -0 -L1 sed -E -i'.grpc_back' 's;#include <boringssl_prefix_symbols.h>;#include <openssl_grpc/boringssl_prefix_symbols.h>;g'
find . -type f \\( -path '*.h' -or -path '*.cc' -or -path '*.c' -or -path '*.inc' \\) -print0 | xargs -0 -L1 sed -E -i'.grpc_back' 's;#include <boringssl_prefix_symbols.h>;#include <openssl_grpc/boringssl_prefix_symbols.h>;g'
END_OF_COMMAND
end

@ -86,7 +86,8 @@ static void run_test(bool wait_for_ready, bool use_service_config) {
chan = grpc_channel_create(addr.c_str(), creds, args);
grpc_channel_credentials_release(creds);
grpc_slice host = grpc_slice_from_static_string("nonexistent");
gpr_timespec deadline = grpc_timeout_seconds_to_deadline(2);
gpr_timespec deadline =
grpc_timeout_seconds_to_deadline(wait_for_ready ? 2 : 600);
call =
grpc_channel_create_call(chan, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
grpc_slice_from_static_string("/service/method"),

@ -395,7 +395,10 @@ grpc_cc_library(
testonly = True,
srcs = ["socket_use_after_close_detector.cc"],
hdrs = ["socket_use_after_close_detector.h"],
external_deps = ["gtest"],
external_deps = [
"absl/log:log",
"gtest",
],
language = "C++",
deps = [
"grpc_test_util",

@ -33,6 +33,7 @@
#include <thread>
#include <vector>
#include "absl/log/log.h"
#include "gtest/gtest.h"
#include <grpc/support/sync.h>
@ -40,71 +41,17 @@
#include "src/core/lib/iomgr/sockaddr.h"
#include "test/core/test_util/port.h"
// TODO(unknown): pull in different headers when enabling this
// test on windows. Also set BAD_SOCKET_RETURN_VAL
// to INVALID_SOCKET on windows.
#ifdef GPR_WINDOWS
#include "src/core/lib/iomgr/socket_windows.h"
#include "src/core/lib/iomgr/tcp_windows.h"
#define BAD_SOCKET_RETURN_VAL INVALID_SOCKET
#else
#define BAD_SOCKET_RETURN_VAL (-1)
#endif
namespace {
#ifdef GPR_WINDOWS
void OpenAndCloseSocketsStressLoop(int port, gpr_event* done_ev) {
sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(port);
((char*)&addr.sin6_addr)[15] = 1;
for (;;) {
if (gpr_event_get(done_ev)) {
return;
}
std::vector<int> sockets;
for (size_t i = 0; i < 50; i++) {
SOCKET s = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0,
WSA_FLAG_OVERLAPPED);
ASSERT_TRUE(s != BAD_SOCKET_RETURN_VAL)
<< "Failed to create TCP ipv6 socket";
char val = 1;
ASSERT_TRUE(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) !=
SOCKET_ERROR)
<< "Failed to set socketopt reuseaddr. WSA error: " +
std::to_string(WSAGetLastError());
ASSERT_TRUE(grpc_tcp_set_non_block(s) == absl::OkStatus())
<< "Failed to set socket non-blocking";
ASSERT_TRUE(bind(s, (const sockaddr*)&addr, sizeof(addr)) != SOCKET_ERROR)
<< "Failed to bind socket " + std::to_string(s) +
" to [::1]:" + std::to_string(port) +
". WSA error: " + std::to_string(WSAGetLastError());
ASSERT_TRUE(listen(s, 1) != SOCKET_ERROR)
<< "Failed to listen on socket " + std::to_string(s) +
". WSA error: " + std::to_string(WSAGetLastError());
sockets.push_back(s);
}
// Do a non-blocking accept followed by a close on all of those sockets.
// Do this in a separate loop to try to induce a time window to hit races.
for (size_t i = 0; i < sockets.size(); i++) {
ASSERT_TRUE(accept(sockets[i], nullptr, nullptr) == INVALID_SOCKET)
<< "Accept on phony socket unexpectedly accepted actual connection.";
ASSERT_TRUE(WSAGetLastError() == WSAEWOULDBLOCK)
<< "OpenAndCloseSocketsStressLoop accept on socket " +
std::to_string(sockets[i]) +
" failed in "
"an unexpected way. "
"WSA error: " +
std::to_string(WSAGetLastError()) +
". Socket use-after-close bugs are likely.";
ASSERT_TRUE(closesocket(sockets[i]) != SOCKET_ERROR)
<< "Failed to close socket: " + std::to_string(sockets[i]) +
". WSA error: " + std::to_string(WSAGetLastError());
}
}
// TODO(apolcyn): re-enable this on windows if we can debug the failure.
// Previously, this was causing test flakes for a while b/c bind calls
// would fail with WSAEACCESS. Not clear if we were just making windows
// unhappy.
LOG(INFO) << "OpenAndCloseSocketsStressLoop is a no-op for windows";
return;
}
#else

@ -3277,7 +3277,7 @@ TEST_F(WeightedRoundRobinTest, CallAndServerMetric) {
// all of its subchannels every time it saw an update, thus causing the
// WRR policy to re-enter the blackout period for that address.
TEST_F(WeightedRoundRobinTest, WithOutlierDetection) {
const int kBlackoutPeriodSeconds = 5;
const int kBlackoutPeriodSeconds = 10;
const int kNumServers = 3;
StartServers(kNumServers);
// Report server metrics that should give 6:4:3 WRR picks.

@ -952,12 +952,19 @@ TEST_F(GrpclbEnd2endTest, UsePickFirstChildPolicy) {
"}");
SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
CheckRpcSendOk(kNumRpcs, 3000 /* timeout_ms */, true /* wait_for_ready */);
// Check that all requests went to the first backend. This verifies
// that we used pick_first instead of round_robin as the child policy.
EXPECT_EQ(backends_[0]->service().request_count(), kNumRpcs);
for (size_t i = 1; i < backends_.size(); ++i) {
EXPECT_EQ(backends_[i]->service().request_count(), 0UL);
// Check that all requests went to one backend. This verifies that we
// used pick_first instead of round_robin as the child policy.
bool found = false;
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->service().request_count() > 0) {
LOG(INFO) << "backend " << i << " saw traffic";
EXPECT_EQ(backends_[i]->service().request_count(), kNumRpcs)
<< "backend " << i;
EXPECT_FALSE(found) << "multiple backends saw traffic";
found = true;
}
}
EXPECT_TRUE(found) << "no backends saw traffic";
// The balancer got a single request.
EXPECT_EQ(1U, balancer_->service().request_count());
// and sent a single response.
@ -982,21 +989,24 @@ TEST_F(GrpclbEnd2endTest, SwapChildPolicy) {
"}");
SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
CheckRpcSendOk(kNumRpcs, 3000 /* timeout_ms */, true /* wait_for_ready */);
// Check that all requests went to the first backend. This verifies
// that we used pick_first instead of round_robin as the child policy.
EXPECT_EQ(backends_[0]->service().request_count(), kNumRpcs);
for (size_t i = 1; i < backends_.size(); ++i) {
EXPECT_EQ(backends_[i]->service().request_count(), 0UL);
// Check that all requests went to one backend. This verifies that we
// used pick_first instead of round_robin as the child policy.
bool found = false;
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->service().request_count() > 0) {
LOG(INFO) << "backend " << i << " saw traffic";
EXPECT_EQ(backends_[i]->service().request_count(), kNumRpcs)
<< "backend " << i;
EXPECT_FALSE(found) << "multiple backends saw traffic";
found = true;
}
}
EXPECT_TRUE(found) << "no backends saw traffic";
// Send new resolution that removes child policy from service config.
SetNextResolutionDefaultBalancer();
// We should now be using round_robin, which will send traffic to all
// backends.
WaitForAllBackends();
CheckRpcSendOk(kNumRpcs, 3000 /* timeout_ms */, true /* wait_for_ready */);
// Check that every backend saw the same number of requests. This verifies
// that we used round_robin.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(backends_[i]->service().request_count(), 2UL);
}
// The balancer got a single request.
EXPECT_EQ(1U, balancer_->service().request_count());
// and sent a single response.

@ -38,6 +38,7 @@
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset.h"
@ -46,6 +47,7 @@
#include "src/core/lib/security/credentials/credentials.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/test_util/port.h"
#include "test/core/test_util/resolve_localhost_ip46.h"
#include "test/core/test_util/test_config.h"
#include "test/core/test_util/test_tcp_server.h"
#include "test/cpp/end2end/test_service_impl.h"
@ -96,9 +98,11 @@ class TestTcpServer {
: shutdown_(false),
queue_data_(false),
port_(grpc_pick_unused_port_or_die()) {
std::ostringstream server_address;
server_address << "localhost:" << port_;
address_ = server_address.str();
grpc_init(); // needed by LocalIpAndPort()
// This test does not do well with multiple connection attempts at the same
// time to the same tcp server, so use the local IP address instead of
// "localhost" which can result in two connections (ipv4 and ipv6).
address_ = grpc_core::LocalIpAndPort(port_);
test_tcp_server_init(&tcp_server_, &TestTcpServer::OnConnect, this);
GRPC_CLOSURE_INIT(&on_fd_released_, &TestTcpServer::OnFdReleased, this,
grpc_schedule_on_exec_ctx);
@ -108,6 +112,7 @@ class TestTcpServer {
running_thread_.join();
test_tcp_server_destroy(&tcp_server_);
grpc_recycle_unused_port(port_);
grpc_shutdown();
}
// Read some data before handing off the connection.
@ -168,7 +173,7 @@ class TestTcpServer {
grpc_tcp_destroy_and_release_fd(tcp, &fd_, &on_fd_released_);
}
void OnFdReleased(grpc_error_handle err) {
void OnFdReleased(const absl::Status& err) {
EXPECT_EQ(absl::OkStatus(), err);
experimental::ExternalConnectionAcceptor::NewConnectionParameters p;
p.listener_fd = listener_fd_;

@ -234,7 +234,7 @@ class RlsEnd2endTest : public ::testing::Test {
}
struct RpcOptions {
int timeout_ms = 2000;
int timeout_ms = 5000;
bool wait_for_ready = false;
std::vector<std::pair<std::string, std::string>> metadata;
@ -922,14 +922,15 @@ TEST_F(RlsEnd2endTest, RlsRequestTimeout) {
.set_default_target(grpc_core::LocalIpUri(backends_[1]->port_))
.set_lookup_service_timeout(grpc_core::Duration::Seconds(2))
.Build());
// RLS server will send a response, but it's longer than the timeout.
// RLS server will send a response, but it takes longer than the
// timeout set in the LB policy config.
rls_server_->service_.SetResponse(
BuildRlsRequest({{kTestKey, kTestValue}}),
BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}),
/*response_delay=*/grpc_core::Duration::Seconds(3));
// The data plane RPC should be sent to the default target.
CheckRpcSendOk(DEBUG_LOCATION, RpcOptions().set_timeout_ms(4000).set_metadata(
{{"key1", kTestValue}}));
CheckRpcSendOk(DEBUG_LOCATION,
RpcOptions().set_metadata({{"key1", kTestValue}}));
EXPECT_EQ(rls_server_->service_.request_count(), 1);
EXPECT_EQ(backends_[0]->service_.request_count(), 0);
EXPECT_EQ(backends_[1]->service_.request_count(), 1);

@ -281,6 +281,7 @@ grpc_cc_test(
linkstatic = True, # Fixes dyld error on MacOS
shard_count = 5,
tags = [
"no_mac",
"no_test_ios",
"no_windows",
"xds_end2end_test",

@ -1060,7 +1060,7 @@ TEST_P(XdsFederationTest, FederationServer) {
"xdstp://xds.example.com/envoy.config.listener.v3.Listener"
"client/%s?client_listener_resource_name_template_not_in_use");
InitClient(builder);
CreateAndStartBackends(2, /*xds_enabled=*/true);
CreateBackends(2, /*xds_enabled=*/true);
// Eds for new authority balancer.
EdsResourceArgs args =
EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
@ -1099,6 +1099,13 @@ TEST_P(XdsFederationTest, FederationServer) {
new_server_route_config,
ServerHcmAccessor());
}
// Start backends and wait for them to start serving.
StartAllBackends();
for (const auto& backend : backends_) {
ASSERT_TRUE(backend->notifier()->WaitOnServingStatusChange(
grpc_core::LocalIpAndPort(backend->port()), grpc::StatusCode::OK));
}
// Make sure everything works.
WaitForAllBackends(DEBUG_LOCATION);
}

File diff suppressed because it is too large Load Diff

@ -843,11 +843,26 @@ std::string XdsEnd2endTest::MakeConnectionFailureRegex(
"(Connection refused"
"|Connection reset by peer"
"|Socket closed"
"|Broken pipe"
"|FD shutdown)"
// errno value
"( \\([0-9]+\\))?");
}
std::string XdsEnd2endTest::MakeTlsHandshakeFailureRegex(
absl::string_view prefix) {
return absl::StrCat(
prefix,
"(UNKNOWN|UNAVAILABLE): "
// IP address
"(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
// Prefixes added for context
"(Failed to connect to remote host: )?"
// Tls handshake failure
"Tls handshake failed \\(TSI_PROTOCOL_FAILURE\\): SSL_ERROR_SSL: "
"error:1000007d:SSL routines:OPENSSL_internal:CERTIFICATE_VERIFY_FAILED");
}
grpc_core::PemKeyCertPairList XdsEnd2endTest::ReadTlsIdentityPair(
const char* key_path, const char* cert_path) {
return grpc_core::PemKeyCertPairList{grpc_core::PemKeyCertPair(

@ -966,6 +966,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType>,
// message for a connection failure.
static std::string MakeConnectionFailureRegex(absl::string_view prefix);
// Returns a regex that can be matched against an RPC failure status
// message for a Tls handshake failure.
static std::string MakeTlsHandshakeFailureRegex(absl::string_view prefix);
// Returns a private key pair, read from local files.
static grpc_core::PemKeyCertPairList ReadTlsIdentityPair(
const char* key_path, const char* cert_path);

@ -305,7 +305,7 @@ LANG_RELEASE_MATRIX = {
("v1.63.3", ReleaseInfo()),
("v1.64.1", ReleaseInfo()),
("v1.65.0", ReleaseInfo()),
("v1.66.0", ReleaseInfo()),
("v1.66.2", ReleaseInfo()),
]
),
"java": OrderedDict(

Loading…
Cancel
Save