From d7c51c6163a965d05fc6d189956a74be367d36e7 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Mon, 4 Nov 2019 15:40:34 -0800 Subject: [PATCH 1/2] Have ALTS handshake clients do a RECV_STATUS op --- .../security/transport/security_handshaker.cc | 1 + .../alts/handshaker/alts_handshaker_client.cc | 189 +++++++++++++++--- .../alts/handshaker/alts_tsi_handshaker.cc | 4 +- .../handshaker/alts_tsi_handshaker_private.h | 5 + .../alts_concurrent_connectivity_test.cc | 130 ++++++++++-- .../handshaker/alts_handshaker_client_test.cc | 32 ++- .../handshaker/alts_tsi_handshaker_test.cc | 112 ++++++++++- 7 files changed, 423 insertions(+), 50 deletions(-) diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc index 67e3dc6df4e..bbe86376caa 100644 --- a/src/core/lib/security/transport/security_handshaker.cc +++ b/src/core/lib/security/transport/security_handshaker.cc @@ -195,6 +195,7 @@ void SecurityHandshaker::HandshakeFailedLocked(grpc_error* error) { gpr_log(GPR_DEBUG, "Security handshake failed: %s", msg); if (!is_shutdown_) { + tsi_handshaker_shutdown(handshaker_); // TODO(ctiller): It is currently necessary to shutdown endpoints // before destroying them, even if we know that there are no // pending read/write callbacks. This should be fixed, at which diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc index 5d797e51e6d..b3cdf5160ca 100644 --- a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc +++ b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc @@ -24,6 +24,7 @@ #include #include +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" @@ -39,8 +40,18 @@ struct alts_handshaker_client { const alts_handshaker_client_vtable* vtable; }; +struct recv_message_result { + tsi_result status; + const unsigned char* bytes_to_send; + size_t bytes_to_send_size; + tsi_handshaker_result* result; +}; + typedef struct alts_grpc_handshaker_client { alts_handshaker_client base; + /* One ref is held by the entity that created this handshaker_client, and + * another ref is held by the pending RECEIVE_STATUS_ON_CLIENT op. */ + gpr_refcount refs; alts_tsi_handshaker* handshaker; grpc_call* call; /* A pointer to a function handling the interaction with handshaker service. @@ -77,6 +88,18 @@ typedef struct alts_grpc_handshaker_client { /* a buffer containing data to be sent to the grpc client or server's peer. */ unsigned char* buffer; size_t buffer_size; + /** callback for receiving handshake call status */ + grpc_closure on_status_received; + /** gRPC status code of handshake call */ + grpc_status_code handshake_status_code; + /** gRPC status details of handshake call */ + grpc_slice handshake_status_details; + /* mu synchronizes all fields below including their internal fields. */ + gpr_mu mu; + /* indicates if the handshaker call's RECV_STATUS_ON_CLIENT op is done. */ + bool receive_status_finished; + /* if non-null, contains arguments to complete a TSI next callback. */ + recv_message_result* pending_recv_message_result; } alts_grpc_handshaker_client; static void handshaker_client_send_buffer_destroy( @@ -94,6 +117,95 @@ static bool is_handshake_finished_properly(grpc_gcp_HandshakerResp* resp) { return false; } +static void alts_grpc_handshaker_client_unref( + alts_grpc_handshaker_client* client) { + if (gpr_unref(&client->refs)) { + if (client->base.vtable != nullptr && + client->base.vtable->destruct != nullptr) { + client->base.vtable->destruct(&client->base); + } + grpc_byte_buffer_destroy(client->send_buffer); + grpc_byte_buffer_destroy(client->recv_buffer); + client->send_buffer = nullptr; + client->recv_buffer = nullptr; + grpc_metadata_array_destroy(&client->recv_initial_metadata); + grpc_slice_unref_internal(client->recv_bytes); + grpc_slice_unref_internal(client->target_name); + grpc_alts_credentials_options_destroy(client->options); + gpr_free(client->buffer); + grpc_slice_unref_internal(client->handshake_status_details); + gpr_mu_destroy(&client->mu); + gpr_free(client); + } +} + +static void maybe_complete_tsi_next( + alts_grpc_handshaker_client* client, bool receive_status_finished, + recv_message_result* pending_recv_message_result) { + recv_message_result* r; + { + grpc_core::MutexLock lock(&client->mu); + client->receive_status_finished |= receive_status_finished; + if (pending_recv_message_result != nullptr) { + GPR_ASSERT(client->pending_recv_message_result == nullptr); + client->pending_recv_message_result = pending_recv_message_result; + } + if (client->pending_recv_message_result == nullptr) { + return; + } + const bool have_final_result = + client->pending_recv_message_result->result != nullptr || + client->pending_recv_message_result->status != TSI_OK; + if (have_final_result && !client->receive_status_finished) { + // If we've received the final message from the handshake + // server, or we're about to invoke the TSI next callback + // with a status other than TSI_OK (which terminates the + // handshake), then first wait for the RECV_STATUS op to complete. + return; + } + r = client->pending_recv_message_result; + client->pending_recv_message_result = nullptr; + } + client->cb(r->status, client->user_data, r->bytes_to_send, + r->bytes_to_send_size, r->result); + gpr_free(r); +} + +static void on_status_received(void* arg, grpc_error* error) { + alts_grpc_handshaker_client* client = + static_cast(arg); + if (client->handshake_status_code != GRPC_STATUS_OK) { + // TODO(apolcyn): consider overriding the handshake result's + // status from the final ALTS message with the status here. + char* status_details = + grpc_slice_to_c_string(client->handshake_status_details); + gpr_log(GPR_INFO, + "alts_grpc_handshaker_client:%p on_status_received " + "status:%d details:|%s| error:|%s|", + client, client->handshake_status_code, status_details, + grpc_error_string(error)); + gpr_free(status_details); + } + maybe_complete_tsi_next(client, true /* receive_status_finished */, + nullptr /* pending_recv_message_result */); + alts_grpc_handshaker_client_unref(client); +} + +static void handle_response_done(alts_grpc_handshaker_client* client, + tsi_result status, + const unsigned char* bytes_to_send, + size_t bytes_to_send_size, + tsi_handshaker_result* result) { + recv_message_result* p = + static_cast(gpr_zalloc(sizeof(*p))); + p->status = status; + p->bytes_to_send = bytes_to_send; + p->bytes_to_send_size = bytes_to_send_size; + p->result = result; + maybe_complete_tsi_next(client, false /* receive_status_finished */, + p /* pending_recv_message_result */); +} + void alts_handshaker_client_handle_response(alts_handshaker_client* c, bool is_ok) { GPR_ASSERT(c != nullptr); @@ -101,38 +213,35 @@ void alts_handshaker_client_handle_response(alts_handshaker_client* c, reinterpret_cast(c); grpc_byte_buffer* recv_buffer = client->recv_buffer; grpc_status_code status = client->status; - tsi_handshaker_on_next_done_cb cb = client->cb; - void* user_data = client->user_data; alts_tsi_handshaker* handshaker = client->handshaker; - /* Invalid input check. */ - if (cb == nullptr) { + if (client->cb == nullptr) { gpr_log(GPR_ERROR, - "cb is nullptr in alts_tsi_handshaker_handle_response()"); + "client->cb is nullptr in alts_tsi_handshaker_handle_response()"); return; } if (handshaker == nullptr) { gpr_log(GPR_ERROR, "handshaker is nullptr in alts_tsi_handshaker_handle_response()"); - cb(TSI_INTERNAL_ERROR, user_data, nullptr, 0, nullptr); + handle_response_done(client, TSI_INTERNAL_ERROR, nullptr, 0, nullptr); return; } /* TSI handshake has been shutdown. */ if (alts_tsi_handshaker_has_shutdown(handshaker)) { gpr_log(GPR_ERROR, "TSI handshake shutdown"); - cb(TSI_HANDSHAKE_SHUTDOWN, user_data, nullptr, 0, nullptr); + handle_response_done(client, TSI_HANDSHAKE_SHUTDOWN, nullptr, 0, nullptr); return; } /* Failed grpc call check. */ if (!is_ok || status != GRPC_STATUS_OK) { gpr_log(GPR_ERROR, "grpc call made to handshaker service failed"); - cb(TSI_INTERNAL_ERROR, user_data, nullptr, 0, nullptr); + handle_response_done(client, TSI_INTERNAL_ERROR, nullptr, 0, nullptr); return; } if (recv_buffer == nullptr) { gpr_log(GPR_ERROR, "recv_buffer is nullptr in alts_tsi_handshaker_handle_response()"); - cb(TSI_INTERNAL_ERROR, user_data, nullptr, 0, nullptr); + handle_response_done(client, TSI_INTERNAL_ERROR, nullptr, 0, nullptr); return; } upb::Arena arena; @@ -143,14 +252,14 @@ void alts_handshaker_client_handle_response(alts_handshaker_client* c, /* Invalid handshaker response check. */ if (resp == nullptr) { gpr_log(GPR_ERROR, "alts_tsi_utils_deserialize_response() failed"); - cb(TSI_DATA_CORRUPTED, user_data, nullptr, 0, nullptr); + handle_response_done(client, TSI_DATA_CORRUPTED, nullptr, 0, nullptr); return; } const grpc_gcp_HandshakerStatus* resp_status = grpc_gcp_HandshakerResp_status(resp); if (resp_status == nullptr) { gpr_log(GPR_ERROR, "No status in HandshakerResp"); - cb(TSI_DATA_CORRUPTED, user_data, nullptr, 0, nullptr); + handle_response_done(client, TSI_DATA_CORRUPTED, nullptr, 0, nullptr); return; } upb_strview out_frames = grpc_gcp_HandshakerResp_out_frames(resp); @@ -184,8 +293,12 @@ void alts_handshaker_client_handle_response(alts_handshaker_client* c, gpr_free(error_details); } } - cb(alts_tsi_utils_convert_to_tsi_result(code), user_data, bytes_to_send, - bytes_to_send_size, result); + // TODO(apolcyn): consider short ciruiting handle_response_done and + // invoking the TSI callback directly if we aren't done yet, if + // handle_response_done's allocation per message received causes + // a performance issue. + handle_response_done(client, alts_tsi_utils_convert_to_tsi_result(code), + bytes_to_send, bytes_to_send_size, result); } /** @@ -200,6 +313,23 @@ static tsi_result make_grpc_call(alts_handshaker_client* c, bool is_start) { memset(ops, 0, sizeof(ops)); grpc_op* op = ops; if (is_start) { + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = nullptr; + op->data.recv_status_on_client.status = &client->handshake_status_code; + op->data.recv_status_on_client.status_details = + &client->handshake_status_details; + op->flags = 0; + op->reserved = nullptr; + op++; + GPR_ASSERT(op - ops <= kHandshakerClientOpNum); + gpr_ref(&client->refs); + grpc_call_error call_error = + client->grpc_caller(client->call, ops, static_cast(op - ops), + &client->on_status_received); + // TODO(apolcyn): return the error here instead, as done for other ops? + GPR_ASSERT(call_error == GRPC_CALL_OK); + memset(ops, 0, sizeof(ops)); + op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; op++; @@ -455,6 +585,8 @@ alts_handshaker_client* alts_grpc_handshaker_client_create( } alts_grpc_handshaker_client* client = static_cast(gpr_zalloc(sizeof(*client))); + gpr_mu_init(&client->mu); + gpr_ref_init(&client->refs, 1); client->grpc_caller = grpc_call_start_batch_and_execute; client->handshaker = handshaker; client->cb = cb; @@ -481,6 +613,8 @@ alts_handshaker_client* alts_grpc_handshaker_client_create( vtable_for_testing == nullptr ? &vtable : vtable_for_testing; GRPC_CLOSURE_INIT(&client->on_handshaker_service_resp_recv, grpc_cb, client, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&client->on_status_received, on_status_received, client, + grpc_schedule_on_exec_ctx); grpc_slice_unref_internal(slice); return &client->base; } @@ -590,6 +724,21 @@ grpc_closure* alts_handshaker_client_get_closure_for_testing( return &client->on_handshaker_service_resp_recv; } +void alts_handshaker_client_ref_for_testing(alts_handshaker_client* c) { + alts_grpc_handshaker_client* client = + reinterpret_cast(c); + gpr_ref(&client->refs); +} + +void alts_handshaker_client_on_status_received_for_testing( + alts_handshaker_client* c, grpc_status_code status, grpc_error* error) { + alts_grpc_handshaker_client* client = + reinterpret_cast(c); + client->handshake_status_code = status; + client->handshake_status_details = grpc_empty_slice(); + GRPC_CLOSURE_RUN(&client->on_status_received, error); +} + } // namespace internal } // namespace grpc_core @@ -634,20 +783,8 @@ void alts_handshaker_client_shutdown(alts_handshaker_client* client) { void alts_handshaker_client_destroy(alts_handshaker_client* c) { if (c != nullptr) { - if (c->vtable != nullptr && c->vtable->destruct != nullptr) { - c->vtable->destruct(c); - } alts_grpc_handshaker_client* client = reinterpret_cast(c); - grpc_byte_buffer_destroy(client->send_buffer); - grpc_byte_buffer_destroy(client->recv_buffer); - client->send_buffer = nullptr; - client->recv_buffer = nullptr; - grpc_metadata_array_destroy(&client->recv_initial_metadata); - grpc_slice_unref_internal(client->recv_bytes); - grpc_slice_unref_internal(client->target_name); - grpc_alts_credentials_options_destroy(client->options); - gpr_free(client->buffer); - gpr_free(client); + alts_grpc_handshaker_client_unref(client); } } diff --git a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc index ab9a0cfa32f..fc2729a5a8d 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc +++ b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc @@ -514,7 +514,9 @@ static void handshaker_shutdown(tsi_handshaker* self) { if (handshaker->shutdown) { return; } - alts_handshaker_client_shutdown(handshaker->client); + if (handshaker->client != nullptr) { + alts_handshaker_client_shutdown(handshaker->client); + } handshaker->shutdown = true; } diff --git a/src/core/tsi/alts/handshaker/alts_tsi_handshaker_private.h b/src/core/tsi/alts/handshaker/alts_tsi_handshaker_private.h index ec2616e95fc..cb99fdcce0d 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_handshaker_private.h +++ b/src/core/tsi/alts/handshaker/alts_tsi_handshaker_private.h @@ -77,6 +77,11 @@ void alts_handshaker_client_set_cb_for_testing( grpc_closure* alts_handshaker_client_get_closure_for_testing( alts_handshaker_client* client); +void alts_handshaker_client_on_status_received_for_testing( + alts_handshaker_client* client, grpc_status_code status, grpc_error* error); + +void alts_handshaker_client_ref_for_testing(alts_handshaker_client* c); + } // namespace internal } // namespace grpc_core diff --git a/test/core/tsi/alts/handshaker/alts_concurrent_connectivity_test.cc b/test/core/tsi/alts/handshaker/alts_concurrent_connectivity_test.cc index 853dce5c88a..abd608a6364 100644 --- a/test/core/tsi/alts/handshaker/alts_concurrent_connectivity_test.cc +++ b/test/core/tsi/alts/handshaker/alts_concurrent_connectivity_test.cc @@ -61,6 +61,8 @@ namespace { +const int kFakeHandshakeServerMaxConcurrentStreams = 40; + void drain_cq(grpc_completion_queue* cq) { grpc_event ev; do { @@ -70,7 +72,8 @@ void drain_cq(grpc_completion_queue* cq) { } grpc_channel* create_secure_channel_for_test( - const char* server_addr, const char* fake_handshake_server_addr) { + const char* server_addr, const char* fake_handshake_server_addr, + int reconnect_backoff_ms) { grpc_alts_credentials_options* alts_options = grpc_alts_credentials_client_options_create(); grpc_channel_credentials* channel_creds = @@ -80,11 +83,19 @@ grpc_channel* create_secure_channel_for_test( grpc_alts_credentials_options_destroy(alts_options); // The main goal of these tests are to stress concurrent ALTS handshakes, // so we prevent subchnannel sharing. - grpc_arg disable_subchannel_sharing_arg = grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), true); - grpc_channel_args channel_args = {1, &disable_subchannel_sharing_arg}; + std::vector new_args; + new_args.push_back(grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), true)); + if (reconnect_backoff_ms != 0) { + new_args.push_back(grpc_channel_arg_integer_create( + const_cast("grpc.testing.fixed_reconnect_backoff_ms"), + reconnect_backoff_ms)); + } + grpc_channel_args* channel_args = + grpc_channel_args_copy_and_add(nullptr, new_args.data(), new_args.size()); grpc_channel* channel = grpc_secure_channel_create(channel_creds, server_addr, - &channel_args, nullptr); + channel_args, nullptr); + grpc_channel_args_destroy(channel_args); grpc_channel_credentials_release(channel_creds); return channel; } @@ -98,6 +109,8 @@ class FakeHandshakeServer { grpc::ServerBuilder builder; builder.AddListeningPort(address_.get(), grpc::InsecureServerCredentials()); builder.RegisterService(service_.get()); + builder.AddChannelArgument(GRPC_ARG_MAX_CONCURRENT_STREAMS, + kFakeHandshakeServerMaxConcurrentStreams); server_ = builder.BuildAndStart(); gpr_log(GPR_INFO, "Fake handshaker server listening on %s", address_.get()); } @@ -116,12 +129,12 @@ class FakeHandshakeServer { class TestServer { public: - explicit TestServer(const char* fake_handshake_server_address) { + explicit TestServer() { grpc_alts_credentials_options* alts_options = grpc_alts_credentials_server_options_create(); grpc_server_credentials* server_creds = grpc_alts_server_credentials_create_customized( - alts_options, fake_handshake_server_address, + alts_options, fake_handshake_server_.address(), true /* enable_untrusted_alts */); grpc_alts_credentials_options_destroy(alts_options); server_ = grpc_server_create(nullptr, nullptr); @@ -164,6 +177,15 @@ class TestServer { grpc_completion_queue* server_cq_; std::unique_ptr server_thd_; grpc_core::UniquePtr server_addr_; + // Give this test server its own ALTS handshake server + // so that we avoid competing for ALTS handshake server resources (e.g. + // available HTTP2 streams on a globally shared handshaker subchannel) + // with clients that are trying to do mutual ALTS handshakes + // with this server (which could "deadlock" mutual handshakes). + // TODO(apolcyn): remove this workaround from this test and have + // clients/servers share a single fake handshake server if + // the underlying issue needs to be fixed. + FakeHandshakeServer fake_handshake_server_; }; class ConnectLoopRunner { @@ -171,13 +193,15 @@ class ConnectLoopRunner { explicit ConnectLoopRunner( const char* server_address, const char* fake_handshake_server_addr, int per_connect_deadline_seconds, size_t loops, - grpc_connectivity_state expected_connectivity_states) + grpc_connectivity_state expected_connectivity_states, + int reconnect_backoff_ms) : server_address_(grpc_core::UniquePtr(gpr_strdup(server_address))), fake_handshake_server_addr_( grpc_core::UniquePtr(gpr_strdup(fake_handshake_server_addr))), per_connect_deadline_seconds_(per_connect_deadline_seconds), loops_(loops), - expected_connectivity_states_(expected_connectivity_states) { + expected_connectivity_states_(expected_connectivity_states), + reconnect_backoff_ms_(reconnect_backoff_ms) { thd_ = std::unique_ptr(new std::thread(ConnectLoop, this)); } @@ -189,7 +213,8 @@ class ConnectLoopRunner { grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); grpc_channel* channel = create_secure_channel_for_test( - self->server_address_.get(), self->fake_handshake_server_addr_.get()); + self->server_address_.get(), self->fake_handshake_server_addr_.get(), + self->reconnect_backoff_ms_); // Connect, forcing an ALTS handshake gpr_timespec connect_deadline = grpc_timeout_seconds_to_deadline(self->per_connect_deadline_seconds_); @@ -228,18 +253,20 @@ class ConnectLoopRunner { size_t loops_; grpc_connectivity_state expected_connectivity_states_; std::unique_ptr thd_; + int reconnect_backoff_ms_; }; // Perform a few ALTS handshakes sequentially (using the fake, in-process ALTS // handshake server). TEST(AltsConcurrentConnectivityTest, TestBasicClientServerHandshakes) { FakeHandshakeServer fake_handshake_server; - TestServer test_server(fake_handshake_server.address()); + TestServer test_server; { ConnectLoopRunner runner( test_server.address(), fake_handshake_server.address(), 5 /* per connect deadline seconds */, 10 /* loops */, - GRPC_CHANNEL_READY /* expected connectivity states */); + GRPC_CHANNEL_READY /* expected connectivity states */, + 0 /* reconnect_backoff_ms unset */); } } @@ -249,7 +276,7 @@ TEST(AltsConcurrentConnectivityTest, TestConcurrentClientServerHandshakes) { FakeHandshakeServer fake_handshake_server; // Test { - TestServer test_server(fake_handshake_server.address()); + TestServer test_server; gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20); size_t num_concurrent_connects = 50; std::vector> connect_loop_runners; @@ -260,7 +287,8 @@ TEST(AltsConcurrentConnectivityTest, TestConcurrentClientServerHandshakes) { std::unique_ptr(new ConnectLoopRunner( test_server.address(), fake_handshake_server.address(), 15 /* per connect deadline seconds */, 5 /* loops */, - GRPC_CHANNEL_READY /* expected connectivity states */))); + GRPC_CHANNEL_READY /* expected connectivity states */, + 0 /* reconnect_backoff_ms unset */))); } connect_loop_runners.clear(); gpr_log(GPR_DEBUG, @@ -447,11 +475,12 @@ TEST(AltsConcurrentConnectivityTest, size_t num_concurrent_connects = 100; gpr_log(GPR_DEBUG, "start performing concurrent expected-to-fail connects"); for (size_t i = 0; i < num_concurrent_connects; i++) { - connect_loop_runners.push_back(std::unique_ptr< - ConnectLoopRunner>(new ConnectLoopRunner( - fake_tcp_server.address(), fake_handshake_server.address(), - 10 /* per connect deadline seconds */, 3 /* loops */, - GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */))); + connect_loop_runners.push_back( + std::unique_ptr(new ConnectLoopRunner( + fake_tcp_server.address(), fake_handshake_server.address(), + 10 /* per connect deadline seconds */, 3 /* loops */, + GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */, + 0 /* reconnect_backoff_ms unset */))); } connect_loop_runners.clear(); gpr_log(GPR_DEBUG, "done performing concurrent expected-to-fail connects"); @@ -464,6 +493,69 @@ TEST(AltsConcurrentConnectivityTest, } } +/* This test is intended to make sure that ALTS handshakes correctly + * fail fast when the ALTS handshake server fails incoming handshakes fast. */ +TEST(AltsConcurrentConnectivityTest, + TestHandshakeFailsFastWhenHandshakeServerClosesConnectionAfterAccepting) { + FakeTcpServer fake_handshake_server( + FakeTcpServer::CloseSocketUponReceivingBytesFromPeer); + FakeTcpServer fake_tcp_server(FakeTcpServer::CloseSocketUponCloseFromPeer); + { + gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20); + std::vector> connect_loop_runners; + size_t num_concurrent_connects = 100; + gpr_log(GPR_DEBUG, "start performing concurrent expected-to-fail connects"); + for (size_t i = 0; i < num_concurrent_connects; i++) { + connect_loop_runners.push_back( + std::unique_ptr(new ConnectLoopRunner( + fake_tcp_server.address(), fake_handshake_server.address(), + 10 /* per connect deadline seconds */, 2 /* loops */, + GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */, + 0 /* reconnect_backoff_ms unset */))); + } + connect_loop_runners.clear(); + gpr_log(GPR_DEBUG, "done performing concurrent expected-to-fail connects"); + if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), test_deadline) > 0) { + gpr_log(GPR_ERROR, + "Exceeded test deadline. ALTS handshakes might not be failing " + "fast when the handshake server closes new connections"); + abort(); + } + } +} + +/* This test is intended to make sure that ALTS handshakes correctly + * fail fast when the ALTS handshake server is non-responsive, in which case + * the overall connection deadline kicks in. */ +TEST(AltsConcurrentConnectivityTest, + TestHandshakeFailsFastWhenHandshakeServerHangsAfterAccepting) { + FakeTcpServer fake_handshake_server( + FakeTcpServer::CloseSocketUponCloseFromPeer); + FakeTcpServer fake_tcp_server(FakeTcpServer::CloseSocketUponCloseFromPeer); + { + gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20); + std::vector> connect_loop_runners; + size_t num_concurrent_connects = 100; + gpr_log(GPR_DEBUG, "start performing concurrent expected-to-fail connects"); + for (size_t i = 0; i < num_concurrent_connects; i++) { + connect_loop_runners.push_back( + std::unique_ptr(new ConnectLoopRunner( + fake_tcp_server.address(), fake_handshake_server.address(), + 10 /* per connect deadline seconds */, 2 /* loops */, + GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */, + 100 /* reconnect_backoff_ms */))); + } + connect_loop_runners.clear(); + gpr_log(GPR_DEBUG, "done performing concurrent expected-to-fail connects"); + if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), test_deadline) > 0) { + gpr_log(GPR_ERROR, + "Exceeded test deadline. ALTS handshakes might not be failing " + "fast when the handshake server is non-response timeout occurs"); + abort(); + } + } +} + } // namespace int main(int argc, char** argv) { diff --git a/test/core/tsi/alts/handshaker/alts_handshaker_client_test.cc b/test/core/tsi/alts/handshaker/alts_handshaker_client_test.cc index 6f08b209348..e4a6595e3a2 100644 --- a/test/core/tsi/alts/handshaker/alts_handshaker_client_test.cc +++ b/test/core/tsi/alts/handshaker/alts_handshaker_client_test.cc @@ -43,6 +43,8 @@ using grpc_core::internal:: using grpc_core::internal:: alts_handshaker_client_get_recv_buffer_addr_for_testing; using grpc_core::internal::alts_handshaker_client_get_send_buffer_for_testing; +using grpc_core::internal:: + alts_handshaker_client_on_status_received_for_testing; using grpc_core::internal::alts_handshaker_client_set_grpc_caller_for_testing; typedef struct alts_handshaker_client_test_config { @@ -130,6 +132,13 @@ static grpc_gcp_HandshakerReq* deserialize_handshaker_req( return req; } +static bool is_recv_status_op(const grpc_op* op, size_t nops) { + if (nops == 1 && op->op == GRPC_OP_RECV_STATUS_ON_CLIENT) { + return true; + } + return false; +} + /** * A mock grpc_caller used to check if client_start, server_start, and next * operations correctly handle invalid arguments. It should not be called. @@ -151,6 +160,10 @@ static grpc_call_error check_client_start_success(grpc_call* /*call*/, const grpc_op* op, size_t nops, grpc_closure* closure) { + // RECV_STATUS ops are asserted to always succeed + if (is_recv_status_op(op, nops)) { + return GRPC_CALL_OK; + } upb::Arena arena; alts_handshaker_client* client = static_cast(closure->cb_arg); @@ -196,6 +209,10 @@ static grpc_call_error check_server_start_success(grpc_call* /*call*/, const grpc_op* op, size_t nops, grpc_closure* closure) { + // RECV_STATUS ops are asserted to always succeed + if (is_recv_status_op(op, nops)) { + return GRPC_CALL_OK; + } upb::Arena arena; alts_handshaker_client* client = static_cast(closure->cb_arg); @@ -259,9 +276,12 @@ static grpc_call_error check_next_success(grpc_call* /*call*/, * handshaker service fails. */ static grpc_call_error check_grpc_call_failure(grpc_call* /*call*/, - const grpc_op* /*op*/, - size_t /*nops*/, + const grpc_op* op, size_t nops, grpc_closure* /*tag*/) { + // RECV_STATUS ops are asserted to always succeed + if (is_recv_status_op(op, nops)) { + return GRPC_CALL_OK; + } return GRPC_CALL_ERROR; } @@ -374,6 +394,10 @@ static void schedule_request_success_test() { GPR_ASSERT(alts_handshaker_client_next(config->server, &config->out_frame) == TSI_OK); /* Cleanup. */ + alts_handshaker_client_on_status_received_for_testing( + config->client, GRPC_STATUS_OK, GRPC_ERROR_NONE); + alts_handshaker_client_on_status_received_for_testing( + config->server, GRPC_STATUS_OK, GRPC_ERROR_NONE); destroy_config(config); } @@ -397,6 +421,10 @@ static void schedule_request_grpc_call_failure_test() { GPR_ASSERT(alts_handshaker_client_next(config->server, &config->out_frame) == TSI_INTERNAL_ERROR); /* Cleanup. */ + alts_handshaker_client_on_status_received_for_testing( + config->client, GRPC_STATUS_OK, GRPC_ERROR_NONE); + alts_handshaker_client_on_status_received_for_testing( + config->server, GRPC_STATUS_OK, GRPC_ERROR_NONE); destroy_config(config); } diff --git a/test/core/tsi/alts/handshaker/alts_tsi_handshaker_test.cc b/test/core/tsi/alts/handshaker/alts_tsi_handshaker_test.cc index a102aee3cdc..b516396850f 100644 --- a/test/core/tsi/alts/handshaker/alts_tsi_handshaker_test.cc +++ b/test/core/tsi/alts/handshaker/alts_tsi_handshaker_test.cc @@ -52,6 +52,9 @@ using grpc_core::internal::alts_handshaker_client_check_fields_for_testing; using grpc_core::internal::alts_handshaker_client_get_handshaker_for_testing; using grpc_core::internal:: alts_handshaker_client_get_recv_buffer_addr_for_testing; +using grpc_core::internal:: + alts_handshaker_client_on_status_received_for_testing; +using grpc_core::internal::alts_handshaker_client_ref_for_testing; using grpc_core::internal::alts_handshaker_client_set_cb_for_testing; using grpc_core::internal::alts_handshaker_client_set_fields_for_testing; using grpc_core::internal::alts_handshaker_client_set_recv_bytes_for_testing; @@ -620,7 +623,7 @@ static void on_failed_grpc_call_cb(tsi_result status, void* user_data, GPR_ASSERT(result == nullptr); } -static void check_handle_response_invalid_input() { +static void check_handle_response_nullptr_handshaker() { /* Initialization. */ notification_init(&caller_to_tsi_notification); notification_init(&tsi_to_caller_notification); @@ -642,20 +645,107 @@ static void check_handle_response_invalid_input() { on_invalid_input_cb, nullptr, recv_buffer, GRPC_STATUS_OK); alts_handshaker_client_handle_response(client, true); + /* Note: here and elsewhere in this test, we first ref the handshaker in order + * to match the unref that on_status_received will do. This necessary + * because this test mocks out the grpc call in such a way that the code + * path that would usually take this ref is skipped. */ + alts_handshaker_client_ref_for_testing(client); + alts_handshaker_client_on_status_received_for_testing(client, GRPC_STATUS_OK, + GRPC_ERROR_NONE); + /* Cleanup. */ + grpc_slice_unref(slice); + run_tsi_handshaker_destroy_with_exec_ctx(handshaker); + notification_destroy(&caller_to_tsi_notification); + notification_destroy(&tsi_to_caller_notification); +} + +static void check_handle_response_nullptr_recv_bytes() { + /* Initialization. */ + notification_init(&caller_to_tsi_notification); + notification_init(&tsi_to_caller_notification); + /** + * Create a handshaker at the client side, for which internal mock client is + * always going to fail. + */ + tsi_handshaker* handshaker = create_test_handshaker(true /* is_client */); + tsi_handshaker_next(handshaker, nullptr, 0, nullptr, nullptr, nullptr, + on_client_start_success_cb, nullptr); + alts_tsi_handshaker* alts_handshaker = + reinterpret_cast(handshaker); + alts_handshaker_client* client = + alts_tsi_handshaker_get_client_for_testing(alts_handshaker); /* Check nullptr recv_bytes. */ alts_handshaker_client_set_fields_for_testing(client, alts_handshaker, on_invalid_input_cb, nullptr, nullptr, GRPC_STATUS_OK); alts_handshaker_client_handle_response(client, true); + alts_handshaker_client_ref_for_testing(client); + alts_handshaker_client_on_status_received_for_testing(client, GRPC_STATUS_OK, + GRPC_ERROR_NONE); + /* Cleanup. */ + run_tsi_handshaker_destroy_with_exec_ctx(handshaker); + notification_destroy(&caller_to_tsi_notification); + notification_destroy(&tsi_to_caller_notification); +} + +static void check_handle_response_failed_grpc_call_to_handshaker_service() { + /* Initialization. */ + notification_init(&caller_to_tsi_notification); + notification_init(&tsi_to_caller_notification); + /** + * Create a handshaker at the client side, for which internal mock client is + * always going to fail. + */ + tsi_handshaker* handshaker = create_test_handshaker(true /* is_client */); + tsi_handshaker_next(handshaker, nullptr, 0, nullptr, nullptr, nullptr, + on_client_start_success_cb, nullptr); + alts_tsi_handshaker* alts_handshaker = + reinterpret_cast(handshaker); + grpc_slice slice = grpc_empty_slice(); + grpc_byte_buffer* recv_buffer = grpc_raw_byte_buffer_create(&slice, 1); + alts_handshaker_client* client = + alts_tsi_handshaker_get_client_for_testing(alts_handshaker); /* Check failed grpc call made to handshaker service. */ alts_handshaker_client_set_fields_for_testing( client, alts_handshaker, on_failed_grpc_call_cb, nullptr, recv_buffer, GRPC_STATUS_UNKNOWN); alts_handshaker_client_handle_response(client, true); + alts_handshaker_client_ref_for_testing(client); + alts_handshaker_client_on_status_received_for_testing( + client, GRPC_STATUS_UNKNOWN, GRPC_ERROR_NONE); + /* Cleanup. */ + grpc_slice_unref(slice); + run_tsi_handshaker_destroy_with_exec_ctx(handshaker); + notification_destroy(&caller_to_tsi_notification); + notification_destroy(&tsi_to_caller_notification); +} + +static void +check_handle_response_failed_recv_message_from_handshaker_service() { + /* Initialization. */ + notification_init(&caller_to_tsi_notification); + notification_init(&tsi_to_caller_notification); + /** + * Create a handshaker at the client side, for which internal mock client is + * always going to fail. + */ + tsi_handshaker* handshaker = create_test_handshaker(true /* is_client */); + tsi_handshaker_next(handshaker, nullptr, 0, nullptr, nullptr, nullptr, + on_client_start_success_cb, nullptr); + alts_tsi_handshaker* alts_handshaker = + reinterpret_cast(handshaker); + grpc_slice slice = grpc_empty_slice(); + grpc_byte_buffer* recv_buffer = grpc_raw_byte_buffer_create(&slice, 1); + alts_handshaker_client* client = + alts_tsi_handshaker_get_client_for_testing(alts_handshaker); + /* Check failed recv message op from handshaker service. */ alts_handshaker_client_set_fields_for_testing(client, alts_handshaker, on_failed_grpc_call_cb, nullptr, recv_buffer, GRPC_STATUS_OK); alts_handshaker_client_handle_response(client, false); + alts_handshaker_client_ref_for_testing(client); + alts_handshaker_client_on_status_received_for_testing(client, GRPC_STATUS_OK, + GRPC_ERROR_NONE); /* Cleanup. */ grpc_slice_unref(slice); run_tsi_handshaker_destroy_with_exec_ctx(handshaker); @@ -695,6 +785,9 @@ static void check_handle_response_invalid_resp() { on_invalid_resp_cb, nullptr, recv_buffer, GRPC_STATUS_OK); alts_handshaker_client_handle_response(client, true); + alts_handshaker_client_ref_for_testing(client); + alts_handshaker_client_on_status_received_for_testing(client, GRPC_STATUS_OK, + GRPC_ERROR_NONE); /* Cleanup. */ run_tsi_handshaker_destroy_with_exec_ctx(handshaker); notification_destroy(&caller_to_tsi_notification); @@ -708,12 +801,18 @@ static void check_handle_response_success(void* /*unused*/) { /* Client next. */ wait(&caller_to_tsi_notification); alts_handshaker_client_handle_response(cb_event, true /* is_ok */); + alts_handshaker_client_ref_for_testing(cb_event); + alts_handshaker_client_on_status_received_for_testing( + cb_event, GRPC_STATUS_OK, GRPC_ERROR_NONE); /* Server start. */ wait(&caller_to_tsi_notification); alts_handshaker_client_handle_response(cb_event, true /* is_ok */); /* Server next. */ wait(&caller_to_tsi_notification); alts_handshaker_client_handle_response(cb_event, true /* is_ok */); + alts_handshaker_client_ref_for_testing(cb_event); + alts_handshaker_client_on_status_received_for_testing( + cb_event, GRPC_STATUS_OK, GRPC_ERROR_NONE); } static void on_failed_resp_cb(tsi_result status, void* user_data, @@ -748,6 +847,9 @@ static void check_handle_response_failure() { on_failed_resp_cb, nullptr, recv_buffer, GRPC_STATUS_OK); alts_handshaker_client_handle_response(client, true /* is_ok*/); + alts_handshaker_client_ref_for_testing(client); + alts_handshaker_client_on_status_received_for_testing(client, GRPC_STATUS_OK, + GRPC_ERROR_NONE); /* Cleanup. */ run_tsi_handshaker_destroy_with_exec_ctx(handshaker); notification_destroy(&caller_to_tsi_notification); @@ -787,6 +889,9 @@ static void check_handle_response_after_shutdown() { on_shutdown_resp_cb, nullptr, recv_buffer, GRPC_STATUS_OK); alts_handshaker_client_handle_response(client, true); + alts_handshaker_client_ref_for_testing(client); + alts_handshaker_client_on_status_received_for_testing(client, GRPC_STATUS_OK, + GRPC_ERROR_NONE); /* Cleanup. */ run_tsi_handshaker_destroy_with_exec_ctx(handshaker); notification_destroy(&caller_to_tsi_notification); @@ -837,7 +942,10 @@ int main(int /*argc*/, char** /*argv*/) { should_handshaker_client_api_succeed = false; check_handshaker_shutdown_invalid_input(); check_handshaker_next_failure(); - check_handle_response_invalid_input(); + check_handle_response_nullptr_handshaker(); + check_handle_response_nullptr_recv_bytes(); + check_handle_response_failed_grpc_call_to_handshaker_service(); + check_handle_response_failed_recv_message_from_handshaker_service(); check_handle_response_invalid_resp(); check_handle_response_failure(); /* Cleanup. */ From 8e9c5a4c3355d028b8d2e1780eaf42e547ba10fc Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Mon, 11 Nov 2019 10:38:52 -0800 Subject: [PATCH 2/2] Update CLOSURE_RUN to Closure::Run --- src/core/tsi/alts/handshaker/alts_handshaker_client.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc index b3cdf5160ca..e549948aeff 100644 --- a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc +++ b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc @@ -736,7 +736,7 @@ void alts_handshaker_client_on_status_received_for_testing( reinterpret_cast(c); client->handshake_status_code = status; client->handshake_status_details = grpc_empty_slice(); - GRPC_CLOSURE_RUN(&client->on_status_received, error); + grpc_core::Closure::Run(DEBUG_LOCATION, &client->on_status_received, error); } } // namespace internal