Bug fixes, tracing for bdp estimation

pull/9511/head
Craig Tiller 8 years ago
parent 1d7d12370e
commit efbd7c2a0f
  1. 5
      doc/environment_variables.md
  2. 15
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  3. 2
      src/core/ext/transport/chttp2/transport/frame_data.c
  4. 3
      src/core/ext/transport/chttp2/transport/internal.h
  5. 45
      src/core/ext/transport/chttp2/transport/writing.c
  6. 2
      src/core/lib/surface/init.c
  7. 26
      src/core/lib/transport/bdp_estimator.c
  8. 5
      src/core/lib/transport/bdp_estimator.h
  9. 4
      test/core/util/passthru_endpoint.c

@ -35,6 +35,7 @@ some configuration as environment variables that can be set.
A comma separated list of tracers that provide additional insight into how A comma separated list of tracers that provide additional insight into how
gRPC C core is processing requests via debug logs. Available tracers include: gRPC C core is processing requests via debug logs. Available tracers include:
- api - traces api calls to the C core - api - traces api calls to the C core
- bdp_estimator - traces behavior of the BDP (bandwidth-delay product) estimation logic
- channel - traces operations on the C core channel stack - channel - traces operations on the C core channel stack
- combiner - traces combiner lock state - combiner - traces combiner lock state
- compression - traces compression operations - compression - traces compression operations
@ -55,10 +56,10 @@ some configuration as environment variables that can be set.
- secure_endpoint - traces bytes flowing through encrypted channels - secure_endpoint - traces bytes flowing through encrypted channels
- transport_security - traces metadata about secure channel establishment - transport_security - traces metadata about secure channel establishment
- tcp - traces bytes in and out of a channel - tcp - traces bytes in and out of a channel
'all' can additionally be used to turn all traces on. 'all' can additionally be used to turn all traces on.
Individual traces can be disabled by prefixing them with '-'. Individual traces can be disabled by prefixing them with '-'.
Example: Example:
export GRPC_TRACE=all,-pending_tags export GRPC_TRACE=all,-pending_tags

@ -254,9 +254,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
grpc_combiner_scheduler(t->combiner, false)); grpc_combiner_scheduler(t->combiner, false));
grpc_bdp_estimator_init(&t->bdp_estimator); grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string);
t->last_bdp_ping_finished = gpr_now(GPR_CLOCK_MONOTONIC); t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC);
t->last_pid_update = t->last_bdp_ping_finished;
grpc_pid_controller_init( grpc_pid_controller_init(
&t->pid_controller, &t->pid_controller,
(grpc_pid_controller_args){.gain_p = 4, (grpc_pid_controller_args){.gain_p = 4,
@ -1887,10 +1886,6 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
errors[1] = errors[1] =
grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]); grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]);
} }
if (!t->parse_saw_data_frames) {
need_bdp_ping = false;
}
t->parse_saw_data_frames = false;
if (errors[1] != GRPC_ERROR_NONE) { if (errors[1] != GRPC_ERROR_NONE) {
errors[2] = try_http_parsing(exec_ctx, t); errors[2] = try_http_parsing(exec_ctx, t);
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
@ -1933,10 +1928,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
&t->read_action_locked); &t->read_action_locked);
if (need_bdp_ping && if (need_bdp_ping) {
gpr_time_cmp(gpr_time_add(t->last_bdp_ping_finished,
gpr_time_from_millis(100, GPR_TIMESPAN)),
gpr_now(GPR_CLOCK_MONOTONIC)) < 0) {
GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
grpc_bdp_estimator_schedule_ping(&t->bdp_estimator); grpc_bdp_estimator_schedule_ping(&t->bdp_estimator);
send_ping_locked(exec_ctx, t, send_ping_locked(exec_ctx, t,
@ -1992,7 +1984,6 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string); gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string);
} }
grpc_bdp_estimator_complete_ping(&t->bdp_estimator); grpc_bdp_estimator_complete_ping(&t->bdp_estimator);
t->last_bdp_ping_finished = gpr_now(GPR_CLOCK_MONOTONIC);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
} }

@ -156,8 +156,6 @@ static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
t->parse_saw_data_frames = true;
switch (p->state) { switch (p->state) {
case GRPC_CHTTP2_DATA_ERROR: case GRPC_CHTTP2_DATA_ERROR:
p->state = GRPC_CHTTP2_DATA_ERROR; p->state = GRPC_CHTTP2_DATA_ERROR;

@ -322,8 +322,6 @@ struct grpc_chttp2_transport {
/** initial window change */ /** initial window change */
int64_t initial_window_update; int64_t initial_window_update;
/** did the current parse see actual data bytes? */
bool parse_saw_data_frames;
/** window available for peer to send to us */ /** window available for peer to send to us */
int64_t incoming_window; int64_t incoming_window;
@ -357,7 +355,6 @@ struct grpc_chttp2_transport {
grpc_pid_controller pid_controller; grpc_pid_controller pid_controller;
grpc_closure start_bdp_ping_locked; grpc_closure start_bdp_ping_locked;
grpc_closure finish_bdp_ping_locked; grpc_closure finish_bdp_ping_locked;
gpr_timespec last_bdp_ping_finished;
gpr_timespec last_pid_update; gpr_timespec last_pid_update;
/* if non-NULL, close the transport with this error when writes are finished /* if non-NULL, close the transport with this error when writes are finished

@ -74,22 +74,33 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx,
} }
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) { if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
/* ping already in-flight: wait */ /* ping already in-flight: wait */
gpr_log(GPR_DEBUG, "already pinging"); if (grpc_http_trace || grpc_bdp_estimator_trace) {
gpr_log(GPR_DEBUG, "Ping delayed [%p]: already pinging", t->peer_string);
}
return; return;
} }
if (t->ping_state.pings_before_data_required == 0 && if (t->ping_state.pings_before_data_required == 0 &&
t->ping_policy.max_pings_without_data != 0) { t->ping_policy.max_pings_without_data != 0) {
/* need to send something of substance before sending a ping again */ /* need to send something of substance before sending a ping again */
gpr_log(GPR_DEBUG, "too many pings: %d/%d", if (grpc_http_trace || grpc_bdp_estimator_trace) {
t->ping_state.pings_before_data_required, gpr_log(GPR_DEBUG, "Ping delayed [%p]: too many recent pings: %d/%d",
t->ping_policy.max_pings_without_data); t->peer_string, t->ping_state.pings_before_data_required,
t->ping_policy.max_pings_without_data);
}
return; return;
} }
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(gpr_time_sub(now, t->ping_state.last_ping_sent_time), gpr_timespec elapsed = gpr_time_sub(now, t->ping_state.last_ping_sent_time);
t->ping_policy.min_time_between_pings) < 0) { /*gpr_log(GPR_DEBUG, "elapsed:%d.%09d min:%d.%09d", (int)elapsed.tv_sec,
elapsed.tv_nsec, (int)t->ping_policy.min_time_between_pings.tv_sec,
(int)t->ping_policy.min_time_between_pings.tv_nsec);*/
if (gpr_time_cmp(elapsed, t->ping_policy.min_time_between_pings) < 0) {
/* not enough elapsed time between successive pings */ /* not enough elapsed time between successive pings */
gpr_log(GPR_DEBUG, "not enough time"); if (grpc_http_trace || grpc_bdp_estimator_trace) {
gpr_log(GPR_DEBUG,
"Ping delayed [%p]: not enough time elapsed since last ping",
t->peer_string);
}
return; return;
} }
/* coalesce equivalent pings into this one */ /* coalesce equivalent pings into this one */
@ -297,20 +308,14 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
} }
} }
for (size_t i = 0; i < t->ping_ack_count; i++) {
grpc_slice_buffer_add(&t->outbuf,
grpc_chttp2_ping_create(1, t->ping_acks[i]));
}
t->ping_ack_count = 0;
/* if the grpc_chttp2_transport is ready to send a window update, do so here /* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */ also; 3/4 is a magic number that will likely get tuned soon */
uint32_t target_incoming_window = GPR_MAX( uint32_t target_incoming_window = GPR_MAX(
t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
1024); 1024);
uint32_t threshold_to_send_transport_window_update = uint32_t threshold_to_send_transport_window_update =
t->outbuf.count > 0 ? target_incoming_window t->outbuf.count > 0 ? 3 * target_incoming_window / 4
: 3 * target_incoming_window / 4; : target_incoming_window / 2;
if (t->incoming_window < threshold_to_send_transport_window_update) { if (t->incoming_window < threshold_to_send_transport_window_update) {
maybe_initiate_ping(exec_ctx, t, maybe_initiate_ping(exec_ctx, t,
GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE); GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE);
@ -324,7 +329,15 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
t->ping_policy.max_pings_without_data; t->ping_policy.max_pings_without_data;
} }
maybe_initiate_ping(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE); for (size_t i = 0; i < t->ping_ack_count; i++) {
grpc_slice_buffer_add(&t->outbuf,
grpc_chttp2_ping_create(1, t->ping_acks[i]));
}
t->ping_ack_count = 0;
if (t->outbuf.count > 0) {
maybe_initiate_ping(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE);
}
GPR_TIMER_END("grpc_chttp2_begin_write", 0); GPR_TIMER_END("grpc_chttp2_begin_write", 0);

@ -62,6 +62,7 @@
#include "src/core/lib/surface/init.h" #include "src/core/lib/surface/init.h"
#include "src/core/lib/surface/lame_client.h" #include "src/core/lib/surface/lame_client.h"
#include "src/core/lib/surface/server.h" #include "src/core/lib/surface/server.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/transport_impl.h" #include "src/core/lib/transport/transport_impl.h"
@ -190,6 +191,7 @@ void grpc_init(void) {
grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace); grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace);
grpc_register_tracer("combiner", &grpc_combiner_trace); grpc_register_tracer("combiner", &grpc_combiner_trace);
grpc_register_tracer("server_channel", &grpc_server_channel_trace); grpc_register_tracer("server_channel", &grpc_server_channel_trace);
grpc_register_tracer("bdp_estimator", &grpc_bdp_estimator_trace);
// Default pluck trace to 1 // Default pluck trace to 1
grpc_cq_pluck_trace = 1; grpc_cq_pluck_trace = 1;
grpc_register_tracer("queue_timeout", &grpc_cq_event_timeout_trace); grpc_register_tracer("queue_timeout", &grpc_cq_event_timeout_trace);

@ -38,9 +38,12 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator) { int grpc_bdp_estimator_trace = 0;
void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) {
estimator->estimate = 65536; estimator->estimate = 65536;
estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED; estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
estimator->name = name;
} }
bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator, bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
@ -51,34 +54,51 @@ bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
int64_t num_bytes) { int64_t num_bytes) {
estimator->accumulator += num_bytes;
switch (estimator->ping_state) { switch (estimator->ping_state) {
case GRPC_BDP_PING_UNSCHEDULED: case GRPC_BDP_PING_UNSCHEDULED:
return true; return true;
case GRPC_BDP_PING_SCHEDULED: case GRPC_BDP_PING_SCHEDULED:
return false; return false;
case GRPC_BDP_PING_STARTED: case GRPC_BDP_PING_STARTED:
estimator->accumulator += num_bytes;
return false; return false;
} }
GPR_UNREACHABLE_CODE(return false); GPR_UNREACHABLE_CODE(return false);
} }
void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator) { void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator) {
if (grpc_bdp_estimator_trace) {
gpr_log(GPR_DEBUG, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64,
estimator->name, estimator->accumulator, estimator->estimate);
}
GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_UNSCHEDULED); GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_UNSCHEDULED);
estimator->ping_state = GRPC_BDP_PING_SCHEDULED; estimator->ping_state = GRPC_BDP_PING_SCHEDULED;
estimator->accumulator = 0;
} }
void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) { void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) {
if (grpc_bdp_estimator_trace) {
gpr_log(GPR_DEBUG, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64,
estimator->name, estimator->accumulator, estimator->estimate);
}
GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_SCHEDULED); GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_SCHEDULED);
estimator->ping_state = GRPC_BDP_PING_STARTED; estimator->ping_state = GRPC_BDP_PING_STARTED;
estimator->accumulator = 0; estimator->accumulator = 0;
} }
void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator) { void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator) {
if (grpc_bdp_estimator_trace) {
gpr_log(GPR_DEBUG, "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64,
estimator->name, estimator->accumulator, estimator->estimate);
}
GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_STARTED); GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_STARTED);
if (estimator->accumulator > 2 * estimator->estimate / 3) { if (estimator->accumulator > 2 * estimator->estimate / 3) {
estimator->estimate *= 2; estimator->estimate *= 2;
gpr_log(GPR_DEBUG, "est --> %" PRId64, estimator->estimate); if (grpc_bdp_estimator_trace) {
gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64,
estimator->name, estimator->estimate);
}
} }
estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED; estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
estimator->accumulator = 0;
} }

@ -40,6 +40,8 @@
#define GRPC_BDP_SAMPLES 16 #define GRPC_BDP_SAMPLES 16
#define GRPC_BDP_MIN_SAMPLES_FOR_ESTIMATE 3 #define GRPC_BDP_MIN_SAMPLES_FOR_ESTIMATE 3
extern int grpc_bdp_estimator_trace;
typedef enum { typedef enum {
GRPC_BDP_PING_UNSCHEDULED, GRPC_BDP_PING_UNSCHEDULED,
GRPC_BDP_PING_SCHEDULED, GRPC_BDP_PING_SCHEDULED,
@ -50,9 +52,10 @@ typedef struct grpc_bdp_estimator {
grpc_bdp_estimator_ping_state ping_state; grpc_bdp_estimator_ping_state ping_state;
int64_t accumulator; int64_t accumulator;
int64_t estimate; int64_t estimate;
const char *name;
} grpc_bdp_estimator; } grpc_bdp_estimator;
void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator); void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name);
// Returns true if a reasonable estimate could be obtained // Returns true if a reasonable estimate could be obtained
bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator, bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,

@ -148,7 +148,9 @@ static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
} }
static char *me_get_peer(grpc_endpoint *ep) { static char *me_get_peer(grpc_endpoint *ep) {
return gpr_strdup("fake:mock_endpoint"); passthru_endpoint *p = ((half *)ep)->parent;
return ((half *)ep) == &p->client ? gpr_strdup("fake:mock_client_endpoint")
: gpr_strdup("fake:mock_server_endpoint");
} }
static int me_get_fd(grpc_endpoint *ep) { return -1; } static int me_get_fd(grpc_endpoint *ep) { return -1; }

Loading…
Cancel
Save