Client-side keepalive pings server enforcement

pull/9902/head
Yuchen Zeng 8 years ago
parent 5272dfca33
commit 0937fc1a30
  1. 5
      include/grpc/impl/codegen/grpc_types.h
  2. 26
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  3. 17
      src/core/ext/transport/chttp2/transport/frame_ping.c
  4. 9
      src/core/ext/transport/chttp2/transport/internal.h
  5. 20
      src/core/ext/transport/chttp2/transport/writing.c

@ -202,6 +202,11 @@ typedef struct {
a data frame or header frame) */
#define GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA \
"grpc.http2.max_pings_without_data"
/** How many misbehaving pings the server can bear before sending goaway and
closing the transport?
(0 indicates that the server can bear an infinite number of misbehaving
pings) */
#define GRPC_ARG_HTTP2_MAX_PING_STRIKES "grpc.http2.max_ping_strikes"
/** How much data are we willing to queue up per stream if
GRPC_WRITE_BUFFER_HINT is set? This is an upper bound */
#define GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE "grpc.http2.write_buffer_size"

@ -153,6 +153,7 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
#define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0
#define DEFAULT_MAX_PINGS_BETWEEN_DATA 3
#define DEFAULT_MAX_PING_STRIKES 2
/** keepalive-relevant functions */
static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
@ -351,6 +352,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
.max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA,
.min_time_between_pings =
gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN),
.max_ping_strikes = DEFAULT_MAX_PING_STRIKES,
};
/* client-side keepalive setting */
@ -396,6 +398,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
&channel_args->args[i],
(grpc_integer_options){DEFAULT_MAX_PINGS_BETWEEN_DATA, 0, INT_MAX});
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
&channel_args->args[i],
(grpc_integer_options){DEFAULT_MAX_PINGS_BETWEEN_DATA, 0, INT_MAX});
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_MIN_TIME_BETWEEN_PINGS_MS)) {
t->ping_policy.min_time_between_pings = gpr_time_from_millis(
@ -488,6 +495,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->ping_policy.max_pings_without_data;
t->ping_state.is_delayed_ping_timer_set = false;
t->ping_recv_state.last_ping_recv_time = gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.ping_strikes = 0;
/* Start client-side keepalive pings */
if (t->is_client) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
@ -1448,6 +1458,19 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_UNREF(error);
}
void grpc_chttp2_ping_strike(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
t->ping_policy.max_ping_strikes != 0) {
send_goaway(exec_ctx, t,
grpc_error_set_int(GRPC_ERROR_CREATE("too_many_pings"),
GRPC_ERROR_INT_HTTP2_ERROR,
GRPC_HTTP2_ENHANCE_YOUR_CALM));
/*The transport will be closed after the write is done */
close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE("Too many pings"));
}
}
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
void *stream_op,
grpc_error *error_ignored) {
@ -2163,7 +2186,8 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_chttp2_transport *t = arg;
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
if (error == GRPC_ERROR_NONE && !(t->destroying || t->closed)) {
if (t->keepalive_permit_without_calls || t->stream_map.count > 0) {
if (t->keepalive_permit_without_calls ||
grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE,

@ -103,6 +103,23 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
if (p->is_ack) {
grpc_chttp2_ack_ping(exec_ctx, t, p->opaque_8bytes);
} else {
if (!t->is_client) {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec elapsed =
gpr_time_sub(now, t->ping_recv_state.last_ping_recv_time);
if (t->keepalive_permit_without_calls == 0 &&
grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
if (gpr_time_cmp(elapsed, gpr_time_from_seconds(7200, GPR_TIMESPAN)) <
0) {
grpc_chttp2_ping_strike(exec_ctx, t);
}
} else {
if (gpr_time_cmp(elapsed, t->ping_policy.min_time_between_pings) <
0) {
grpc_chttp2_ping_strike(exec_ctx, t);
}
}
}
if (!g_disable_ping_ack) {
if (t->ping_ack_count == t->ping_ack_capacity) {
t->ping_ack_capacity = GPR_MAX(t->ping_ack_capacity * 3 / 2, 3);

@ -97,6 +97,7 @@ typedef struct {
typedef struct {
gpr_timespec min_time_between_pings;
int max_pings_without_data;
int max_ping_strikes;
} grpc_chttp2_repeated_ping_policy;
typedef struct {
@ -106,6 +107,11 @@ typedef struct {
bool is_delayed_ping_timer_set;
} grpc_chttp2_repeated_ping_state;
typedef struct {
gpr_timespec last_ping_recv_time;
int ping_strikes;
} grpc_chttp2_server_ping_recv_state;
/* deframer state for the overall http2 stream of bytes */
typedef enum {
/* prefix: one entry per http2 connection prefix byte */
@ -316,6 +322,7 @@ struct grpc_chttp2_transport {
size_t ping_ack_count;
size_t ping_ack_capacity;
uint64_t *ping_acks;
grpc_chttp2_server_ping_recv_state ping_recv_state;
/** parser for headers */
grpc_chttp2_hpack_parser hpack_parser;
@ -792,6 +799,8 @@ void grpc_chttp2_incoming_byte_stream_finished(
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
uint64_t id);
void grpc_chttp2_ping_strike(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
typedef enum {
/* don't initiate a transport write, but piggyback on the next one */
GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK,

@ -229,6 +229,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
now_writing = true;
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
t->ping_recv_state.last_ping_recv_time =
gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.ping_strikes = 0;
}
}
/* send any window updates */
if (s->announce_window > 0) {
@ -238,6 +243,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
s->id, s->announce_window, &s->stats.outgoing));
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
t->ping_recv_state.last_ping_recv_time =
gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.ping_strikes = 0;
}
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, announce_window, announce);
}
if (sent_initial_metadata) {
@ -270,6 +280,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
send_bytes);
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
t->ping_recv_state.last_ping_recv_time =
gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.ping_strikes = 0;
}
if (is_last_frame) {
s->send_trailing_metadata = NULL;
s->sent_trailing_metadata = true;
@ -345,6 +360,11 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
0, announced, &throwaway_stats));
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
t->ping_recv_state.last_ping_recv_time =
gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.ping_strikes = 0;
}
}
for (size_t i = 0; i < t->ping_ack_count; i++) {

Loading…
Cancel
Save