From 475623c489cbb6f5e03178b8ddfb8b05a6a38b1b Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Thu, 9 Jun 2022 17:54:59 -0700 Subject: [PATCH] Provide means to control TCP frame sizes in response to high memory pressure (#29793) * Provide means to control TCP frame sizes in response to high memory pressure * static_cast * fix sanity checks * removing endpoint_write frame clipping. this could be added in a separate PR * fix sanity checks * remove unused parameter * addressing review comments * rename functions * add a TODO * fixing naming issues * add changes to test * Save ENOBUFS errno correctly in tcp_posix for subsequent handling * updaing min_progress_size computation * Revert "Save ENOBUFS errno correctly in tcp_posix for subsequent handling" This reverts commit 5e1d10ac9b7166e9cdeaf3b0950b0974f5189189. * fine tuning min progress size estimation and updating unit tests to verify returned min progress size --- .../lib/security/transport/secure_endpoint.cc | 18 ++++++++++++-- .../alts_zero_copy_grpc_protector.cc | 10 +++++++- src/core/tsi/fake_transport_security.cc | 9 ++++++- src/core/tsi/transport_security_grpc.cc | 5 ++-- src/core/tsi/transport_security_grpc.h | 7 ++++-- .../alts_zero_copy_grpc_protector_test.cc | 24 +++++++++++++------ 6 files changed, 58 insertions(+), 15 deletions(-) diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc index 233068840d5..dd158eeef03 100644 --- a/src/core/lib/security/transport/secure_endpoint.cc +++ b/src/core/lib/security/transport/secure_endpoint.cc @@ -42,6 +42,7 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/tsi/transport_security_grpc.h" +#include "src/core/tsi/transport_security_interface.h" #define STAGING_BUFFER_SIZE 8192 @@ -84,6 +85,7 @@ struct secure_endpoint { memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE)); } has_posted_reclaimer.store(false, std::memory_order_relaxed); + min_progress_size = 1; gpr_ref_init(&ref, 1); } @@ -121,6 +123,7 @@ struct secure_endpoint { grpc_core::MemoryOwner memory_owner; grpc_core::MemoryAllocator::Reservation self_reservation; std::atomic has_posted_reclaimer; + int min_progress_size; gpr_refcount ref; }; @@ -249,8 +252,19 @@ static void on_read(void* user_data, grpc_error_handle error) { if (ep->zero_copy_protector != nullptr) { // Use zero-copy grpc protector to unprotect. + int min_progress_size = 1; + // Get the size of the last frame which is not yet fully decrypted. + // This estimated frame size is stored in ep->min_progress_size which is + // passed to the TCP layer to indicate the minimum number of + // bytes that need to be read to make meaningful progress. This would + // avoid reading of small slices from the network. + // TODO(vigneshbabu): Set min_progress_size in the regular (non-zero-copy) + // frame protector code path as well. result = tsi_zero_copy_grpc_protector_unprotect( - ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer); + ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer, + &min_progress_size); + min_progress_size = std::max(1, min_progress_size); + ep->min_progress_size = result != TSI_OK ? 1 : min_progress_size; } else { // Use frame protector to unprotect. /* TODO(yangg) check error, maybe bail out early */ @@ -336,7 +350,7 @@ static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, } grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, urgent, - /*min_progress_size=*/1); + /*min_progress_size=*/ep->min_progress_size); } static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur, diff --git a/src/core/tsi/alts/zero_copy_frame_protector/alts_zero_copy_grpc_protector.cc b/src/core/tsi/alts/zero_copy_frame_protector/alts_zero_copy_grpc_protector.cc index d5455b6d76d..1c60e18ea19 100644 --- a/src/core/tsi/alts/zero_copy_frame_protector/alts_zero_copy_grpc_protector.cc +++ b/src/core/tsi/alts/zero_copy_frame_protector/alts_zero_copy_grpc_protector.cc @@ -174,7 +174,7 @@ static tsi_result alts_zero_copy_grpc_protector_protect( static tsi_result alts_zero_copy_grpc_protector_unprotect( tsi_zero_copy_grpc_protector* self, grpc_slice_buffer* protected_slices, - grpc_slice_buffer* unprotected_slices) { + grpc_slice_buffer* unprotected_slices, int* min_progress_size) { if (self == nullptr || unprotected_slices == nullptr || protected_slices == nullptr) { gpr_log(GPR_ERROR, @@ -215,6 +215,14 @@ static tsi_result alts_zero_copy_grpc_protector_unprotect( return status; } } + if (min_progress_size != nullptr) { + if (protector->parsed_frame_size > kZeroCopyFrameLengthFieldSize) { + *min_progress_size = + protector->parsed_frame_size - protector->protected_sb.length; + } else { + *min_progress_size = 1; + } + } return TSI_OK; } diff --git a/src/core/tsi/fake_transport_security.cc b/src/core/tsi/fake_transport_security.cc index ce09cdb7494..18a42dd7301 100644 --- a/src/core/tsi/fake_transport_security.cc +++ b/src/core/tsi/fake_transport_security.cc @@ -432,7 +432,7 @@ static tsi_result fake_zero_copy_grpc_protector_protect( static tsi_result fake_zero_copy_grpc_protector_unprotect( tsi_zero_copy_grpc_protector* self, grpc_slice_buffer* protected_slices, - grpc_slice_buffer* unprotected_slices) { + grpc_slice_buffer* unprotected_slices, int* min_progress_size) { if (self == nullptr || unprotected_slices == nullptr || protected_slices == nullptr) { return TSI_INVALID_ARGUMENT; @@ -462,6 +462,13 @@ static tsi_result fake_zero_copy_grpc_protector_unprotect( impl->parsed_frame_size = 0; grpc_slice_buffer_reset_and_unref_internal(&impl->header_sb); } + if (min_progress_size != nullptr) { + if (impl->parsed_frame_size > TSI_FAKE_FRAME_HEADER_SIZE) { + *min_progress_size = impl->parsed_frame_size - impl->protected_sb.length; + } else { + *min_progress_size = 1; + } + } return TSI_OK; } diff --git a/src/core/tsi/transport_security_grpc.cc b/src/core/tsi/transport_security_grpc.cc index cec872690de..f43994d1b5b 100644 --- a/src/core/tsi/transport_security_grpc.cc +++ b/src/core/tsi/transport_security_grpc.cc @@ -51,13 +51,14 @@ tsi_result tsi_zero_copy_grpc_protector_protect( tsi_result tsi_zero_copy_grpc_protector_unprotect( tsi_zero_copy_grpc_protector* self, grpc_slice_buffer* protected_slices, - grpc_slice_buffer* unprotected_slices) { + grpc_slice_buffer* unprotected_slices, int* min_progress_size) { if (self == nullptr || self->vtable == nullptr || protected_slices == nullptr || unprotected_slices == nullptr) { return TSI_INVALID_ARGUMENT; } if (self->vtable->unprotect == nullptr) return TSI_UNIMPLEMENTED; - return self->vtable->unprotect(self, protected_slices, unprotected_slices); + return self->vtable->unprotect(self, protected_slices, unprotected_slices, + min_progress_size); } void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector* self) { diff --git a/src/core/tsi/transport_security_grpc.h b/src/core/tsi/transport_security_grpc.h index ba9428b5885..0330d8fbbab 100644 --- a/src/core/tsi/transport_security_grpc.h +++ b/src/core/tsi/transport_security_grpc.h @@ -47,12 +47,14 @@ tsi_result tsi_zero_copy_grpc_protector_protect( /* Outputs unprotected bytes. - protected_slices is the bytes of protected frames. - unprotected_slices is the unprotected output data. + - if min_progress_size is not null, it returns the size of the last + incomplete frame which could not be fully unprotected. - This method returns TSI_OK in case of success. Success includes cases where there is not enough data to output in which case unprotected_slices has 0 bytes. */ tsi_result tsi_zero_copy_grpc_protector_unprotect( tsi_zero_copy_grpc_protector* self, grpc_slice_buffer* protected_slices, - grpc_slice_buffer* unprotected_slices); + grpc_slice_buffer* unprotected_slices, int* min_progress_size); /* Destroys the tsi_zero_copy_grpc_protector object. */ void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector* self); @@ -68,7 +70,8 @@ struct tsi_zero_copy_grpc_protector_vtable { grpc_slice_buffer* protected_slices); tsi_result (*unprotect)(tsi_zero_copy_grpc_protector* self, grpc_slice_buffer* protected_slices, - grpc_slice_buffer* unprotected_slices); + grpc_slice_buffer* unprotected_slices, + int* min_progress_size); void (*destroy)(tsi_zero_copy_grpc_protector* self); tsi_result (*max_frame_size)(tsi_zero_copy_grpc_protector* self, size_t* max_frame_size); diff --git a/test/core/tsi/alts/zero_copy_frame_protector/alts_zero_copy_grpc_protector_test.cc b/test/core/tsi/alts/zero_copy_frame_protector/alts_zero_copy_grpc_protector_test.cc index 51a0505284c..72ea12d7bd2 100644 --- a/test/core/tsi/alts/zero_copy_frame_protector/alts_zero_copy_grpc_protector_test.cc +++ b/test/core/tsi/alts/zero_copy_frame_protector/alts_zero_copy_grpc_protector_test.cc @@ -25,6 +25,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/tsi/alts/crypt/gsec.h" +#include "src/core/tsi/alts/zero_copy_frame_protector/alts_iovec_record_protocol.h" #include "src/core/tsi/transport_security_grpc.h" #include "test/core/tsi/alts/crypt/gsec_test_util.h" #include "test/core/util/test_config.h" @@ -176,6 +177,7 @@ static void seal_unseal_small_buffer(tsi_zero_copy_grpc_protector* sender, tsi_zero_copy_grpc_protector* receiver) { grpc_core::ExecCtx exec_ctx; for (size_t i = 0; i < kSealRepeatTimes; i++) { + int min_progress_size; alts_zero_copy_grpc_protector_test_var* var = alts_zero_copy_grpc_protector_test_var_create(); /* Creates a random small slice buffer and calls protect(). */ @@ -193,13 +195,21 @@ static void seal_unseal_small_buffer(tsi_zero_copy_grpc_protector* sender, &var->staging_sb); /* Unprotects one by one. */ GPR_ASSERT(tsi_zero_copy_grpc_protector_unprotect( - receiver, &var->staging_sb, &var->unprotected_sb) == TSI_OK); + receiver, &var->staging_sb, &var->unprotected_sb, + &min_progress_size) == TSI_OK); + if (staging_sb_size >= kZeroCopyFrameLengthFieldSize) { + GPR_ASSERT(min_progress_size == + static_cast(var->protected_sb.length)); + } else { + GPR_ASSERT(min_progress_size == 1); + } GPR_ASSERT(var->unprotected_sb.length == 0); GPR_ASSERT(tsi_zero_copy_grpc_protector_unprotect( - receiver, &var->protected_sb, &var->unprotected_sb) == - TSI_OK); + receiver, &var->protected_sb, &var->unprotected_sb, + &min_progress_size) == TSI_OK); GPR_ASSERT( are_slice_buffers_equal(&var->unprotected_sb, &var->duplicate_sb)); + GPR_ASSERT(min_progress_size == 1); alts_zero_copy_grpc_protector_test_var_destroy(var); } grpc_core::ExecCtx::Get()->Flush(); @@ -226,12 +236,12 @@ static void seal_unseal_large_buffer(tsi_zero_copy_grpc_protector* sender, grpc_slice_buffer_move_first(&var->protected_sb, channel_size, &var->staging_sb); GPR_ASSERT(tsi_zero_copy_grpc_protector_unprotect( - receiver, &var->staging_sb, &var->unprotected_sb) == - TSI_OK); + receiver, &var->staging_sb, &var->unprotected_sb, + nullptr) == TSI_OK); } GPR_ASSERT(tsi_zero_copy_grpc_protector_unprotect( - receiver, &var->protected_sb, &var->unprotected_sb) == - TSI_OK); + receiver, &var->protected_sb, &var->unprotected_sb, + nullptr) == TSI_OK); GPR_ASSERT( are_slice_buffers_equal(&var->unprotected_sb, &var->duplicate_sb)); alts_zero_copy_grpc_protector_test_var_destroy(var);