Improve simulation to not infinitely buffer writes

pull/10720/head
Craig Tiller 8 years ago
parent 456c6cec1f
commit a0cf12d976
  1. 19
      test/core/util/trickle_endpoint.c

@ -44,6 +44,8 @@
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
#define WRITE_BUFFER_SIZE (2 * 1024 * 1024)
typedef struct { typedef struct {
grpc_endpoint base; grpc_endpoint base;
double bytes_per_second; double bytes_per_second;
@ -55,6 +57,7 @@ typedef struct {
grpc_slice_buffer writing_buffer; grpc_slice_buffer writing_buffer;
grpc_error *error; grpc_error *error;
bool writing; bool writing;
grpc_closure *write_cb;
} trickle_endpoint; } trickle_endpoint;
static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@ -63,6 +66,15 @@ static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_endpoint_read(exec_ctx, te->wrapped, slices, cb); grpc_endpoint_read(exec_ctx, te->wrapped, slices, cb);
} }
static void maybe_call_write_cb_locked(grpc_exec_ctx *exec_ctx,
trickle_endpoint *te) {
if (te->write_cb != NULL && (te->error != GRPC_ERROR_NONE ||
te->write_buffer.length <= WRITE_BUFFER_SIZE)) {
grpc_closure_sched(exec_ctx, te->write_cb, GRPC_ERROR_REF(te->error));
te->write_cb = NULL;
}
}
static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_slice_buffer *slices, grpc_closure *cb) { grpc_slice_buffer *slices, grpc_closure *cb) {
trickle_endpoint *te = (trickle_endpoint *)ep; trickle_endpoint *te = (trickle_endpoint *)ep;
@ -70,11 +82,13 @@ static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_slice_ref_internal(slices->slices[i]); grpc_slice_ref_internal(slices->slices[i]);
} }
gpr_mu_lock(&te->mu); gpr_mu_lock(&te->mu);
GPR_ASSERT(te->write_cb == NULL);
if (te->write_buffer.length == 0) { if (te->write_buffer.length == 0) {
te->last_write = gpr_now(GPR_CLOCK_MONOTONIC); te->last_write = gpr_now(GPR_CLOCK_MONOTONIC);
} }
grpc_slice_buffer_addn(&te->write_buffer, slices->slices, slices->count); grpc_slice_buffer_addn(&te->write_buffer, slices->slices, slices->count);
grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_REF(te->error)); te->write_cb = cb;
maybe_call_write_cb_locked(exec_ctx, te);
gpr_mu_unlock(&te->mu); gpr_mu_unlock(&te->mu);
} }
@ -102,6 +116,7 @@ static void te_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (te->error == GRPC_ERROR_NONE) { if (te->error == GRPC_ERROR_NONE) {
te->error = GRPC_ERROR_REF(why); te->error = GRPC_ERROR_REF(why);
} }
maybe_call_write_cb_locked(exec_ctx, te);
gpr_mu_unlock(&te->mu); gpr_mu_unlock(&te->mu);
grpc_endpoint_shutdown(exec_ctx, te->wrapped, why); grpc_endpoint_shutdown(exec_ctx, te->wrapped, why);
} }
@ -157,6 +172,7 @@ grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
te->base.vtable = &vtable; te->base.vtable = &vtable;
te->wrapped = wrap; te->wrapped = wrap;
te->bytes_per_second = bytes_per_second; te->bytes_per_second = bytes_per_second;
te->write_cb = NULL;
gpr_mu_init(&te->mu); gpr_mu_init(&te->mu);
grpc_slice_buffer_init(&te->write_buffer); grpc_slice_buffer_init(&te->write_buffer);
grpc_slice_buffer_init(&te->writing_buffer); grpc_slice_buffer_init(&te->writing_buffer);
@ -187,6 +203,7 @@ size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx,
grpc_endpoint_write( grpc_endpoint_write(
exec_ctx, te->wrapped, &te->writing_buffer, exec_ctx, te->wrapped, &te->writing_buffer,
grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx)); grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx));
maybe_call_write_cb_locked(exec_ctx, te);
} }
} }
size_t backlog = te->write_buffer.length; size_t backlog = te->write_buffer.length;

Loading…
Cancel
Save