[testing]: remove server-side check on number of concurrent RPCs in alts_concurrent_connectivity_test (#32585)

This check only works if all handshake RPCs have an OK status, and it's
racey e.g. if the client is cancelling handshake RPCs (being when an RPC
is cancelled, termination of the RPC at the client is asynchronous from
termination at the server, so the client can resume the queue before the
server RPC completes).
pull/32626/head
apolcyn 2 years ago committed by GitHub
parent 97ba987132
commit d47b569330
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 46
      test/core/tsi/alts/fake_handshaker/fake_handshaker_server.cc
  2. 5
      test/core/tsi/alts/fake_handshaker/fake_handshaker_server.h
  3. 3
      test/core/tsi/alts/fake_handshaker/fake_handshaker_server_main.cc
  4. 25
      test/core/tsi/alts/handshaker/alts_concurrent_connectivity_test.cc

@ -58,15 +58,12 @@ namespace gcp {
// It is thread-safe. // It is thread-safe.
class FakeHandshakerService : public HandshakerService::Service { class FakeHandshakerService : public HandshakerService::Service {
public: public:
FakeHandshakerService(int expected_max_concurrent_rpcs, explicit FakeHandshakerService(const std::string& peer_identity)
const std::string& peer_identity) : peer_identity_(peer_identity) {}
: expected_max_concurrent_rpcs_(expected_max_concurrent_rpcs),
peer_identity_(peer_identity) {}
Status DoHandshake( Status DoHandshake(
ServerContext* /*server_context*/, ServerContext* /*server_context*/,
ServerReaderWriter<HandshakerResp, HandshakerReq>* stream) override { ServerReaderWriter<HandshakerResp, HandshakerReq>* stream) override {
ConcurrentRpcsCheck concurrent_rpcs_check(this);
Status status; Status status;
HandshakerContext context; HandshakerContext context;
HandshakerReq request; HandshakerReq request;
@ -249,46 +246,13 @@ class FakeHandshakerService : public HandshakerService::Service {
return result; return result;
} }
class ConcurrentRpcsCheck {
public:
explicit ConcurrentRpcsCheck(FakeHandshakerService* parent)
: parent_(parent) {
if (parent->expected_max_concurrent_rpcs_ > 0) {
grpc::internal::MutexLock lock(
&parent->expected_max_concurrent_rpcs_mu_);
if (++parent->concurrent_rpcs_ >
parent->expected_max_concurrent_rpcs_) {
grpc_core::Crash(
absl::StrFormat("FakeHandshakerService:%p concurrent_rpcs_:%d "
"expected_max_concurrent_rpcs:%d",
parent, parent->concurrent_rpcs_,
parent->expected_max_concurrent_rpcs_));
}
}
}
~ConcurrentRpcsCheck() {
if (parent_->expected_max_concurrent_rpcs_ > 0) {
grpc::internal::MutexLock lock(
&parent_->expected_max_concurrent_rpcs_mu_);
parent_->concurrent_rpcs_--;
}
}
private:
FakeHandshakerService* parent_;
};
grpc::internal::Mutex expected_max_concurrent_rpcs_mu_;
int concurrent_rpcs_ = 0;
const int expected_max_concurrent_rpcs_;
const std::string peer_identity_; const std::string peer_identity_;
}; };
std::unique_ptr<grpc::Service> CreateFakeHandshakerService( std::unique_ptr<grpc::Service> CreateFakeHandshakerService(
int expected_max_concurrent_rpcs, const std::string& peer_identity) { const std::string& peer_identity) {
return std::unique_ptr<grpc::Service>{new grpc::gcp::FakeHandshakerService( return std::unique_ptr<grpc::Service>{
expected_max_concurrent_rpcs, peer_identity)}; new grpc::gcp::FakeHandshakerService(peer_identity)};
} }
} // namespace gcp } // namespace gcp

@ -27,11 +27,8 @@
namespace grpc { namespace grpc {
namespace gcp { namespace gcp {
// If max_expected_concurrent_rpcs is non-zero, the fake handshake service
// will track the number of concurrent RPCs that it handles and abort
// if if ever exceeds that number.
std::unique_ptr<grpc::Service> CreateFakeHandshakerService( std::unique_ptr<grpc::Service> CreateFakeHandshakerService(
int expected_max_concurrent_rpcs, const std::string& peer_identity); const std::string& peer_identity);
} // namespace gcp } // namespace gcp
} // namespace grpc } // namespace grpc

@ -35,8 +35,7 @@ ABSL_FLAG(std::string, peer_identity, "peer_identity", "The peer identity.");
static void RunFakeHandshakerServer(const std::string& server_address, static void RunFakeHandshakerServer(const std::string& server_address,
const std::string& peer_identity) { const std::string& peer_identity) {
std::unique_ptr<grpc::Service> service = std::unique_ptr<grpc::Service> service =
grpc::gcp::CreateFakeHandshakerService( grpc::gcp::CreateFakeHandshakerService(peer_identity);
/*expected_max_concurrent_rpcs=*/0, peer_identity);
grpc::ServerBuilder builder; grpc::ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(service.get()); builder.RegisterService(service.get());

@ -63,8 +63,6 @@
namespace { namespace {
const int kFakeHandshakeServerMaxConcurrentStreams = 40;
void drain_cq(grpc_completion_queue* cq) { void drain_cq(grpc_completion_queue* cq) {
grpc_event ev; grpc_event ev;
do { do {
@ -104,17 +102,10 @@ grpc_channel* create_secure_channel_for_test(
class FakeHandshakeServer { class FakeHandshakeServer {
public: public:
explicit FakeHandshakeServer(bool check_num_concurrent_rpcs) { FakeHandshakeServer() {
int port = grpc_pick_unused_port_or_die(); int port = grpc_pick_unused_port_or_die();
address_ = grpc_core::JoinHostPort("localhost", port); address_ = grpc_core::JoinHostPort("localhost", port);
if (check_num_concurrent_rpcs) { service_ = grpc::gcp::CreateFakeHandshakerService("peer_identity");
service_ = grpc::gcp::CreateFakeHandshakerService(
/*expected_max_concurrent_rpcs=*/
kFakeHandshakeServerMaxConcurrentStreams, "peer_identity");
} else {
service_ = grpc::gcp::CreateFakeHandshakerService(
/*expected_max_concurrent_rpcs=*/0, "peer_identity");
}
grpc::ServerBuilder builder; grpc::ServerBuilder builder;
builder.AddListeningPort(address_, grpc::InsecureServerCredentials()); builder.AddListeningPort(address_, grpc::InsecureServerCredentials());
builder.RegisterService(service_.get()); builder.RegisterService(service_.get());
@ -139,8 +130,7 @@ class FakeHandshakeServer {
class TestServer { class TestServer {
public: public:
explicit TestServer() TestServer() {
: fake_handshake_server_(true /* check num concurrent rpcs */) {
grpc_alts_credentials_options* alts_options = grpc_alts_credentials_options* alts_options =
grpc_alts_credentials_server_options_create(); grpc_alts_credentials_server_options_create();
grpc_server_credentials* server_creds = grpc_server_credentials* server_creds =
@ -285,8 +275,7 @@ class ConnectLoopRunner {
// Perform a few ALTS handshakes sequentially (using the fake, in-process ALTS // Perform a few ALTS handshakes sequentially (using the fake, in-process ALTS
// handshake server). // handshake server).
TEST(AltsConcurrentConnectivityTest, TestBasicClientServerHandshakes) { TEST(AltsConcurrentConnectivityTest, TestBasicClientServerHandshakes) {
FakeHandshakeServer fake_handshake_server( FakeHandshakeServer fake_handshake_server;
true /* check num concurrent rpcs */);
TestServer test_server; TestServer test_server;
{ {
ConnectLoopRunner runner( ConnectLoopRunner runner(
@ -300,8 +289,7 @@ TEST(AltsConcurrentConnectivityTest, TestBasicClientServerHandshakes) {
// Run a bunch of concurrent ALTS handshakes on concurrent channels // Run a bunch of concurrent ALTS handshakes on concurrent channels
// (using the fake, in-process handshake server). // (using the fake, in-process handshake server).
TEST(AltsConcurrentConnectivityTest, TestConcurrentClientServerHandshakes) { TEST(AltsConcurrentConnectivityTest, TestConcurrentClientServerHandshakes) {
FakeHandshakeServer fake_handshake_server( FakeHandshakeServer fake_handshake_server;
true /* check num concurrent rpcs */);
// Test // Test
{ {
TestServer test_server; TestServer test_server;
@ -336,8 +324,7 @@ TEST(AltsConcurrentConnectivityTest,
// cancellation, and the corresponding fake handshake server's sync // cancellation, and the corresponding fake handshake server's sync
// method handler returning, enforcing a limit on the number of active // method handler returning, enforcing a limit on the number of active
// RPCs at the fake handshake server would be inherently racey. // RPCs at the fake handshake server would be inherently racey.
FakeHandshakeServer fake_handshake_server( FakeHandshakeServer fake_handshake_server;
false /* check num concurrent rpcs */);
// The fake_backend_server emulates a secure (ALTS based) gRPC backend. So // The fake_backend_server emulates a secure (ALTS based) gRPC backend. So
// it waits for the client to send the first bytes. // it waits for the client to send the first bytes.
grpc_core::testing::FakeUdpAndTcpServer fake_backend_server( grpc_core::testing::FakeUdpAndTcpServer fake_backend_server(

Loading…
Cancel
Save