Ping progress

pull/9511/head
Craig Tiller 8 years ago
parent e9c2d4e9ff
commit c0118b494e
Changed files:
  1. src/core/ext/transport/chttp2/transport/chttp2_transport.c (46)
  2. src/core/ext/transport/chttp2/transport/frame_ping.c (14)
  3. src/core/ext/transport/chttp2/transport/internal.h (11)
  4. src/core/ext/transport/chttp2/transport/writing.c (43)
  5. src/core/lib/iomgr/closure.c (8)
  6. src/core/lib/iomgr/closure.h (5)
  7. src/core/lib/transport/bdp_estimator.c (29)
  8. src/core/lib/transport/bdp_estimator.h (18)

src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -244,6 +244,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
   grpc_closure_init(&t->destructive_reclaimer_locked,
                     destructive_reclaimer_locked, t,
                     grpc_combiner_scheduler(t->combiner, false));
+  grpc_closure_init(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
+                    grpc_combiner_scheduler(t->combiner, false));
+  grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
+                    grpc_combiner_scheduler(t->combiner, false));
@@ -1204,33 +1206,24 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
      and maybe they hold resources that need to be freed */
   for (size_t i = 0; i < GRPC_CHTTP2_PING_TYPE_COUNT; i++) {
     grpc_chttp2_ping_queue *pq = &t->ping_queues[i];
-    grpc_closure_list_fail_all(&pq->next_queue, GRPC_ERROR_REF(error));
-    grpc_closure_list_fail_all(&pq->initiate_queue, GRPC_ERROR_REF(error));
-    grpc_closure_list_fail_all(&pq->inflight_queue, GRPC_ERROR_REF(error));
-    grpc_closure_list_sched(exec_ctx, &pq->next_queue);
-    grpc_closure_list_sched(exec_ctx, &pq->initiate_queue);
-    grpc_closure_list_sched(exec_ctx, &pq->inflight_queue);
+    for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
+      grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
+      grpc_closure_list_sched(exec_ctx, &pq->lists[j]);
+    }
   }
   GRPC_ERROR_UNREF(error);
 }

-static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx,
-                                grpc_chttp2_transport *t,
-                                grpc_chttp2_ping_type ping_type,
-                                grpc_slice_buffer *buf) {
+static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+                             grpc_chttp2_ping_type ping_type,
+                             grpc_closure *on_initiate, grpc_closure *on_ack) {
   grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type];
-  if (grpc_closure_list_empty(pq->next_queue)) {
-    /* no ping needed: wait */
-    return;
-  }
-  if (!grpc_closure_list_empty(pq->inflight_queue)) {
-    /* ping already in-flight: wait */
-    return;
+  grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
+                           GRPC_ERROR_NONE);
+  if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
+                               GRPC_ERROR_NONE)) {
+    grpc_chttp2_initiate_write(exec_ctx, t, false, "send_ping");
   }
-  pq->inflight_id = t->ping_ctr * GRPC_CHTTP2_PING_TYPE_COUNT + ping_type;
-  t->ping_ctr++;
-  grpc_closure_list_sched(exec_ctx, &pq->initiate_queue);
-  grpc_slice_buffer_add(buf, grpc_chttp2_ping_create(false, pq->inflight_id));
 }

 void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1243,7 +1236,10 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
     gpr_free(from);
     return;
   }
-  grpc_closure_list_sched(exec_ctx, &pq->inflight_queue);
+  grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
+  if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
+    grpc_chttp2_initiate_write(exec_ctx, t, false, "continue_pings");
+  }
 }

 static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1942,6 +1938,12 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
   GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
 }

+static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+                                  grpc_error *error) {
+  grpc_chttp2_transport *t = tp;
+  grpc_bdp_estimator_start_ping(&t->bdp_estimator);
+}
+
 /*******************************************************************************
  * CALLBACK LOOP
  */
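
Note: init_transport now registers start_bdp_ping_locked and finish_bdp_ping_locked on the combiner, and send_ping_locked takes an on_initiate/on_ack closure pair. The hunks above do not show where the two meet; below is a minimal sketch of the presumed wiring, with the ping type chosen here only as an assumption.

/* Sketch only: illustrates the send_ping_locked() contract from the hunk
 * above; the real call site and ping type are not part of this diff. */
static void queue_bdp_ping_locked(grpc_exec_ctx *exec_ctx,
                                  grpc_chttp2_transport *t) {
  /* on_initiate runs when the PING frame is actually written;
     on_ack runs once grpc_chttp2_ack_ping() drains the inflight list. */
  send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE,
                   &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
}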

src/core/ext/transport/chttp2/transport/frame_ping.c
@@ -40,7 +40,7 @@
 #include <grpc/support/log.h>
 #include <grpc/support/string_util.h>

-grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes) {
+grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint64_t opaque_8bytes) {
   grpc_slice slice = grpc_slice_malloc(9 + 8);
   uint8_t *p = GRPC_SLICE_START_PTR(slice);
@@ -53,7 +53,14 @@ grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes) {
   *p++ = 0;
   *p++ = 0;
   *p++ = 0;
-  memcpy(p, opaque_8bytes, 8);
+  *p++ = (uint8_t)(opaque_8bytes >> 56);
+  *p++ = (uint8_t)(opaque_8bytes >> 48);
+  *p++ = (uint8_t)(opaque_8bytes >> 40);
+  *p++ = (uint8_t)(opaque_8bytes >> 32);
+  *p++ = (uint8_t)(opaque_8bytes >> 24);
+  *p++ = (uint8_t)(opaque_8bytes >> 16);
+  *p++ = (uint8_t)(opaque_8bytes >> 8);
+  *p++ = (uint8_t)(opaque_8bytes);
   return slice;
 }
@@ -70,6 +77,7 @@ grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser,
   }
   parser->byte = 0;
   parser->is_ack = flags;
+  parser->opaque_8bytes = 0;
   return GRPC_ERROR_NONE;
 }
@@ -83,7 +91,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
   grpc_chttp2_ping_parser *p = parser;

   while (p->byte != 8 && cur != end) {
-    p->opaque_8bytes[p->byte] = *cur;
+    p->opaque_8bytes |= (((uint64_t)*cur) << (8 * p->byte));
     cur++;
     p->byte++;
   }
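
Note: the ping payload is now a uint64_t rather than a raw byte buffer. grpc_chttp2_ping_create() above writes the most significant byte first, while the parser accumulates the first received byte into the least significant position; the standalone sketch below pairs an encoder and decoder that both follow the create path's big-endian order.

/* Standalone sketch (not gRPC code): big-endian round trip of a 64-bit ping
 * id, mirroring the byte order used by grpc_chttp2_ping_create() above. */
#include <assert.h>
#include <stdint.h>

static void encode_ping_id(uint8_t out[8], uint64_t id) {
  for (int i = 0; i < 8; i++) out[i] = (uint8_t)(id >> (56 - 8 * i));
}

static uint64_t decode_ping_id(const uint8_t in[8]) {
  uint64_t id = 0;
  for (int i = 0; i < 8; i++) id |= ((uint64_t)in[i]) << (56 - 8 * i);
  return id;
}

int main(void) {
  uint8_t buf[8];
  encode_ping_id(buf, 0x0123456789abcdefULL);
  assert(decode_ping_id(buf) == 0x0123456789abcdefULL);
  return 0;
}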

src/core/ext/transport/chttp2/transport/internal.h
@@ -81,10 +81,15 @@ typedef enum {
   GRPC_CHTTP2_PING_TYPE_COUNT /* must be last */
 } grpc_chttp2_ping_type;

+typedef enum {
+  GRPC_CHTTP2_PCL_NEXT = 0,
+  GRPC_CHTTP2_PCL_INITIATE,
+  GRPC_CHTTP2_PCL_INFLIGHT,
+  GRPC_CHTTP2_PCL_COUNT /* must be last */
+} grpc_chttp2_ping_closure_list;
+
 typedef struct {
-  grpc_closure_list next_queue;
-  grpc_closure_list initiate_queue;
-  grpc_closure_list inflight_queue;
+  grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT];
   uint64_t inflight_id;
 } grpc_chttp2_ping_queue;

src/core/ext/transport/chttp2/transport/writing.c
@@ -55,6 +55,45 @@ static void finish_write_cb(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
   t->write_cb_pool = cb;
 }

+static void collapse_pings_from_into(grpc_chttp2_transport *t,
+                                     grpc_chttp2_ping_type ping_type,
+                                     grpc_chttp2_ping_queue *pq) {
+  for (size_t i = 0; i < GRPC_CHTTP2_PCL_COUNT; i++) {
+    grpc_closure_list_move(&t->ping_queues[ping_type].lists[i], &pq->lists[i]);
+  }
+}
+
+static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx,
+                                grpc_chttp2_transport *t,
+                                grpc_chttp2_ping_type ping_type) {
+  grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type];
+  if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
+    /* no ping needed: wait */
+    return;
+  }
+  if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
+    /* ping already in-flight: wait */
+    return;
+  }
+  /* coalesce equivalent pings into this one */
+  switch (ping_type) {
+    case GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE:
+      collapse_pings_from_into(t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, pq);
+      break;
+    case GRPC_CHTTP2_PING_ON_NEXT_WRITE:
+      break;
+    case GRPC_CHTTP2_PING_TYPE_COUNT:
+      GPR_UNREACHABLE_CODE(break);
+  }
+  pq->inflight_id = t->ping_ctr * GRPC_CHTTP2_PING_TYPE_COUNT + ping_type;
+  t->ping_ctr++;
+  grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
+  grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
+                         &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
+  grpc_slice_buffer_add(&t->outbuf,
+                        grpc_chttp2_ping_create(false, pq->inflight_id));
+}
+
 static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
                         grpc_chttp2_stream *s, int64_t send_bytes,
                         grpc_chttp2_write_cb **list, grpc_error *error) {
@@ -226,6 +265,8 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
       t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
       1024);
   if (t->incoming_window < 3 * target_incoming_window / 4) {
+    maybe_initiate_ping(exec_ctx, t,
+                        GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE);
     uint32_t announced = (uint32_t)GPR_CLAMP(
         target_incoming_window - t->incoming_window, 0, UINT32_MAX);
     GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("write", t, incoming_window, announced);
@@ -234,6 +275,8 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
             0, announced, &throwaway_stats));
   }

+  maybe_initiate_ping(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE);
+
   GPR_TIMER_END("grpc_chttp2_begin_write", 0);

   return t->outbuf.count > 0;
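
Note: maybe_initiate_ping() only emits a PING when the NEXT list has waiters and nothing is in flight, folds equivalent ping types into the same frame, and stamps the frame with ping_ctr * GRPC_CHTTP2_PING_TYPE_COUNT + ping_type. Below is a small sketch of that id scheme; that the ack path recovers the type with a modulo is an assumption, since that lookup is not in the hunks shown.

/* Sketch: composing and decomposing the inflight id used above.
 * TYPE_COUNT stands in for GRPC_CHTTP2_PING_TYPE_COUNT, and the modulo
 * recovery on the ack side is an assumption, not shown in this diff. */
#include <stdint.h>

enum { TYPE_COUNT = 2 };

static uint64_t make_ping_id(uint64_t ping_ctr, unsigned ping_type) {
  return ping_ctr * TYPE_COUNT + ping_type;
}

static unsigned ping_type_of(uint64_t id) {
  return (unsigned)(id % TYPE_COUNT);
}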

src/core/lib/iomgr/closure.c
@@ -50,20 +50,22 @@ void grpc_closure_list_init(grpc_closure_list *closure_list) {
   closure_list->head = closure_list->tail = NULL;
 }

-void grpc_closure_list_append(grpc_closure_list *closure_list,
+bool grpc_closure_list_append(grpc_closure_list *closure_list,
                               grpc_closure *closure, grpc_error *error) {
   if (closure == NULL) {
     GRPC_ERROR_UNREF(error);
-    return;
+    return false;
   }
   closure->error_data.error = error;
   closure->next_data.next = NULL;
-  if (closure_list->head == NULL) {
+  bool was_empty = (closure_list->head == NULL);
+  if (was_empty) {
     closure_list->head = closure;
   } else {
     closure_list->tail->next_data.next = closure;
   }
   closure_list->tail = closure;
+  return was_empty;
 }

 void grpc_closure_list_fail_all(grpc_closure_list *list,

src/core/lib/iomgr/closure.h
@@ -117,8 +117,9 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg,
 void grpc_closure_list_init(grpc_closure_list *list);

 /** add \a closure to the end of \a list
-    and set \a closure's result to \a error */
-void grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure,
+    and set \a closure's result to \a error
+    Returns true if \a list becomes non-empty */
+bool grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure,
                               grpc_error *error);

 /** force all success bits in \a list to false */
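
Note: the boolean return from grpc_closure_list_append() reports the empty to non-empty transition, which send_ping_locked() uses as an edge trigger so only the first queued ack kicks off a write. Below is a standalone sketch of that pattern; the list type here is a toy, not gRPC's.

/* Standalone sketch of the edge-trigger pattern: only the append that makes
 * the queue non-empty schedules work, mirroring the API change above. */
#include <stdbool.h>
#include <stddef.h>

typedef struct node { struct node *next; } node;
typedef struct { node *head; node *tail; } list;

static bool list_append(list *l, node *n) {
  n->next = NULL;
  bool was_empty = (l->head == NULL);
  if (was_empty) {
    l->head = n;
  } else {
    l->tail->next = n;
  }
  l->tail = n;
  return was_empty; /* same contract as grpc_closure_list_append() now has */
}

static void enqueue(list *l, node *n, void (*kick_writer)(void)) {
  if (list_append(l, n)) kick_writer(); /* fire only on empty -> non-empty */
}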

src/core/lib/transport/bdp_estimator.c
@@ -41,7 +41,7 @@
 void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator) {
   estimator->num_samples = 0;
   estimator->first_sample_idx = 0;
-  estimator->sampling = false;
+  estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
 }

 bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
@@ -68,17 +68,26 @@ static int64_t *sampling(grpc_bdp_estimator *estimator) {
 bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
                                            int64_t num_bytes) {
-  if (estimator->sampling) {
-    *sampling(estimator) += num_bytes;
-    return false;
-  } else {
-    return true;
+  switch (estimator->ping_state) {
+    case GRPC_BDP_PING_UNSCHEDULED:
+      return true;
+    case GRPC_BDP_PING_SCHEDULED:
+      return false;
+    case GRPC_BDP_PING_STARTED:
+      *sampling(estimator) += num_bytes;
+      return false;
   }
+  GPR_UNREACHABLE_CODE(return false);
 }

+void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator) {
+  GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_UNSCHEDULED);
+  estimator->ping_state = GRPC_BDP_PING_SCHEDULED;
+}
+
 void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) {
-  GPR_ASSERT(!estimator->sampling);
-  estimator->sampling = true;
+  GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_SCHEDULED);
+  estimator->ping_state = GRPC_BDP_PING_STARTED;
   if (estimator->num_samples == GRPC_BDP_SAMPLES) {
     estimator->first_sample_idx++;
     estimator->num_samples--;
@@ -87,7 +96,7 @@ void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) {
 }

 void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator) {
-  GPR_ASSERT(estimator->sampling);
+  GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_STARTED);
   estimator->num_samples++;
-  estimator->sampling = false;
+  estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
 }

src/core/lib/transport/bdp_estimator.h
@@ -40,11 +40,16 @@
 #define GRPC_BDP_SAMPLES 16
 #define GRPC_BDP_MIN_SAMPLES_FOR_ESTIMATE 3

+typedef enum {
+  GRPC_BDP_PING_UNSCHEDULED,
+  GRPC_BDP_PING_SCHEDULED,
+  GRPC_BDP_PING_STARTED
+} grpc_bdp_estimator_ping_state;
+
 typedef struct grpc_bdp_estimator {
   uint8_t num_samples;
   uint8_t first_sample_idx;
-  bool scheduled;
-  bool sampling;
+  grpc_bdp_estimator_ping_state ping_state;
   int64_t samples[GRPC_BDP_SAMPLES];
 } grpc_bdp_estimator;
@@ -53,12 +58,15 @@ void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator);
 // Returns true if a reasonable estimate could be obtained
 bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
                                      int64_t *estimate);
-// Returns true if the user should start a ping
+// Returns true if the user should schedule a ping
 bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
                                            int64_t num_bytes);
-// Schedule a ping
+// Schedule a ping: call in response to receiving a true from
+// grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a
+// transport (but not necessarily started)
 void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator);
-// Start a ping
+// Start a ping: call after calling grpc_bdp_estimator_schedule_ping and once
+// the ping is on the wire
 void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator);
 // Completes a previously started ping
 void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator);
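
Note: the estimator now steps through an explicit UNSCHEDULED -> SCHEDULED -> STARTED -> UNSCHEDULED cycle instead of a single sampling flag. Below is a sketch of one probe's lifecycle against the API above; queue_ping is hypothetical transport glue, not part of this commit.

#include "src/core/lib/transport/bdp_estimator.h"

/* Sketch: intended call order for one BDP probe, per the comments above. */
static void on_incoming_bytes(grpc_bdp_estimator *est, int64_t n,
                              void (*queue_ping)(void)) {
  if (grpc_bdp_estimator_add_incoming_bytes(est, n)) {
    grpc_bdp_estimator_schedule_ping(est); /* UNSCHEDULED -> SCHEDULED */
    queue_ping(); /* hypothetical: ask the transport to send a PING frame */
  }
}

static void on_ping_sent(grpc_bdp_estimator *est) {
  grpc_bdp_estimator_start_ping(est); /* SCHEDULED -> STARTED: sampling begins */
}

static void on_ping_acked(grpc_bdp_estimator *est) {
  grpc_bdp_estimator_complete_ping(est); /* STARTED -> UNSCHEDULED */
}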
