From ea839de878b883b27ce9221570e2ad7ee9d16583 Mon Sep 17 00:00:00 2001 From: Matthew Stevenson <52979934+matthewstevenson88@users.noreply.github.com> Date: Tue, 28 Mar 2023 09:53:46 -0700 Subject: [PATCH] Allow configuring max concurrent ALTS handshakes based on an environment variable. (#32672) The logic is straightforward: attempt to read the `GRPC_ALTS_MAX_CONCURRENT_HANDSHAKES` environment variable and, if it set to an integer, instantiate the handshake queues based on this integer. Based on go/grpc-alts-concurrent-handshake-cap. --- BUILD | 6 ++++- .../alts/handshaker/alts_handshaker_client.cc | 21 ++++++++++++++++- .../alts/handshaker/alts_handshaker_client.h | 5 ++++ test/core/tsi/alts/handshaker/BUILD | 1 + .../handshaker/alts_handshaker_client_test.cc | 23 +++++++++++++++++++ 5 files changed, 54 insertions(+), 2 deletions(-) diff --git a/BUILD b/BUILD index 422f5bdc0e8..b9f11d27831 100644 --- a/BUILD +++ b/BUILD @@ -3208,7 +3208,10 @@ grpc_cc_library( "//src/core:tsi/alts/handshaker/alts_tsi_handshaker_private.h", "//src/core:tsi/alts/handshaker/alts_tsi_utils.h", ], - external_deps = ["upb_lib"], + external_deps = [ + "absl/strings", + "upb_lib", + ], language = "c++", visibility = ["@grpc:public"], deps = [ @@ -3221,6 +3224,7 @@ grpc_cc_library( "tsi_base", "//src/core:channel_args", "//src/core:closure", + "//src/core:env", "//src/core:pollset_set", "//src/core:slice", ], diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc index becc81e27d2..4f71701c9be 100644 --- a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc +++ b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc @@ -22,6 +22,7 @@ #include +#include "absl/strings/numbers.h" #include "upb/upb.hpp" #include @@ -29,6 +30,7 @@ #include #include "src/core/lib/gprpp/crash.h" +#include "src/core/lib/gprpp/env.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/call.h" @@ -40,6 +42,8 @@ #define TSI_ALTS_INITIAL_BUFFER_SIZE 256 const int kHandshakerClientOpNum = 4; +const char kMaxConcurrentStreamsEnvironmentVariable[] = + "GRPC_ALTS_MAX_CONCURRENT_HANDSHAKES"; struct alts_handshaker_client { const alts_handshaker_client_vtable* vtable; @@ -419,7 +423,8 @@ HandshakeQueue* g_client_handshake_queue; HandshakeQueue* g_server_handshake_queue; void DoHandshakeQueuesInit(void) { - const size_t per_queue_max_outstanding_handshakes = 40; + const size_t per_queue_max_outstanding_handshakes = + MaxNumberOfConcurrentHandshakes(); g_client_handshake_queue = new HandshakeQueue(per_queue_max_outstanding_handshakes); g_server_handshake_queue = @@ -924,3 +929,17 @@ void alts_handshaker_client_destroy(alts_handshaker_client* c) { alts_grpc_handshaker_client_unref(client); } } + +size_t MaxNumberOfConcurrentHandshakes() { + size_t max_concurrent_handshakes = 40; + absl::optional env_var_max_concurrent_handshakes = + grpc_core::GetEnv(kMaxConcurrentStreamsEnvironmentVariable); + if (env_var_max_concurrent_handshakes.has_value()) { + size_t effective_max_concurrent_handshakes = 40; + if (absl::SimpleAtoi(*env_var_max_concurrent_handshakes, + &effective_max_concurrent_handshakes)) { + max_concurrent_handshakes = effective_max_concurrent_handshakes; + } + } + return max_concurrent_handshakes; +} diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_client.h b/src/core/tsi/alts/handshaker/alts_handshaker_client.h index ad82da5563a..69b789cb332 100644 --- a/src/core/tsi/alts/handshaker/alts_handshaker_client.h +++ b/src/core/tsi/alts/handshaker/alts_handshaker_client.h @@ -157,4 +157,9 @@ alts_handshaker_client* alts_grpc_handshaker_client_create( void alts_handshaker_client_handle_response(alts_handshaker_client* client, bool is_ok); +// Returns the max number of concurrent handshakes that are permitted. +// +// Exposed for testing purposes only. +size_t MaxNumberOfConcurrentHandshakes(); + #endif // GRPC_SRC_CORE_TSI_ALTS_HANDSHAKER_ALTS_HANDSHAKER_CLIENT_H diff --git a/test/core/tsi/alts/handshaker/BUILD b/test/core/tsi/alts/handshaker/BUILD index 8cc5dde7fd7..eb390047a26 100644 --- a/test/core/tsi/alts/handshaker/BUILD +++ b/test/core/tsi/alts/handshaker/BUILD @@ -35,6 +35,7 @@ grpc_cc_test( deps = [ ":alts_handshaker_service_api_test_lib", "//:grpc", + "//src/core:env", "//test/core/util:grpc_test_util", ], ) diff --git a/test/core/tsi/alts/handshaker/alts_handshaker_client_test.cc b/test/core/tsi/alts/handshaker/alts_handshaker_client_test.cc index 7791a01c27a..37bd1859ed7 100644 --- a/test/core/tsi/alts/handshaker/alts_handshaker_client_test.cc +++ b/test/core/tsi/alts/handshaker/alts_handshaker_client_test.cc @@ -25,6 +25,7 @@ #include #include +#include "src/core/lib/gprpp/env.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/tsi/alts/handshaker/alts_shared_resource.h" #include "src/core/tsi/alts/handshaker/alts_tsi_handshaker.h" @@ -40,6 +41,8 @@ #define ALTS_HANDSHAKER_CLIENT_TEST_TARGET_SERVICE_ACCOUNT2 "B@google.com" #define ALTS_HANDSHAKER_CLIENT_TEST_MAX_FRAME_SIZE (64 * 1024) +const char kMaxConcurrentStreamsEnvironmentVariable[] = + "GRPC_ALTS_MAX_CONCURRENT_HANDSHAKES"; const size_t kHandshakerClientOpNum = 4; const size_t kMaxRpcVersionMajor = 3; const size_t kMaxRpcVersionMinor = 2; @@ -508,6 +511,26 @@ TEST(AltsHandshakerClientTest, ScheduleRequestGrpcCallFailureTest) { destroy_config(config); } +TEST(MaxNumberOfConcurrentHandshakesTest, Default) { + grpc_core::UnsetEnv(kMaxConcurrentStreamsEnvironmentVariable); + EXPECT_EQ(MaxNumberOfConcurrentHandshakes(), 40); +} + +TEST(MaxNumberOfConcurrentHandshakesTest, EnvVarNotInt) { + grpc_core::SetEnv(kMaxConcurrentStreamsEnvironmentVariable, "not-a-number"); + EXPECT_EQ(MaxNumberOfConcurrentHandshakes(), 40); +} + +TEST(MaxNumberOfConcurrentHandshakesTest, EnvVarNegative) { + grpc_core::SetEnv(kMaxConcurrentStreamsEnvironmentVariable, "-10"); + EXPECT_EQ(MaxNumberOfConcurrentHandshakes(), 40); +} + +TEST(MaxNumberOfConcurrentHandshakesTest, EnvVarSuccess) { + grpc_core::SetEnv(kMaxConcurrentStreamsEnvironmentVariable, "10"); + EXPECT_EQ(MaxNumberOfConcurrentHandshakes(), 10); +} + int main(int argc, char** argv) { grpc::testing::TestEnvironment env(&argc, argv); ::testing::InitGoogleTest(&argc, argv);