From a0cf12d97634fa56dc0b5fcbce58ef11aa0b937a Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 19 Apr 2017 13:15:51 -0700 Subject: [PATCH] Improve simulation to not infinitely buffer writes --- test/core/util/trickle_endpoint.c | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c index 804a9287ee8..02ba257abe8 100644 --- a/test/core/util/trickle_endpoint.c +++ b/test/core/util/trickle_endpoint.c @@ -44,6 +44,8 @@ #include #include "src/core/lib/slice/slice_internal.h" +#define WRITE_BUFFER_SIZE (2 * 1024 * 1024) + typedef struct { grpc_endpoint base; double bytes_per_second; @@ -55,6 +57,7 @@ typedef struct { grpc_slice_buffer writing_buffer; grpc_error *error; bool writing; + grpc_closure *write_cb; } trickle_endpoint; static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -63,6 +66,15 @@ static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_endpoint_read(exec_ctx, te->wrapped, slices, cb); } +static void maybe_call_write_cb_locked(grpc_exec_ctx *exec_ctx, + trickle_endpoint *te) { + if (te->write_cb != NULL && (te->error != GRPC_ERROR_NONE || + te->write_buffer.length <= WRITE_BUFFER_SIZE)) { + grpc_closure_sched(exec_ctx, te->write_cb, GRPC_ERROR_REF(te->error)); + te->write_cb = NULL; + } +} + static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb) { trickle_endpoint *te = (trickle_endpoint *)ep; @@ -70,11 +82,13 @@ static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_slice_ref_internal(slices->slices[i]); } gpr_mu_lock(&te->mu); + GPR_ASSERT(te->write_cb == NULL); if (te->write_buffer.length == 0) { te->last_write = gpr_now(GPR_CLOCK_MONOTONIC); } grpc_slice_buffer_addn(&te->write_buffer, slices->slices, slices->count); - grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_REF(te->error)); + te->write_cb = cb; + maybe_call_write_cb_locked(exec_ctx, te); gpr_mu_unlock(&te->mu); } @@ -102,6 +116,7 @@ static void te_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (te->error == GRPC_ERROR_NONE) { te->error = GRPC_ERROR_REF(why); } + maybe_call_write_cb_locked(exec_ctx, te); gpr_mu_unlock(&te->mu); grpc_endpoint_shutdown(exec_ctx, te->wrapped, why); } @@ -157,6 +172,7 @@ grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap, te->base.vtable = &vtable; te->wrapped = wrap; te->bytes_per_second = bytes_per_second; + te->write_cb = NULL; gpr_mu_init(&te->mu); grpc_slice_buffer_init(&te->write_buffer); grpc_slice_buffer_init(&te->writing_buffer); @@ -187,6 +203,7 @@ size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, grpc_endpoint_write( exec_ctx, te->wrapped, &te->writing_buffer, grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx)); + maybe_call_write_cb_locked(exec_ctx, te); } } size_t backlog = te->write_buffer.length;