Provide means to control TCP frame sizes in response to high memory pressure (#29793)

* Provide means to control TCP frame sizes in response to high memory pressure

* static_cast

* fix sanity checks

* removing endpoint_write frame clipping. this could be added in a separate PR

* fix sanity checks

* remove unused parameter

* addressing review comments

* rename functions

* add a TODO

* fixing naming issues

* add changes to test

* Save ENOBUFS errno correctly in tcp_posix for subsequent handling

* updaing min_progress_size computation

* Revert "Save ENOBUFS errno correctly in tcp_posix for subsequent handling"

This reverts commit 5e1d10ac9b.

* fine tuning min progress size estimation and updating unit tests to verify returned min progress size
pull/29968/head^2
Vignesh Babu 3 years ago committed by GitHub
parent db684ad0dd
commit 475623c489
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      src/core/lib/security/transport/secure_endpoint.cc
  2. 10
      src/core/tsi/alts/zero_copy_frame_protector/alts_zero_copy_grpc_protector.cc
  3. 9
      src/core/tsi/fake_transport_security.cc
  4. 5
      src/core/tsi/transport_security_grpc.cc
  5. 7
      src/core/tsi/transport_security_grpc.h
  6. 24
      test/core/tsi/alts/zero_copy_frame_protector/alts_zero_copy_grpc_protector_test.cc

@ -42,6 +42,7 @@
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/tsi/transport_security_grpc.h"
#include "src/core/tsi/transport_security_interface.h"
#define STAGING_BUFFER_SIZE 8192
@ -84,6 +85,7 @@ struct secure_endpoint {
memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
}
has_posted_reclaimer.store(false, std::memory_order_relaxed);
min_progress_size = 1;
gpr_ref_init(&ref, 1);
}
@ -121,6 +123,7 @@ struct secure_endpoint {
grpc_core::MemoryOwner memory_owner;
grpc_core::MemoryAllocator::Reservation self_reservation;
std::atomic<bool> has_posted_reclaimer;
int min_progress_size;
gpr_refcount ref;
};
@ -249,8 +252,19 @@ static void on_read(void* user_data, grpc_error_handle error) {
if (ep->zero_copy_protector != nullptr) {
// Use zero-copy grpc protector to unprotect.
int min_progress_size = 1;
// Get the size of the last frame which is not yet fully decrypted.
// This estimated frame size is stored in ep->min_progress_size which is
// passed to the TCP layer to indicate the minimum number of
// bytes that need to be read to make meaningful progress. This would
// avoid reading of small slices from the network.
// TODO(vigneshbabu): Set min_progress_size in the regular (non-zero-copy)
// frame protector code path as well.
result = tsi_zero_copy_grpc_protector_unprotect(
ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer);
ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer,
&min_progress_size);
min_progress_size = std::max(1, min_progress_size);
ep->min_progress_size = result != TSI_OK ? 1 : min_progress_size;
} else {
// Use frame protector to unprotect.
/* TODO(yangg) check error, maybe bail out early */
@ -336,7 +350,7 @@ static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
}
grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, urgent,
/*min_progress_size=*/1);
/*min_progress_size=*/ep->min_progress_size);
}
static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,

@ -174,7 +174,7 @@ static tsi_result alts_zero_copy_grpc_protector_protect(
static tsi_result alts_zero_copy_grpc_protector_unprotect(
tsi_zero_copy_grpc_protector* self, grpc_slice_buffer* protected_slices,
grpc_slice_buffer* unprotected_slices) {
grpc_slice_buffer* unprotected_slices, int* min_progress_size) {
if (self == nullptr || unprotected_slices == nullptr ||
protected_slices == nullptr) {
gpr_log(GPR_ERROR,
@ -215,6 +215,14 @@ static tsi_result alts_zero_copy_grpc_protector_unprotect(
return status;
}
}
if (min_progress_size != nullptr) {
if (protector->parsed_frame_size > kZeroCopyFrameLengthFieldSize) {
*min_progress_size =
protector->parsed_frame_size - protector->protected_sb.length;
} else {
*min_progress_size = 1;
}
}
return TSI_OK;
}

@ -432,7 +432,7 @@ static tsi_result fake_zero_copy_grpc_protector_protect(
static tsi_result fake_zero_copy_grpc_protector_unprotect(
tsi_zero_copy_grpc_protector* self, grpc_slice_buffer* protected_slices,
grpc_slice_buffer* unprotected_slices) {
grpc_slice_buffer* unprotected_slices, int* min_progress_size) {
if (self == nullptr || unprotected_slices == nullptr ||
protected_slices == nullptr) {
return TSI_INVALID_ARGUMENT;
@ -462,6 +462,13 @@ static tsi_result fake_zero_copy_grpc_protector_unprotect(
impl->parsed_frame_size = 0;
grpc_slice_buffer_reset_and_unref_internal(&impl->header_sb);
}
if (min_progress_size != nullptr) {
if (impl->parsed_frame_size > TSI_FAKE_FRAME_HEADER_SIZE) {
*min_progress_size = impl->parsed_frame_size - impl->protected_sb.length;
} else {
*min_progress_size = 1;
}
}
return TSI_OK;
}

@ -51,13 +51,14 @@ tsi_result tsi_zero_copy_grpc_protector_protect(
tsi_result tsi_zero_copy_grpc_protector_unprotect(
tsi_zero_copy_grpc_protector* self, grpc_slice_buffer* protected_slices,
grpc_slice_buffer* unprotected_slices) {
grpc_slice_buffer* unprotected_slices, int* min_progress_size) {
if (self == nullptr || self->vtable == nullptr ||
protected_slices == nullptr || unprotected_slices == nullptr) {
return TSI_INVALID_ARGUMENT;
}
if (self->vtable->unprotect == nullptr) return TSI_UNIMPLEMENTED;
return self->vtable->unprotect(self, protected_slices, unprotected_slices);
return self->vtable->unprotect(self, protected_slices, unprotected_slices,
min_progress_size);
}
void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector* self) {

@ -47,12 +47,14 @@ tsi_result tsi_zero_copy_grpc_protector_protect(
/* Outputs unprotected bytes.
- protected_slices is the bytes of protected frames.
- unprotected_slices is the unprotected output data.
- if min_progress_size is not null, it returns the size of the last
incomplete frame which could not be fully unprotected.
- This method returns TSI_OK in case of success. Success includes cases where
there is not enough data to output in which case unprotected_slices has 0
bytes. */
tsi_result tsi_zero_copy_grpc_protector_unprotect(
tsi_zero_copy_grpc_protector* self, grpc_slice_buffer* protected_slices,
grpc_slice_buffer* unprotected_slices);
grpc_slice_buffer* unprotected_slices, int* min_progress_size);
/* Destroys the tsi_zero_copy_grpc_protector object. */
void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector* self);
@ -68,7 +70,8 @@ struct tsi_zero_copy_grpc_protector_vtable {
grpc_slice_buffer* protected_slices);
tsi_result (*unprotect)(tsi_zero_copy_grpc_protector* self,
grpc_slice_buffer* protected_slices,
grpc_slice_buffer* unprotected_slices);
grpc_slice_buffer* unprotected_slices,
int* min_progress_size);
void (*destroy)(tsi_zero_copy_grpc_protector* self);
tsi_result (*max_frame_size)(tsi_zero_copy_grpc_protector* self,
size_t* max_frame_size);

@ -25,6 +25,7 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/tsi/alts/crypt/gsec.h"
#include "src/core/tsi/alts/zero_copy_frame_protector/alts_iovec_record_protocol.h"
#include "src/core/tsi/transport_security_grpc.h"
#include "test/core/tsi/alts/crypt/gsec_test_util.h"
#include "test/core/util/test_config.h"
@ -176,6 +177,7 @@ static void seal_unseal_small_buffer(tsi_zero_copy_grpc_protector* sender,
tsi_zero_copy_grpc_protector* receiver) {
grpc_core::ExecCtx exec_ctx;
for (size_t i = 0; i < kSealRepeatTimes; i++) {
int min_progress_size;
alts_zero_copy_grpc_protector_test_var* var =
alts_zero_copy_grpc_protector_test_var_create();
/* Creates a random small slice buffer and calls protect(). */
@ -193,13 +195,21 @@ static void seal_unseal_small_buffer(tsi_zero_copy_grpc_protector* sender,
&var->staging_sb);
/* Unprotects one by one. */
GPR_ASSERT(tsi_zero_copy_grpc_protector_unprotect(
receiver, &var->staging_sb, &var->unprotected_sb) == TSI_OK);
receiver, &var->staging_sb, &var->unprotected_sb,
&min_progress_size) == TSI_OK);
if (staging_sb_size >= kZeroCopyFrameLengthFieldSize) {
GPR_ASSERT(min_progress_size ==
static_cast<int>(var->protected_sb.length));
} else {
GPR_ASSERT(min_progress_size == 1);
}
GPR_ASSERT(var->unprotected_sb.length == 0);
GPR_ASSERT(tsi_zero_copy_grpc_protector_unprotect(
receiver, &var->protected_sb, &var->unprotected_sb) ==
TSI_OK);
receiver, &var->protected_sb, &var->unprotected_sb,
&min_progress_size) == TSI_OK);
GPR_ASSERT(
are_slice_buffers_equal(&var->unprotected_sb, &var->duplicate_sb));
GPR_ASSERT(min_progress_size == 1);
alts_zero_copy_grpc_protector_test_var_destroy(var);
}
grpc_core::ExecCtx::Get()->Flush();
@ -226,12 +236,12 @@ static void seal_unseal_large_buffer(tsi_zero_copy_grpc_protector* sender,
grpc_slice_buffer_move_first(&var->protected_sb, channel_size,
&var->staging_sb);
GPR_ASSERT(tsi_zero_copy_grpc_protector_unprotect(
receiver, &var->staging_sb, &var->unprotected_sb) ==
TSI_OK);
receiver, &var->staging_sb, &var->unprotected_sb,
nullptr) == TSI_OK);
}
GPR_ASSERT(tsi_zero_copy_grpc_protector_unprotect(
receiver, &var->protected_sb, &var->unprotected_sb) ==
TSI_OK);
receiver, &var->protected_sb, &var->unprotected_sb,
nullptr) == TSI_OK);
GPR_ASSERT(
are_slice_buffers_equal(&var->unprotected_sb, &var->duplicate_sb));
alts_zero_copy_grpc_protector_test_var_destroy(var);

Loading…
Cancel
Save