From 60b376849a2c7b9faa8ae5a6ec2e1d6d1aacff43 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Fri, 10 Nov 2017 18:06:09 -0800 Subject: [PATCH 1/9] eagerly free slice buffer after write --- src/core/lib/iomgr/tcp_posix.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index fbbb1762b7f..0d78cf41649 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -605,9 +605,10 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) { *error = GRPC_ERROR_NONE; + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->outgoing_buffer); return true; } - }; + } } static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, From 9b3b8e32ec5d6f69aa8c4faeb8f58e5450580289 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Tue, 14 Nov 2017 11:31:19 -0800 Subject: [PATCH 2/9] clang fmt --- src/core/lib/iomgr/tcp_posix.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 6a0e1fc5feb..6b61ed9ec30 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -606,7 +606,8 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) { *error = GRPC_ERROR_NONE; - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->outgoing_buffer); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + tcp->outgoing_buffer); return true; } } From 751c3245f8807c8f2468b4d8ca4d7da8d7607221 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Tue, 14 Nov 2017 12:31:24 -0800 Subject: [PATCH 3/9] Add partial unref --- include/grpc/impl/codegen/slice.h | 4 ++++ src/core/lib/iomgr/tcp_posix.cc | 2 ++ src/core/lib/slice/slice_buffer.cc | 17 ++++++++++++++++- src/core/lib/slice/slice_internal.h | 3 +++ 4 files changed, 25 insertions(+), 1 deletion(-) diff --git a/include/grpc/impl/codegen/slice.h b/include/grpc/impl/codegen/slice.h index 11997fcb563..369cee91a8e 100644 --- a/include/grpc/impl/codegen/slice.h +++ b/include/grpc/impl/codegen/slice.h @@ -108,6 +108,10 @@ typedef struct { /** the number of slices allocated in the array. External users (i.e any code * outside grpc core) MUST NOT use this field */ size_t capacity; + /** the index of the first slice who's memory is still owned by this buffer. + * This is only to be used when partially unreffing this slice buffer in + * grpc_slice_buffer_partial_reset_and_unref_internal. */ + size_t idx_of_first_valid_slice; /** the combined length of all slices in the array */ size_t length; /** inlined elements to avoid allocations */ diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 6b61ed9ec30..016ca87219b 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -576,6 +576,8 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, if (errno == EAGAIN) { tcp->outgoing_slice_idx = unwind_slice_idx; tcp->outgoing_byte_idx = unwind_byte_idx; + grpc_slice_buffer_partial_reset_and_unref_internal( + exec_ctx, tcp->outgoing_buffer, unwind_slice_idx); return false; } else if (errno == EPIPE) { *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"), diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc index 5db54dad91e..cd5040fbcb5 100644 --- a/src/core/lib/slice/slice_buffer.cc +++ b/src/core/lib/slice/slice_buffer.cc @@ -62,6 +62,7 @@ void grpc_slice_buffer_init(grpc_slice_buffer* sb) { sb->count = 0; sb->length = 0; sb->capacity = GRPC_SLICE_BUFFER_INLINE_ELEMENTS; + sb->idx_of_first_valid_slice = 0; sb->base_slices = sb->slices = sb->inlined; } @@ -166,12 +167,26 @@ void grpc_slice_buffer_pop(grpc_slice_buffer* sb) { void grpc_slice_buffer_reset_and_unref_internal(grpc_exec_ctx* exec_ctx, grpc_slice_buffer* sb) { size_t i; - for (i = 0; i < sb->count; i++) { + for (i = sb->idx_of_first_valid_slice; i < sb->count; i++) { grpc_slice_unref_internal(exec_ctx, sb->slices[i]); } sb->count = 0; sb->length = 0; + sb->idx_of_first_valid_slice = 0; +} + +void grpc_slice_buffer_partial_reset_and_unref_internal(grpc_exec_ctx* exec_ctx, + grpc_slice_buffer* sb, + size_t idx) { + GPR_ASSERT(idx <= sb->count); + + size_t i; + for (i = sb->idx_of_first_valid_slice; i < idx; i++) { + grpc_slice_unref_internal(exec_ctx, sb->slices[i]); + } + + sb->idx_of_first_valid_slice = idx; } void grpc_slice_buffer_reset_and_unref(grpc_slice_buffer* sb) { diff --git a/src/core/lib/slice/slice_internal.h b/src/core/lib/slice/slice_internal.h index 2439fc08267..2c616950d7f 100644 --- a/src/core/lib/slice/slice_internal.h +++ b/src/core/lib/slice/slice_internal.h @@ -32,6 +32,9 @@ grpc_slice grpc_slice_ref_internal(grpc_slice slice); void grpc_slice_unref_internal(grpc_exec_ctx* exec_ctx, grpc_slice slice); void grpc_slice_buffer_reset_and_unref_internal(grpc_exec_ctx* exec_ctx, grpc_slice_buffer* sb); +void grpc_slice_buffer_partial_reset_and_unref_internal(grpc_exec_ctx* exec_ctx, + grpc_slice_buffer* sb, + size_t idx); void grpc_slice_buffer_destroy_internal(grpc_exec_ctx* exec_ctx, grpc_slice_buffer* sb); From 589c940195f82738a95ff1f78a4f4d6a43566a41 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Tue, 14 Nov 2017 18:22:54 -0800 Subject: [PATCH 4/9] add test --- src/core/lib/iomgr/tcp_posix.cc | 4 +-- src/core/lib/slice/slice_buffer.cc | 9 ++++--- src/core/lib/slice/slice_internal.h | 6 ++--- test/core/slice/slice_buffer_test.cc | 39 ++++++++++++++++++++++++++++ 4 files changed, 49 insertions(+), 9 deletions(-) diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 016ca87219b..0864cd594d2 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -576,8 +576,8 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, if (errno == EAGAIN) { tcp->outgoing_slice_idx = unwind_slice_idx; tcp->outgoing_byte_idx = unwind_byte_idx; - grpc_slice_buffer_partial_reset_and_unref_internal( - exec_ctx, tcp->outgoing_buffer, unwind_slice_idx); + grpc_slice_buffer_partial_unref_internal(exec_ctx, tcp->outgoing_buffer, + unwind_slice_idx); return false; } else if (errno == EPIPE) { *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"), diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc index cd5040fbcb5..8fdd8cf2257 100644 --- a/src/core/lib/slice/slice_buffer.cc +++ b/src/core/lib/slice/slice_buffer.cc @@ -176,10 +176,11 @@ void grpc_slice_buffer_reset_and_unref_internal(grpc_exec_ctx* exec_ctx, sb->idx_of_first_valid_slice = 0; } -void grpc_slice_buffer_partial_reset_and_unref_internal(grpc_exec_ctx* exec_ctx, - grpc_slice_buffer* sb, - size_t idx) { - GPR_ASSERT(idx <= sb->count); +void grpc_slice_buffer_partial_unref_internal(grpc_exec_ctx* exec_ctx, + grpc_slice_buffer* sb, + size_t idx) { + GPR_ASSERT(idx < sb->count); // if idx == count, then partial is not needed + GPR_ASSERT(sb->idx_of_first_valid_slice <= idx); size_t i; for (i = sb->idx_of_first_valid_slice; i < idx; i++) { diff --git a/src/core/lib/slice/slice_internal.h b/src/core/lib/slice/slice_internal.h index 2c616950d7f..10527dcdeb5 100644 --- a/src/core/lib/slice/slice_internal.h +++ b/src/core/lib/slice/slice_internal.h @@ -32,9 +32,9 @@ grpc_slice grpc_slice_ref_internal(grpc_slice slice); void grpc_slice_unref_internal(grpc_exec_ctx* exec_ctx, grpc_slice slice); void grpc_slice_buffer_reset_and_unref_internal(grpc_exec_ctx* exec_ctx, grpc_slice_buffer* sb); -void grpc_slice_buffer_partial_reset_and_unref_internal(grpc_exec_ctx* exec_ctx, - grpc_slice_buffer* sb, - size_t idx); +void grpc_slice_buffer_partial_unref_internal(grpc_exec_ctx* exec_ctx, + grpc_slice_buffer* sb, + size_t idx); void grpc_slice_buffer_destroy_internal(grpc_exec_ctx* exec_ctx, grpc_slice_buffer* sb); diff --git a/test/core/slice/slice_buffer_test.cc b/test/core/slice/slice_buffer_test.cc index 338e8079dc8..696251536f5 100644 --- a/test/core/slice/slice_buffer_test.cc +++ b/test/core/slice/slice_buffer_test.cc @@ -18,6 +18,7 @@ #include #include +#include "src/core/lib/slice/slice_internal.h" #include "test/core/util/test_config.h" void test_slice_buffer_add() { @@ -104,11 +105,49 @@ void test_slice_buffer_move_first() { GPR_ASSERT(dst.length == dst_len); } +static void populate_slice_buffer(grpc_slice_buffer* sb) { + grpc_slice slices[4]; + int idx = 0; + + slices[0] = grpc_slice_from_copied_string("aaa"); + slices[1] = grpc_slice_from_copied_string("bbbb"); + slices[2] = grpc_slice_from_copied_string("ccc"); + slices[3] = grpc_slice_from_copied_string("ddddd"); + + for (idx = 0; idx < 4; idx++) { + grpc_slice_ref(slices[idx]); + grpc_slice_buffer_add_indexed(sb, slices[idx]); + } +} + +void test_slice_buffer_unref() { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_slice_buffer sb; + + grpc_slice_buffer_init(&sb); + + // regular init and unref + populate_slice_buffer(&sb); + grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &sb); + + // inits, then unrefs partial, then unrefs the rest + populate_slice_buffer(&sb); + grpc_slice_buffer_partial_unref_internal(&exec_ctx, &sb, 1); + grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &sb); + + // two partial unrefs + populate_slice_buffer(&sb); + grpc_slice_buffer_partial_unref_internal(&exec_ctx, &sb, 1); + grpc_slice_buffer_partial_unref_internal(&exec_ctx, &sb, 3); + grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &sb); +} + int main(int argc, char** argv) { grpc_test_init(argc, argv); test_slice_buffer_add(); test_slice_buffer_move_first(); + test_slice_buffer_unref(); return 0; } From 311fa5f8188c4a1116a38991a377030708833336 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Thu, 16 Nov 2017 14:25:50 -0800 Subject: [PATCH 5/9] Reviewer feedback: no API change --- include/grpc/impl/codegen/slice.h | 4 --- src/core/lib/iomgr/tcp_posix.cc | 39 +++++++++++++++++------------- src/core/lib/slice/slice_buffer.cc | 18 +------------- 3 files changed, 23 insertions(+), 38 deletions(-) diff --git a/include/grpc/impl/codegen/slice.h b/include/grpc/impl/codegen/slice.h index 369cee91a8e..11997fcb563 100644 --- a/include/grpc/impl/codegen/slice.h +++ b/include/grpc/impl/codegen/slice.h @@ -108,10 +108,6 @@ typedef struct { /** the number of slices allocated in the array. External users (i.e any code * outside grpc core) MUST NOT use this field */ size_t capacity; - /** the index of the first slice who's memory is still owned by this buffer. - * This is only to be used when partially unreffing this slice buffer in - * grpc_slice_buffer_partial_reset_and_unref_internal. */ - size_t idx_of_first_valid_slice; /** the combined length of all slices in the array */ size_t length; /** inlined elements to avoid allocations */ diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 0864cd594d2..3523c689730 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -81,9 +81,7 @@ typedef struct { grpc_slice_buffer* incoming_buffer; grpc_slice_buffer* outgoing_buffer; - /** slice within outgoing_buffer to write next */ - size_t outgoing_slice_idx; - /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */ + /** byte within outgoing_buffer->slices[0] to write next */ size_t outgoing_byte_idx; grpc_closure* read_cb; @@ -532,23 +530,26 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, size_t unwind_slice_idx; size_t unwind_byte_idx; + // We always start at zero, because we eagerly unref and trim the slice + // buffer as we write + size_t outgoing_slice_idx = 0; + for (;;) { sending_length = 0; - unwind_slice_idx = tcp->outgoing_slice_idx; + unwind_slice_idx = outgoing_slice_idx; unwind_byte_idx = tcp->outgoing_byte_idx; - for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count && + for (iov_size = 0; outgoing_slice_idx != tcp->outgoing_buffer->count && iov_size != MAX_WRITE_IOVEC; iov_size++) { iov[iov_size].iov_base = GRPC_SLICE_START_PTR( - tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) + + tcp->outgoing_buffer->slices[outgoing_slice_idx]) + tcp->outgoing_byte_idx; iov[iov_size].iov_len = - GRPC_SLICE_LENGTH( - tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) - + GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]) - tcp->outgoing_byte_idx; sending_length += iov[iov_size].iov_len; - tcp->outgoing_slice_idx++; + outgoing_slice_idx++; tcp->outgoing_byte_idx = 0; } GPR_ASSERT(iov_size > 0); @@ -574,10 +575,15 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, if (sent_length < 0) { if (errno == EAGAIN) { - tcp->outgoing_slice_idx = unwind_slice_idx; tcp->outgoing_byte_idx = unwind_byte_idx; - grpc_slice_buffer_partial_unref_internal(exec_ctx, tcp->outgoing_buffer, - unwind_slice_idx); + // unref all and forget about all slices that have been written to this + // point + for (size_t idx = 0; idx < unwind_slice_idx; ++idx) { + grpc_slice_unref_internal(exec_ctx, + tcp->outgoing_buffer->slices[idx]); + tcp->outgoing_buffer->count--; + } + tcp->outgoing_buffer->slices += unwind_slice_idx; return false; } else if (errno == EPIPE) { *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"), @@ -595,9 +601,9 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, while (trailing > 0) { size_t slice_length; - tcp->outgoing_slice_idx--; - slice_length = GRPC_SLICE_LENGTH( - tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]); + outgoing_slice_idx--; + slice_length = + GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]); if (slice_length > trailing) { tcp->outgoing_byte_idx = slice_length - trailing; break; @@ -606,7 +612,7 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, } } - if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) { + if (outgoing_slice_idx == tcp->outgoing_buffer->count) { *error = GRPC_ERROR_NONE; grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->outgoing_buffer); @@ -676,7 +682,6 @@ static void tcp_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, return; } tcp->outgoing_buffer = buf; - tcp->outgoing_slice_idx = 0; tcp->outgoing_byte_idx = 0; if (!tcp_flush(exec_ctx, tcp, &error)) { diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc index 8fdd8cf2257..5db54dad91e 100644 --- a/src/core/lib/slice/slice_buffer.cc +++ b/src/core/lib/slice/slice_buffer.cc @@ -62,7 +62,6 @@ void grpc_slice_buffer_init(grpc_slice_buffer* sb) { sb->count = 0; sb->length = 0; sb->capacity = GRPC_SLICE_BUFFER_INLINE_ELEMENTS; - sb->idx_of_first_valid_slice = 0; sb->base_slices = sb->slices = sb->inlined; } @@ -167,27 +166,12 @@ void grpc_slice_buffer_pop(grpc_slice_buffer* sb) { void grpc_slice_buffer_reset_and_unref_internal(grpc_exec_ctx* exec_ctx, grpc_slice_buffer* sb) { size_t i; - for (i = sb->idx_of_first_valid_slice; i < sb->count; i++) { + for (i = 0; i < sb->count; i++) { grpc_slice_unref_internal(exec_ctx, sb->slices[i]); } sb->count = 0; sb->length = 0; - sb->idx_of_first_valid_slice = 0; -} - -void grpc_slice_buffer_partial_unref_internal(grpc_exec_ctx* exec_ctx, - grpc_slice_buffer* sb, - size_t idx) { - GPR_ASSERT(idx < sb->count); // if idx == count, then partial is not needed - GPR_ASSERT(sb->idx_of_first_valid_slice <= idx); - - size_t i; - for (i = sb->idx_of_first_valid_slice; i < idx; i++) { - grpc_slice_unref_internal(exec_ctx, sb->slices[i]); - } - - sb->idx_of_first_valid_slice = idx; } void grpc_slice_buffer_reset_and_unref(grpc_slice_buffer* sb) { From 588d6d9294fb328d22d2edbd39693408cf6b2eff Mon Sep 17 00:00:00 2001 From: ncteisen Date: Thu, 16 Nov 2017 14:28:30 -0800 Subject: [PATCH 6/9] unref before write failures for completness --- src/core/lib/iomgr/tcp_posix.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 3523c689730..026c1c09425 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -589,9 +589,13 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + tcp->outgoing_buffer); return true; } else { *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + tcp->outgoing_buffer); return true; } } From 34fec74986ec693643b083920496d7c91624a5c8 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Thu, 16 Nov 2017 14:33:00 -0800 Subject: [PATCH 7/9] No more API change, no test change --- test/core/slice/slice_buffer_test.cc | 39 ---------------------------- 1 file changed, 39 deletions(-) diff --git a/test/core/slice/slice_buffer_test.cc b/test/core/slice/slice_buffer_test.cc index 696251536f5..338e8079dc8 100644 --- a/test/core/slice/slice_buffer_test.cc +++ b/test/core/slice/slice_buffer_test.cc @@ -18,7 +18,6 @@ #include #include -#include "src/core/lib/slice/slice_internal.h" #include "test/core/util/test_config.h" void test_slice_buffer_add() { @@ -105,49 +104,11 @@ void test_slice_buffer_move_first() { GPR_ASSERT(dst.length == dst_len); } -static void populate_slice_buffer(grpc_slice_buffer* sb) { - grpc_slice slices[4]; - int idx = 0; - - slices[0] = grpc_slice_from_copied_string("aaa"); - slices[1] = grpc_slice_from_copied_string("bbbb"); - slices[2] = grpc_slice_from_copied_string("ccc"); - slices[3] = grpc_slice_from_copied_string("ddddd"); - - for (idx = 0; idx < 4; idx++) { - grpc_slice_ref(slices[idx]); - grpc_slice_buffer_add_indexed(sb, slices[idx]); - } -} - -void test_slice_buffer_unref() { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_slice_buffer sb; - - grpc_slice_buffer_init(&sb); - - // regular init and unref - populate_slice_buffer(&sb); - grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &sb); - - // inits, then unrefs partial, then unrefs the rest - populate_slice_buffer(&sb); - grpc_slice_buffer_partial_unref_internal(&exec_ctx, &sb, 1); - grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &sb); - - // two partial unrefs - populate_slice_buffer(&sb); - grpc_slice_buffer_partial_unref_internal(&exec_ctx, &sb, 1); - grpc_slice_buffer_partial_unref_internal(&exec_ctx, &sb, 3); - grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &sb); -} - int main(int argc, char** argv) { grpc_test_init(argc, argv); test_slice_buffer_add(); test_slice_buffer_move_first(); - test_slice_buffer_unref(); return 0; } From 692915ee0784909bceabcd67a7350b9f2fc2f501 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Fri, 17 Nov 2017 11:45:49 -0800 Subject: [PATCH 8/9] Swtich to using grpc_bb_take_first --- src/core/lib/iomgr/tcp_posix.cc | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 026c1c09425..212ecd7fc65 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -579,11 +579,9 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, // unref all and forget about all slices that have been written to this // point for (size_t idx = 0; idx < unwind_slice_idx; ++idx) { - grpc_slice_unref_internal(exec_ctx, - tcp->outgoing_buffer->slices[idx]); - tcp->outgoing_buffer->count--; + grpc_slice_unref_internal( + exec_ctx, grpc_slice_buffer_take_first(tcp->outgoing_buffer)); } - tcp->outgoing_buffer->slices += unwind_slice_idx; return false; } else if (errno == EPIPE) { *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"), From abbdbf93742ed46c7f7481816f0915c069357040 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Fri, 17 Nov 2017 19:02:04 -0800 Subject: [PATCH 9/9] Attempt to fix TSAN --- test/core/end2end/fixtures/http_proxy_fixture.cc | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index 0bdd15c57d5..ac0c953a79f 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -88,9 +88,11 @@ typedef struct proxy_connection { grpc_slice_buffer client_read_buffer; grpc_slice_buffer client_deferred_write_buffer; + bool client_is_writing; grpc_slice_buffer client_write_buffer; grpc_slice_buffer server_read_buffer; grpc_slice_buffer server_deferred_write_buffer; + bool server_is_writing; grpc_slice_buffer server_write_buffer; grpc_http_parser http_parser; @@ -148,6 +150,7 @@ static void proxy_connection_failed(grpc_exec_ctx* exec_ctx, static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; + conn->client_is_writing = false; if (error != GRPC_ERROR_NONE) { proxy_connection_failed(exec_ctx, conn, true /* is_client */, "HTTP proxy client write", error); @@ -160,6 +163,7 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, if (conn->client_deferred_write_buffer.length > 0) { grpc_slice_buffer_move_into(&conn->client_deferred_write_buffer, &conn->client_write_buffer); + conn->client_is_writing = true; grpc_endpoint_write(exec_ctx, conn->client_endpoint, &conn->client_write_buffer, &conn->on_client_write_done); @@ -173,6 +177,7 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; + conn->server_is_writing = false; if (error != GRPC_ERROR_NONE) { proxy_connection_failed(exec_ctx, conn, false /* is_client */, "HTTP proxy server write", error); @@ -185,6 +190,7 @@ static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, if (conn->server_deferred_write_buffer.length > 0) { grpc_slice_buffer_move_into(&conn->server_deferred_write_buffer, &conn->server_write_buffer); + conn->server_is_writing = true; grpc_endpoint_write(exec_ctx, conn->server_endpoint, &conn->server_write_buffer, &conn->on_server_write_done); @@ -210,13 +216,14 @@ static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, // the current write is finished. // // Otherwise, move the read data into the write buffer and write it. - if (conn->server_write_buffer.length > 0) { + if (conn->server_is_writing) { grpc_slice_buffer_move_into(&conn->client_read_buffer, &conn->server_deferred_write_buffer); } else { grpc_slice_buffer_move_into(&conn->client_read_buffer, &conn->server_write_buffer); proxy_connection_ref(conn, "client_read"); + conn->server_is_writing = true; grpc_endpoint_write(exec_ctx, conn->server_endpoint, &conn->server_write_buffer, &conn->on_server_write_done); @@ -242,13 +249,14 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, // the current write is finished. // // Otherwise, move the read data into the write buffer and write it. - if (conn->client_write_buffer.length > 0) { + if (conn->client_is_writing) { grpc_slice_buffer_move_into(&conn->server_read_buffer, &conn->client_deferred_write_buffer); } else { grpc_slice_buffer_move_into(&conn->server_read_buffer, &conn->client_write_buffer); proxy_connection_ref(conn, "server_read"); + conn->client_is_writing = true; grpc_endpoint_write(exec_ctx, conn->client_endpoint, &conn->client_write_buffer, &conn->on_client_write_done); @@ -262,6 +270,7 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; + conn->client_is_writing = false; if (error != GRPC_ERROR_NONE) { proxy_connection_failed(exec_ctx, conn, true /* is_client */, "HTTP proxy write response", error); @@ -302,6 +311,7 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_slice slice = grpc_slice_from_copied_string("HTTP/1.0 200 connected\r\n\r\n"); grpc_slice_buffer_add(&conn->client_write_buffer, slice); + conn->client_is_writing = true; grpc_endpoint_write(exec_ctx, conn->client_endpoint, &conn->client_write_buffer, &conn->on_write_response_done); @@ -450,9 +460,11 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_combiner_scheduler(conn->proxy->combiner)); grpc_slice_buffer_init(&conn->client_read_buffer); grpc_slice_buffer_init(&conn->client_deferred_write_buffer); + conn->client_is_writing = false; grpc_slice_buffer_init(&conn->client_write_buffer); grpc_slice_buffer_init(&conn->server_read_buffer); grpc_slice_buffer_init(&conn->server_deferred_write_buffer); + conn->server_is_writing = false; grpc_slice_buffer_init(&conn->server_write_buffer); grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST, &conn->http_request);