Merge branch 'master' of https://github.com/grpc/grpc into tsi_exec_ctx

pull/12553/head
jiangtaoli2016 8 years ago
commit 1500cd6e8c
  1. 5
      doc/service_config.md
  2. 6
      include/grpc++/support/channel_arguments.h
  3. 6
      include/grpc/impl/codegen/grpc_types.h
  4. 2
      src/compiler/python_generator.cc
  5. 197
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
  6. 2
      src/core/ext/filters/client_channel/lb_policy_factory.c
  7. 2
      src/core/ext/filters/client_channel/lb_policy_factory.h
  8. 2
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
  9. 236
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  10. 3
      src/core/ext/transport/chttp2/transport/frame_ping.c
  11. 10
      src/core/ext/transport/chttp2/transport/frame_window_update.c
  12. 3
      src/core/ext/transport/chttp2/transport/hpack_parser.c
  13. 37
      src/core/ext/transport/chttp2/transport/internal.h
  14. 5
      src/core/ext/transport/chttp2/transport/writing.c
  15. 45
      src/core/lib/debug/stats_data.c
  16. 110
      src/core/lib/debug/stats_data.h
  17. 42
      src/core/lib/debug/stats_data.yaml
  18. 25
      src/core/lib/debug/stats_data_bq_schema.sql
  19. 4
      src/cpp/common/channel_arguments.cc
  20. 49
      src/python/grpcio_tests/commands.py
  21. 2
      src/ruby/qps/proxy-worker.rb
  22. 74
      test/cpp/end2end/grpclb_end2end_test.cc
  23. 3
      tools/gce/create_interop_worker.sh
  24. 3
      tools/gce/create_linux_performance_worker.sh
  25. 3
      tools/gce/create_linux_worker.sh
  26. 25
      tools/run_tests/performance/massage_qps_stats.py
  27. 250
      tools/run_tests/performance/scenario_result_schema.json
  28. 6
      tools/run_tests/run_tests.py

@ -24,10 +24,7 @@ The service config is a JSON string of the following form:
// opposed to backend addresses), gRPC will use grpclb (see
// https://github.com/grpc/grpc/blob/master/doc/load-balancing.md),
// regardless of what LB policy is requested either here or via the
// client API. However, if the resolver returns at least one backend
// address in addition to the balancer address(es), the client may fall
// back to the requested policy if it is unable to reach any of the
// grpclb load balancers.
// client API.
'loadBalancingPolicy': string,
// Per-method configuration. Optional.

@ -64,12 +64,6 @@ class ChannelArguments {
/// Set the compression algorithm for the channel.
void SetCompressionAlgorithm(grpc_compression_algorithm algorithm);
/// Set the grpclb fallback timeout (in ms) for the channel. If this amount
/// of time has passed but we have not gotten any non-empty \a serverlist from
/// the balancer, we will fall back to use the backend address(es) returned by
/// the resolver.
void SetGrpclbFallbackTimeout(int fallback_timeout);
/// Set the socket mutator for the channel.
void SetSocketMutator(grpc_socket_mutator* mutator);

@ -287,11 +287,7 @@ typedef struct {
"grpc.experimental.tcp_max_read_chunk_size"
/* Timeout in milliseconds to use for calls to the grpclb load balancer.
If 0 or unset, the balancer calls will have no deadline. */
#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_call_timeout_ms"
/* Timeout in milliseconds to wait for the serverlist from the grpclb load
balancer before using fallback backend addresses from the resolver.
If 0, fallback will never be used. */
#define GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS "grpc.grpclb_fallback_timeout_ms"
#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_timeout_ms"
/** If non-zero, grpc server's cronet compression workaround will be enabled */
#define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \
"grpc.workaround.cronet_compression"

@ -769,7 +769,7 @@ bool PythonGrpcGenerator::Generate(const FileDescriptor* file,
PrivateGenerator generator(config_, &pbfile);
if (parameter == "grpc_2_0") {
return GenerateGrpc(context, generator, pb2_grpc_file_name, true);
} else if (parameter == "") {
} else if (parameter == "grpc_1_0" || parameter == "") {
return GenerateGrpc(context, generator, pb2_grpc_file_name, true) &&
GenerateGrpc(context, generator, pb2_file_name, false);
} else {

@ -123,7 +123,6 @@
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb");
@ -300,10 +299,6 @@ typedef struct glb_lb_policy {
/** timeout in milliseconds for the LB call. 0 means no deadline. */
int lb_call_timeout_ms;
/** timeout in milliseconds for before using fallback backend addresses.
* 0 means not using fallback. */
int lb_fallback_timeout_ms;
/** for communicating with the LB server */
grpc_channel *lb_channel;
@ -330,9 +325,6 @@ typedef struct glb_lb_policy {
* Otherwise, we delegate to the RR policy. */
size_t serverlist_index;
/** stores the backend addresses from the resolver */
grpc_lb_addresses *fallback_backend_addresses;
/** list of picks that are waiting on RR's policy connectivity */
pending_pick *pending_picks;
@ -353,9 +345,6 @@ typedef struct glb_lb_policy {
/** is \a lb_call_retry_timer active? */
bool retry_timer_active;
/** is \a lb_fallback_timer active? */
bool fallback_timer_active;
/** called upon changes to the LB channel's connectivity. */
grpc_closure lb_channel_on_connectivity_changed;
@ -378,9 +367,6 @@ typedef struct glb_lb_policy {
/* LB call retry timer callback. */
grpc_closure lb_on_call_retry;
/* LB fallback timer callback. */
grpc_closure lb_on_fallback;
grpc_call *lb_call; /* streaming call to the LB server, */
grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
@ -404,9 +390,6 @@ typedef struct glb_lb_policy {
/** LB call retry timer */
grpc_timer lb_call_retry_timer;
/** LB fallback timer */
grpc_timer lb_fallback_timer;
bool initial_request_sent;
bool seen_initial_response;
@ -553,32 +536,6 @@ static grpc_lb_addresses *process_serverlist_locked(
return lb_addresses;
}
/* Returns the backend addresses extracted from the given addresses */
static grpc_lb_addresses *extract_backend_addresses_locked(
grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses) {
/* first pass: count the number of backend addresses */
size_t num_backends = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (!addresses->addresses[i].is_balancer) {
++num_backends;
}
}
/* second pass: actually populate the addresses and (empty) LB tokens */
grpc_lb_addresses *backend_addresses =
grpc_lb_addresses_create(num_backends, &lb_token_vtable);
size_t num_copied = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) continue;
const grpc_resolved_address *addr = &addresses->addresses[i].address;
grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
addr->len, false /* is_balancer */,
NULL /* balancer_name */,
(void *)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
++num_copied;
}
return backend_addresses;
}
static void update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
grpc_connectivity_state rr_state, grpc_error *rr_state_error) {
@ -646,38 +603,35 @@ static bool pick_from_internal_rr_locked(
grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
const grpc_lb_policy_pick_args *pick_args, bool force_async,
grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
// Check for drops if we are not using fallback backend addresses.
if (glb_policy->serverlist != NULL) {
// Look at the index into the serverlist to see if we should drop this call.
grpc_grpclb_server *server =
glb_policy->serverlist->servers[glb_policy->serverlist_index++];
if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
glb_policy->serverlist_index = 0; // Wrap-around.
// Look at the index into the serverlist to see if we should drop this call.
grpc_grpclb_server *server =
glb_policy->serverlist->servers[glb_policy->serverlist_index++];
if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
glb_policy->serverlist_index = 0; // Wrap-around.
}
if (server->drop) {
// Not using the RR policy, so unref it.
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
(intptr_t)wc_arg->rr_policy);
}
if (server->drop) {
// Not using the RR policy, so unref it.
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
(intptr_t)wc_arg->rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
// Update client load reporting stats to indicate the number of
// dropped calls. Note that we have to do this here instead of in
// the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
grpc_grpclb_client_stats_add_call_dropped_locked(
server->load_balance_token, wc_arg->client_stats);
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
if (force_async) {
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
gpr_free(wc_arg->free_when_done);
return false;
}
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
// Update client load reporting stats to indicate the number of
// dropped calls. Note that we have to do this here instead of in
// the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token,
wc_arg->client_stats);
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
if (force_async) {
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
gpr_free(wc_arg->free_when_done);
return true;
return false;
}
gpr_free(wc_arg->free_when_done);
return true;
}
// Pick via the RR policy.
const bool pick_done = grpc_lb_policy_pick_locked(
@ -715,18 +669,8 @@ static bool pick_from_internal_rr_locked(
static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy) {
grpc_lb_addresses *addresses;
if (glb_policy->serverlist != NULL) {
GPR_ASSERT(glb_policy->serverlist->num_servers > 0);
addresses = process_serverlist_locked(exec_ctx, glb_policy->serverlist);
} else {
// If rr_handover_locked() is invoked when we haven't received any
// serverlist from the balancer, we use the fallback backends returned by
// the resolver. Note that the fallback backend list may be empty, in which
// case the new round_robin policy will keep the requested picks pending.
GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL);
addresses = grpc_lb_addresses_copy(glb_policy->fallback_backend_addresses);
}
grpc_lb_addresses *addresses =
process_serverlist_locked(exec_ctx, glb_policy->serverlist);
GPR_ASSERT(addresses != NULL);
grpc_lb_policy_args *args = (grpc_lb_policy_args *)gpr_zalloc(sizeof(*args));
args->client_channel_factory = glb_policy->cc_factory;
@ -832,6 +776,8 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
/* glb_policy->rr_policy may be NULL (initial handover) */
static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->serverlist != NULL &&
glb_policy->serverlist->num_servers > 0);
if (glb_policy->shutting_down) return;
grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy);
GPR_ASSERT(args != NULL);
@ -980,9 +926,6 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
if (glb_policy->serverlist != NULL) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
if (glb_policy->fallback_backend_addresses != NULL) {
grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
}
grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
grpc_subchannel_index_unref();
if (glb_policy->pending_update_args != NULL) {
@ -1124,28 +1067,10 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
}
static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy);
static void start_picking_locked(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy) {
/* start a timer to fall back */
if (glb_policy->lb_fallback_timeout_ms > 0 &&
glb_policy->serverlist == NULL) {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec deadline = gpr_time_add(
now,
gpr_time_from_millis(glb_policy->lb_fallback_timeout_ms, GPR_TIMESPAN));
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer");
GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked,
glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
glb_policy->fallback_timer_active = true;
grpc_timer_init(exec_ctx, &glb_policy->lb_fallback_timer, deadline,
&glb_policy->lb_on_fallback, now);
}
glb_policy->started_picking = true;
gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
query_for_backends_locked(exec_ctx, glb_policy);
@ -1600,15 +1525,6 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (glb_policy->serverlist != NULL) {
/* dispose of the old serverlist */
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
} else {
/* or dispose of the fallback */
grpc_lb_addresses_destroy(exec_ctx,
glb_policy->fallback_backend_addresses);
glb_policy->fallback_backend_addresses = NULL;
if (glb_policy->fallback_timer_active) {
grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer);
glb_policy->fallback_timer_active = false;
}
}
/* and update the copy in the glb_lb_policy instance. This
* serverlist instance will be destroyed either upon the next
@ -1619,7 +1535,9 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
} else {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Received empty server list, ignoring.");
gpr_log(GPR_INFO,
"Received empty server list. Picks will stay pending until "
"a response with > 0 servers is received");
}
grpc_grpclb_destroy_serverlist(serverlist);
}
@ -1666,26 +1584,6 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer");
}
static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
/* If we receive a serverlist after the timer fires but before this callback
* actually runs, don't do anything. */
if (glb_policy->serverlist != NULL) return;
glb_policy->fallback_timer_active = false;
if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"Falling back to use backends from resolver (grpclb %p)",
(void *)glb_policy);
}
GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL);
rr_handover_locked(exec_ctx, glb_policy);
}
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"grpclb_fallback_timer");
}
static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
@ -1806,17 +1704,6 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
&glb_policy->lb_channel_connectivity,
&glb_policy->lb_channel_on_connectivity_changed, NULL);
}
// Propagate update to fallback_backend_addresses if a non-empty serverlist
// hasn't been received from the balancer.
if (glb_policy->serverlist == NULL) {
grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
glb_policy->fallback_backend_addresses =
extract_backend_addresses_locked(exec_ctx, addresses);
if (glb_policy->rr_policy != NULL) {
rr_handover_locked(exec_ctx, glb_policy);
}
}
}
// Invoked as part of the update process. It continues watching the LB channel
@ -1899,7 +1786,13 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
/* Count the number of gRPC-LB addresses. There must be at least one. */
/* Count the number of gRPC-LB addresses. There must be at least one.
* TODO(roth): For now, we ignore non-balancer addresses, but in the
* future, we may change the behavior such that we fall back to using
* the non-balancer addresses if we cannot reach any balancers. In the
* fallback case, we should use the LB policy indicated by
* GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is
* unset, we should default to pick_first). */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
@ -1935,11 +1828,6 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
glb_policy->lb_call_timeout_ms =
grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX});
arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
glb_policy->lb_fallback_timeout_ms = grpc_channel_arg_get_integer(
arg, (grpc_integer_options){GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0,
INT_MAX});
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
grpc_arg new_arg =
@ -1948,11 +1836,6 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
/* Extract the backend addresses (may be empty) from the resolver for
* fallback. */
glb_policy->fallback_backend_addresses =
extract_backend_addresses_locked(exec_ctx, addresses);
/* Create a client channel over them to communicate with a LB service */
glb_policy->response_generator =
grpc_fake_resolver_response_generator_create();

@ -56,7 +56,7 @@ grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses) {
}
void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
const void* address, size_t address_len,
void* address, size_t address_len,
bool is_balancer, const char* balancer_name,
void* user_data) {
GPR_ASSERT(index < addresses->num_addresses);

@ -73,7 +73,7 @@ grpc_lb_addresses *grpc_lb_addresses_copy(const grpc_lb_addresses *addresses);
* \a address is a socket address of length \a address_len.
* Takes ownership of \a balancer_name. */
void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index,
const void *address, size_t address_len,
void *address, size_t address_len,
bool is_balancer, const char *balancer_name,
void *user_data);

@ -204,7 +204,7 @@ static char *choose_service_config(char *service_config_choice_json) {
int random_pct = rand() % 100;
int percentage;
if (sscanf(field->value, "%d", &percentage) != 1 ||
random_pct > percentage) {
random_pct > percentage || percentage == 0) {
service_config_json = NULL;
break;
}

@ -144,10 +144,11 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_ping_type ping_type,
grpc_closure *on_initiate,
grpc_closure *on_complete);
static void send_ping_locked(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_ping_type ping_type, grpc_closure *on_initiate,
grpc_closure *on_complete,
grpc_chttp2_initiate_write_reason initiate_write_reason);
static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error);
@ -346,7 +347,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
GRPC_CHTTP2_CLIENT_CONNECT_STRING));
grpc_chttp2_initiate_write(exec_ctx, t, "initial_write");
}
/* configure http2 the way we like it */
@ -578,7 +578,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
}
grpc_chttp2_initiate_write(exec_ctx, t, "init");
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
post_benign_reclaimer(exec_ctx, t);
}
@ -846,13 +847,91 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
static void inc_initiate_write_reason(
grpc_exec_ctx *exec_ctx, grpc_chttp2_initiate_write_reason reason) {
switch (reason) {
case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA(
exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA(
exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL(
exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING(
exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE(
exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED(
exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM(exec_ctx);
break;
}
}
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, const char *reason) {
grpc_chttp2_transport *t,
grpc_chttp2_initiate_write_reason reason) {
GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0);
switch (t->write_state) {
case GRPC_CHTTP2_WRITE_STATE_IDLE:
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason);
inc_initiate_write_reason(exec_ctx, reason);
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
grpc_chttp2_initiate_write_reason_string(reason));
t->is_first_write_in_batch = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
GRPC_CLOSURE_SCHED(
@ -864,7 +943,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING:
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
reason);
grpc_chttp2_initiate_write_reason_string(reason));
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
break;
@ -872,16 +951,12 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
}
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
bool also_initiate_write, const char *reason) {
void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
}
if (also_initiate_write) {
grpc_chttp2_initiate_write(exec_ctx, t, reason);
}
}
static grpc_closure_scheduler *write_scheduler(grpc_chttp2_transport *t,
@ -1105,7 +1180,9 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
post_destructive_reclaimer(exec_ctx, t);
grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream");
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
}
/* cancel out streams that will never be started */
while (t->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@ -1202,7 +1279,9 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s) {
if (s->id != 0 && (!s->write_buffering ||
s->flow_controlled_buffer.length > t->write_buffer_size)) {
grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
}
}
@ -1404,14 +1483,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
} else {
GPR_ASSERT(s->id != 0);
bool initiate_write = true;
if (op->send_message &&
(op->payload->send_message.send_message->flags &
GRPC_WRITE_BUFFER_HINT)) {
initiate_write = false;
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
if (!(op->send_message &&
(op->payload->send_message.send_message->flags &
GRPC_WRITE_BUFFER_HINT))) {
grpc_chttp2_initiate_write(
exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
}
grpc_chttp2_become_writable(exec_ctx, t, s, initiate_write,
"op.send_initial_metadata");
}
} else {
s->send_initial_metadata = NULL;
@ -1519,8 +1597,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
} else if (s->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
grpc_chttp2_become_writable(exec_ctx, t, s, true,
"op.send_trailing_metadata");
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
grpc_chttp2_initiate_write(
exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
}
}
}
@ -1632,15 +1711,17 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_UNREF(error);
}
static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_ping_type ping_type,
grpc_closure *on_initiate, grpc_closure *on_ack) {
static void send_ping_locked(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_ping_type ping_type, grpc_closure *on_initiate,
grpc_closure *on_ack,
grpc_chttp2_initiate_write_reason initiate_write_reason) {
grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type];
grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
GRPC_ERROR_NONE);
if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
GRPC_ERROR_NONE)) {
grpc_chttp2_initiate_write(exec_ctx, t, "send_ping");
grpc_chttp2_initiate_write(exec_ctx, t, initiate_write_reason);
}
}
@ -1648,7 +1729,8 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
t->ping_state.is_delayed_ping_timer_set = false;
grpc_chttp2_initiate_write(exec_ctx, t, "retry_send_ping");
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
}
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@ -1663,7 +1745,8 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
grpc_chttp2_initiate_write(exec_ctx, t, "continue_pings");
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
}
}
@ -1676,7 +1759,8 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&slice, &http_error);
grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)http_error,
grpc_slice_ref_internal(slice), &t->qbuf);
grpc_chttp2_initiate_write(exec_ctx, t, "goaway_sent");
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
GRPC_ERROR_UNREF(error);
}
@ -1723,7 +1807,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
if (op->send_ping) {
send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, NULL,
op->send_ping);
op->send_ping,
GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
}
if (op->on_connectivity_state_change != NULL) {
@ -1968,7 +2053,8 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error,
&s->stats.outgoing));
grpc_chttp2_initiate_write(exec_ctx, t, "rst_stream");
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
}
}
if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
@ -2289,7 +2375,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&s->stats.outgoing));
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, error);
grpc_chttp2_initiate_write(exec_ctx, t, "close_from_api");
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
}
typedef struct {
@ -2324,19 +2411,20 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
break;
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
grpc_chttp2_become_writable(exec_ctx, t, s, true,
"immediate stream flowctl");
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
grpc_chttp2_initiate_write(
exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL);
break;
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
grpc_chttp2_become_writable(exec_ctx, t, s, false,
"queue stream flowctl");
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
break;
}
switch (action.send_transport_update) {
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
break;
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
grpc_chttp2_initiate_write(exec_ctx, t, "immediate transport flowctl");
grpc_chttp2_initiate_write(
exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL);
break;
// this is the same as no action b/c every time the transport enters the
// writing path it will maybe do an update
@ -2354,7 +2442,8 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
(uint32_t)action.max_frame_size);
}
if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) {
grpc_chttp2_initiate_write(exec_ctx, t, "immediate setting update");
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS);
}
}
if (action.need_ping) {
@ -2362,7 +2451,8 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator);
send_ping_locked(exec_ctx, t,
GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE,
&t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
&t->start_bdp_ping_locked, &t->finish_bdp_ping_locked,
GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING);
}
}
@ -2441,7 +2531,10 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (t->flow_control.initial_window_update > 0) {
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
grpc_chttp2_become_writable(exec_ctx, t, s, true, "unstalled");
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
grpc_chttp2_initiate_write(
exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
}
}
t->flow_control.initial_window_update = 0;
@ -2556,7 +2649,8 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE,
&t->start_keepalive_ping_locked,
&t->finish_keepalive_ping_locked);
&t->finish_keepalive_ping_locked,
GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
} else {
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
grpc_timer_init(
@ -3017,6 +3111,56 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
/*******************************************************************************
* MONITORING
*/
const char *grpc_chttp2_initiate_write_reason_string(
grpc_chttp2_initiate_write_reason reason) {
switch (reason) {
case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
return "INITIAL_WRITE";
case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
return "START_NEW_STREAM";
case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
return "SEND_MESSAGE";
case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
return "SEND_INITIAL_METADATA";
case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
return "SEND_TRAILING_METADATA";
case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
return "RETRY_SEND_PING";
case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
return "CONTINUE_PINGS";
case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
return "GOAWAY_SENT";
case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
return "RST_STREAM";
case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
return "CLOSE_FROM_API";
case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
return "STREAM_FLOW_CONTROL";
case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
return "TRANSPORT_FLOW_CONTROL";
case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
return "SEND_SETTINGS";
case GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING:
return "BDP_ESTIMATOR_PING";
case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
return "APPLICATION_PING";
case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
return "KEEPALIVE_PING";
case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
return "PING_RESPONSE";
case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
return "FORCE_RST_STREAM";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
static grpc_endpoint *chttp2_get_endpoint(grpc_exec_ctx *exec_ctx,
grpc_transport *t) {
return ((grpc_chttp2_transport *)t)->ep;

@ -117,7 +117,8 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks));
}
t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes;
grpc_chttp2_initiate_write(exec_ctx, t, "ping response");
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE);
}
}
}

@ -99,8 +99,10 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
grpc_chttp2_flowctl_recv_stream_update(
&t->flow_control, &s->flow_control, received_update);
if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) {
grpc_chttp2_become_writable(exec_ctx, t, s, true,
"stream.read_flow_control");
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
grpc_chttp2_initiate_write(
exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE);
}
}
} else {
@ -109,7 +111,9 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
received_update);
bool is_zero = t->flow_control.remote_window <= 0;
if (was_zero && !is_zero) {
grpc_chttp2_initiate_write(exec_ctx, t, "new_global_flow_control");
grpc_chttp2_initiate_write(
exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED);
}
}
}

@ -1649,7 +1649,8 @@ static void force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing));
grpc_chttp2_initiate_write(exec_ctx, t, "force_rst_stream");
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM);
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, GRPC_ERROR_NONE);
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "final_rst");

@ -79,6 +79,33 @@ typedef enum {
GRPC_CHTTP2_PCL_COUNT /* must be last */
} grpc_chttp2_ping_closure_list;
typedef enum {
GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE,
GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM,
GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE,
GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA,
GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA,
GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING,
GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS,
GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT,
GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM,
GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API,
GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL,
GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING,
GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING,
GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE,
GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING,
GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING,
GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED,
GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE,
GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM,
} grpc_chttp2_initiate_write_reason;
const char *grpc_chttp2_initiate_write_reason_string(
grpc_chttp2_initiate_write_reason reason);
typedef struct {
grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT];
uint64_t inflight_id;
@ -599,7 +626,8 @@ struct grpc_chttp2_stream {
The actual call chain is documented in the implementation of this function.
*/
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, const char *reason);
grpc_chttp2_transport *t,
grpc_chttp2_initiate_write_reason reason);
typedef struct {
/** are we writing? */
@ -851,10 +879,9 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx,
/** add a ref to the stream and add it to the writable list;
ref will be dropped in writing.c */
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
bool also_initiate_write, const char *reason);
void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_stream *s,

@ -201,9 +201,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
if (t->flow_control.remote_window > 0) {
while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) {
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s) &&
stream_ref_if_not_destroyed(&s->refcount->refs)) {
grpc_chttp2_initiate_write(exec_ctx, t, "transport.read_flow_control");
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) {
stream_ref_if_not_destroyed(&s->refcount->refs);
}
}
}

@ -50,6 +50,27 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"http2_writes_offloaded",
"http2_writes_continued",
"http2_partial_writes",
"http2_initiate_write_due_to_initial_write",
"http2_initiate_write_due_to_start_new_stream",
"http2_initiate_write_due_to_send_message",
"http2_initiate_write_due_to_send_initial_metadata",
"http2_initiate_write_due_to_send_trailing_metadata",
"http2_initiate_write_due_to_retry_send_ping",
"http2_initiate_write_due_to_continue_pings",
"http2_initiate_write_due_to_goaway_sent",
"http2_initiate_write_due_to_rst_stream",
"http2_initiate_write_due_to_close_from_api",
"http2_initiate_write_due_to_stream_flow_control",
"http2_initiate_write_due_to_transport_flow_control",
"http2_initiate_write_due_to_send_settings",
"http2_initiate_write_due_to_bdp_estimator_ping",
"http2_initiate_write_due_to_flow_control_unstalled_by_setting",
"http2_initiate_write_due_to_flow_control_unstalled_by_update",
"http2_initiate_write_due_to_application_ping",
"http2_initiate_write_due_to_keepalive_ping",
"http2_initiate_write_due_to_transport_flow_control_unstalled",
"http2_initiate_write_due_to_ping_response",
"http2_initiate_write_due_to_force_rst_stream",
"combiner_locks_initiated",
"combiner_locks_scheduled_items",
"combiner_locks_scheduled_final_items",
@ -92,6 +113,30 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"written",
"Number of HTTP2 writes that were made knowing there was still more data "
"to be written (we cap maximum write size to syscall_write)",
"Number of HTTP2 writes initiated due to 'initial_write'",
"Number of HTTP2 writes initiated due to 'start_new_stream'",
"Number of HTTP2 writes initiated due to 'send_message'",
"Number of HTTP2 writes initiated due to 'send_initial_metadata'",
"Number of HTTP2 writes initiated due to 'send_trailing_metadata'",
"Number of HTTP2 writes initiated due to 'retry_send_ping'",
"Number of HTTP2 writes initiated due to 'continue_pings'",
"Number of HTTP2 writes initiated due to 'goaway_sent'",
"Number of HTTP2 writes initiated due to 'rst_stream'",
"Number of HTTP2 writes initiated due to 'close_from_api'",
"Number of HTTP2 writes initiated due to 'stream_flow_control'",
"Number of HTTP2 writes initiated due to 'transport_flow_control'",
"Number of HTTP2 writes initiated due to 'send_settings'",
"Number of HTTP2 writes initiated due to 'bdp_estimator_ping'",
"Number of HTTP2 writes initiated due to "
"'flow_control_unstalled_by_setting'",
"Number of HTTP2 writes initiated due to "
"'flow_control_unstalled_by_update'",
"Number of HTTP2 writes initiated due to 'application_ping'",
"Number of HTTP2 writes initiated due to 'keepalive_ping'",
"Number of HTTP2 writes initiated due to "
"'transport_flow_control_unstalled'",
"Number of HTTP2 writes initiated due to 'ping_response'",
"Number of HTTP2 writes initiated due to 'force_rst_stream'",
"Number of combiner lock entries by process (first items queued to a "
"combiner)",
"Number of items scheduled against combiner locks",

@ -52,6 +52,27 @@ typedef enum {
GRPC_STATS_COUNTER_HTTP2_WRITES_OFFLOADED,
GRPC_STATS_COUNTER_HTTP2_WRITES_CONTINUED,
GRPC_STATS_COUNTER_HTTP2_PARTIAL_WRITES,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE,
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM,
GRPC_STATS_COUNTER_COMBINER_LOCKS_INITIATED,
GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_ITEMS,
GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS,
@ -169,6 +190,95 @@ typedef enum {
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_WRITES_CONTINUED)
#define GRPC_STATS_INC_HTTP2_PARTIAL_WRITES(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_PARTIAL_WRITES)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA( \
exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA( \
exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL( \
exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL( \
exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING( \
exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING( \
exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE( \
exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED( \
exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE)
#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM)
#define GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_COMBINER_LOCKS_INITIATED)

@ -117,6 +117,48 @@
- counter: http2_partial_writes
doc: Number of HTTP2 writes that were made knowing there was still more data
to be written (we cap maximum write size to syscall_write)
- counter: http2_initiate_write_due_to_initial_write
doc: Number of HTTP2 writes initiated due to 'initial_write'
- counter: http2_initiate_write_due_to_start_new_stream
doc: Number of HTTP2 writes initiated due to 'start_new_stream'
- counter: http2_initiate_write_due_to_send_message
doc: Number of HTTP2 writes initiated due to 'send_message'
- counter: http2_initiate_write_due_to_send_initial_metadata
doc: Number of HTTP2 writes initiated due to 'send_initial_metadata'
- counter: http2_initiate_write_due_to_send_trailing_metadata
doc: Number of HTTP2 writes initiated due to 'send_trailing_metadata'
- counter: http2_initiate_write_due_to_retry_send_ping
doc: Number of HTTP2 writes initiated due to 'retry_send_ping'
- counter: http2_initiate_write_due_to_continue_pings
doc: Number of HTTP2 writes initiated due to 'continue_pings'
- counter: http2_initiate_write_due_to_goaway_sent
doc: Number of HTTP2 writes initiated due to 'goaway_sent'
- counter: http2_initiate_write_due_to_rst_stream
doc: Number of HTTP2 writes initiated due to 'rst_stream'
- counter: http2_initiate_write_due_to_close_from_api
doc: Number of HTTP2 writes initiated due to 'close_from_api'
- counter: http2_initiate_write_due_to_stream_flow_control
doc: Number of HTTP2 writes initiated due to 'stream_flow_control'
- counter: http2_initiate_write_due_to_transport_flow_control
doc: Number of HTTP2 writes initiated due to 'transport_flow_control'
- counter: http2_initiate_write_due_to_send_settings
doc: Number of HTTP2 writes initiated due to 'send_settings'
- counter: http2_initiate_write_due_to_bdp_estimator_ping
doc: Number of HTTP2 writes initiated due to 'bdp_estimator_ping'
- counter: http2_initiate_write_due_to_flow_control_unstalled_by_setting
doc: Number of HTTP2 writes initiated due to 'flow_control_unstalled_by_setting'
- counter: http2_initiate_write_due_to_flow_control_unstalled_by_update
doc: Number of HTTP2 writes initiated due to 'flow_control_unstalled_by_update'
- counter: http2_initiate_write_due_to_application_ping
doc: Number of HTTP2 writes initiated due to 'application_ping'
- counter: http2_initiate_write_due_to_keepalive_ping
doc: Number of HTTP2 writes initiated due to 'keepalive_ping'
- counter: http2_initiate_write_due_to_transport_flow_control_unstalled
doc: Number of HTTP2 writes initiated due to 'transport_flow_control_unstalled'
- counter: http2_initiate_write_due_to_ping_response
doc: Number of HTTP2 writes initiated due to 'ping_response'
- counter: http2_initiate_write_due_to_force_rst_stream
doc: Number of HTTP2 writes initiated due to 'force_rst_stream'
# combiner locks
- counter: combiner_locks_initiated
doc: Number of combiner lock entries by process

@ -1,5 +1,9 @@
client_calls_created_per_iteration:FLOAT,
server_calls_created_per_iteration:FLOAT,
cqs_created_per_iteration:FLOAT,
client_channels_created_per_iteration:FLOAT,
client_subchannels_created_per_iteration:FLOAT,
server_channels_created_per_iteration:FLOAT,
syscall_poll_per_iteration:FLOAT,
syscall_wait_per_iteration:FLOAT,
histogram_slow_lookups_per_iteration:FLOAT,
@ -21,6 +25,27 @@ http2_writes_begun_per_iteration:FLOAT,
http2_writes_offloaded_per_iteration:FLOAT,
http2_writes_continued_per_iteration:FLOAT,
http2_partial_writes_per_iteration:FLOAT,
http2_initiate_write_due_to_initial_write_per_iteration:FLOAT,
http2_initiate_write_due_to_start_new_stream_per_iteration:FLOAT,
http2_initiate_write_due_to_send_message_per_iteration:FLOAT,
http2_initiate_write_due_to_send_initial_metadata_per_iteration:FLOAT,
http2_initiate_write_due_to_send_trailing_metadata_per_iteration:FLOAT,
http2_initiate_write_due_to_retry_send_ping_per_iteration:FLOAT,
http2_initiate_write_due_to_continue_pings_per_iteration:FLOAT,
http2_initiate_write_due_to_goaway_sent_per_iteration:FLOAT,
http2_initiate_write_due_to_rst_stream_per_iteration:FLOAT,
http2_initiate_write_due_to_close_from_api_per_iteration:FLOAT,
http2_initiate_write_due_to_stream_flow_control_per_iteration:FLOAT,
http2_initiate_write_due_to_transport_flow_control_per_iteration:FLOAT,
http2_initiate_write_due_to_send_settings_per_iteration:FLOAT,
http2_initiate_write_due_to_bdp_estimator_ping_per_iteration:FLOAT,
http2_initiate_write_due_to_flow_control_unstalled_by_setting_per_iteration:FLOAT,
http2_initiate_write_due_to_flow_control_unstalled_by_update_per_iteration:FLOAT,
http2_initiate_write_due_to_application_ping_per_iteration:FLOAT,
http2_initiate_write_due_to_keepalive_ping_per_iteration:FLOAT,
http2_initiate_write_due_to_transport_flow_control_unstalled_per_iteration:FLOAT,
http2_initiate_write_due_to_ping_response_per_iteration:FLOAT,
http2_initiate_write_due_to_force_rst_stream_per_iteration:FLOAT,
combiner_locks_initiated_per_iteration:FLOAT,
combiner_locks_scheduled_items_per_iteration:FLOAT,
combiner_locks_scheduled_final_items_per_iteration:FLOAT,

@ -86,10 +86,6 @@ void ChannelArguments::SetCompressionAlgorithm(
SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, algorithm);
}
void ChannelArguments::SetGrpclbFallbackTimeout(int fallback_timeout) {
SetInt(GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS, fallback_timeout);
}
void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) {
if (!mutator) {
return;

@ -67,55 +67,6 @@ class GatherProto(setuptools.Command):
open(path, 'a').close()
class BuildProtoModules(setuptools.Command):
"""Command to generate project *_pb2.py modules from proto files."""
description = 'build protobuf modules'
user_options = [
('include=', None, 'path patterns to include in protobuf generation'),
('exclude=', None, 'path patterns to exclude from protobuf generation')
]
def initialize_options(self):
self.exclude = None
self.include = r'.*\.proto$'
def finalize_options(self):
pass
def run(self):
import grpc_tools.protoc as protoc
include_regex = re.compile(self.include)
exclude_regex = re.compile(self.exclude) if self.exclude else None
paths = []
for walk_root, directories, filenames in os.walk(PROTO_STEM):
for filename in filenames:
path = os.path.join(walk_root, filename)
if include_regex.match(path) and not (
exclude_regex and exclude_regex.match(path)):
paths.append(path)
# TODO(kpayson): It would be nice to do this in a batch command,
# but we currently have name conflicts in src/proto
for path in paths:
command = [
'grpc_tools.protoc',
'-I {}'.format(PROTO_STEM),
'--python_out={}'.format(PROTO_STEM),
'--grpc_python_out={}'.format(PROTO_STEM),
] + [path]
if protoc.main(command) != 0:
sys.stderr.write(
'warning: Command:\n{}\nFailed'.format(command))
# Generated proto directories dont include __init__.py, but
# these are needed for python package resolution
for walk_root, _, _ in os.walk(PROTO_STEM):
path = os.path.join(walk_root, '__init__.py')
open(path, 'a').close()
class BuildPy(build_py.build_py):
"""Custom project build command."""

@ -41,7 +41,7 @@ class ProxyBenchmarkClientServiceImpl < Grpc::Testing::ProxyClientService::Servi
@histogram = Histogram.new(@histres, @histmax)
@start_time = Time.now
# TODO(vjpai): Support multiple client channels by spawning off a PHP client per channel
command = "php " + File.expand_path(File.dirname(__FILE__)) + "/../../php/tests/qps/client.php " + @mytarget
command = "php -d extension=" + File.expand_path(File.dirname(__FILE__)) + "/../../php/ext/grpc/modules/grpc.so " + File.expand_path(File.dirname(__FILE__)) + "/../../php/tests/qps/client.php " + @mytarget
puts "Starting command: " + command
@php_pid = spawn(command)
end

@ -368,9 +368,8 @@ class GrpclbEnd2endTest : public ::testing::Test {
grpc_fake_resolver_response_generator_unref(response_generator_);
}
void ResetStub(int fallback_timeout = 0) {
void ResetStub() {
ChannelArguments args;
args.SetGrpclbFallbackTimeout(fallback_timeout);
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_);
std::ostringstream uri;
@ -442,10 +441,10 @@ class GrpclbEnd2endTest : public ::testing::Test {
grpc_exec_ctx_finish(&exec_ctx);
}
const std::vector<int> GetBackendPorts(const size_t start_index = 0) const {
const std::vector<int> GetBackendPorts() const {
std::vector<int> backend_ports;
for (size_t i = start_index; i < backend_servers_.size(); ++i) {
backend_ports.push_back(backend_servers_[i].port_);
for (const auto& bs : backend_servers_) {
backend_ports.push_back(bs.port_);
}
return backend_ports;
}
@ -616,71 +615,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
TEST_F(SingleBalancerTest, Fallback) {
const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const size_t kNumBackendInResolution = backends_.size() / 2;
ResetStub(kFallbackTimeoutMs);
std::vector<AddressData> addresses;
addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
for (size_t i = 0; i < kNumBackendInResolution; ++i) {
addresses.emplace_back(AddressData{backend_servers_[i].port_, false, ""});
}
SetNextResolution(addresses);
// Send non-empty serverlist only after kServerlistDelayMs
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(kNumBackendInResolution /* start_index */), {}),
kServerlistDelayMs);
// The first request. The client will block while it's still trying to
// contact the balancer.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
CheckRpcSendOk(kNumBackendInResolution);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
// Fallback is used: each backend returned by the resolver should have
// gotten one request.
for (size_t i = 0; i < kNumBackendInResolution; ++i) {
EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
}
for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
}
// Wait until update has been processed, as signaled by the backend returned
// by the balancer receiving a request.
do {
CheckRpcSendOk(1);
} while (
backend_servers_[kNumBackendInResolution].service_->request_count() == 0);
for (size_t i = 0; i < backends_.size(); ++i) {
backend_servers_[i].service_->ResetCounters();
}
// Send out the second request.
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
CheckRpcSendOk(backends_.size() - kNumBackendInResolution);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
// Serverlist is used: each backend returned by the balancer should
// have gotten one request.
for (size_t i = 0; i < kNumBackendInResolution; ++i) {
EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
}
for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
}
balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// and sent a single response.
EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
}
TEST_F(SingleBalancerTest, BackendsRestart) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(

@ -33,7 +33,8 @@ gcloud compute instances create $INSTANCE_NAME \
--machine-type n1-standard-16 \
--image ubuntu-15-10 \
--boot-disk-size 1000 \
--scopes https://www.googleapis.com/auth/xapi.zoo
--scopes https://www.googleapis.com/auth/xapi.zoo \
--tags=allow-ssh
echo 'Created GCE instance, waiting 60 seconds for it to come online.'
sleep 60

@ -36,7 +36,8 @@ gcloud compute instances create $INSTANCE_NAME \
--image-project ubuntu-os-cloud \
--image-family ubuntu-1704 \
--boot-disk-size 300 \
--scopes https://www.googleapis.com/auth/bigquery
--scopes https://www.googleapis.com/auth/bigquery \
--tags=allow-ssh
echo 'Created GCE instance, waiting 60 seconds for it to come online.'
sleep 60

@ -31,7 +31,8 @@ gcloud compute instances create $INSTANCE_NAME \
--image=ubuntu-1510 \
--image-project=grpc-testing \
--boot-disk-size 1000 \
--scopes https://www.googleapis.com/auth/bigquery
--scopes https://www.googleapis.com/auth/bigquery \
--tags=allow-ssh
echo 'Created GCE instance, waiting 60 seconds for it to come online.'
sleep 60

@ -22,6 +22,10 @@ def massage_qps_stats(scenario_result):
del stats["coreStats"]
stats["core_client_calls_created"] = massage_qps_stats_helpers.counter(core_stats, "client_calls_created")
stats["core_server_calls_created"] = massage_qps_stats_helpers.counter(core_stats, "server_calls_created")
stats["core_cqs_created"] = massage_qps_stats_helpers.counter(core_stats, "cqs_created")
stats["core_client_channels_created"] = massage_qps_stats_helpers.counter(core_stats, "client_channels_created")
stats["core_client_subchannels_created"] = massage_qps_stats_helpers.counter(core_stats, "client_subchannels_created")
stats["core_server_channels_created"] = massage_qps_stats_helpers.counter(core_stats, "server_channels_created")
stats["core_syscall_poll"] = massage_qps_stats_helpers.counter(core_stats, "syscall_poll")
stats["core_syscall_wait"] = massage_qps_stats_helpers.counter(core_stats, "syscall_wait")
stats["core_histogram_slow_lookups"] = massage_qps_stats_helpers.counter(core_stats, "histogram_slow_lookups")
@ -43,6 +47,27 @@ def massage_qps_stats(scenario_result):
stats["core_http2_writes_offloaded"] = massage_qps_stats_helpers.counter(core_stats, "http2_writes_offloaded")
stats["core_http2_writes_continued"] = massage_qps_stats_helpers.counter(core_stats, "http2_writes_continued")
stats["core_http2_partial_writes"] = massage_qps_stats_helpers.counter(core_stats, "http2_partial_writes")
stats["core_http2_initiate_write_due_to_initial_write"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_initial_write")
stats["core_http2_initiate_write_due_to_start_new_stream"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_start_new_stream")
stats["core_http2_initiate_write_due_to_send_message"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_send_message")
stats["core_http2_initiate_write_due_to_send_initial_metadata"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_send_initial_metadata")
stats["core_http2_initiate_write_due_to_send_trailing_metadata"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_send_trailing_metadata")
stats["core_http2_initiate_write_due_to_retry_send_ping"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_retry_send_ping")
stats["core_http2_initiate_write_due_to_continue_pings"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_continue_pings")
stats["core_http2_initiate_write_due_to_goaway_sent"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_goaway_sent")
stats["core_http2_initiate_write_due_to_rst_stream"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_rst_stream")
stats["core_http2_initiate_write_due_to_close_from_api"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_close_from_api")
stats["core_http2_initiate_write_due_to_stream_flow_control"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_stream_flow_control")
stats["core_http2_initiate_write_due_to_transport_flow_control"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_transport_flow_control")
stats["core_http2_initiate_write_due_to_send_settings"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_send_settings")
stats["core_http2_initiate_write_due_to_bdp_estimator_ping"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_bdp_estimator_ping")
stats["core_http2_initiate_write_due_to_flow_control_unstalled_by_setting"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_flow_control_unstalled_by_setting")
stats["core_http2_initiate_write_due_to_flow_control_unstalled_by_update"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_flow_control_unstalled_by_update")
stats["core_http2_initiate_write_due_to_application_ping"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_application_ping")
stats["core_http2_initiate_write_due_to_keepalive_ping"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_keepalive_ping")
stats["core_http2_initiate_write_due_to_transport_flow_control_unstalled"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_transport_flow_control_unstalled")
stats["core_http2_initiate_write_due_to_ping_response"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_ping_response")
stats["core_http2_initiate_write_due_to_force_rst_stream"] = massage_qps_stats_helpers.counter(core_stats, "http2_initiate_write_due_to_force_rst_stream")
stats["core_combiner_locks_initiated"] = massage_qps_stats_helpers.counter(core_stats, "combiner_locks_initiated")
stats["core_combiner_locks_scheduled_items"] = massage_qps_stats_helpers.counter(core_stats, "combiner_locks_scheduled_items")
stats["core_combiner_locks_scheduled_final_items"] = massage_qps_stats_helpers.counter(core_stats, "combiner_locks_scheduled_final_items")

@ -120,6 +120,26 @@
"name": "core_server_calls_created",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_cqs_created",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_client_channels_created",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_client_subchannels_created",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_server_channels_created",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_syscall_poll",
@ -225,6 +245,111 @@
"name": "core_http2_partial_writes",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_initial_write",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_start_new_stream",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_send_message",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_send_initial_metadata",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_send_trailing_metadata",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_retry_send_ping",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_continue_pings",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_goaway_sent",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_rst_stream",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_close_from_api",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_stream_flow_control",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_transport_flow_control",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_send_settings",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_bdp_estimator_ping",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_flow_control_unstalled_by_setting",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_flow_control_unstalled_by_update",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_application_ping",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_keepalive_ping",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_transport_flow_control_unstalled",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_ping_response",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_force_rst_stream",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_combiner_locks_initiated",
@ -597,6 +722,26 @@
"name": "core_server_calls_created",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_cqs_created",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_client_channels_created",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_client_subchannels_created",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_server_channels_created",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_syscall_poll",
@ -702,6 +847,111 @@
"name": "core_http2_partial_writes",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_initial_write",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_start_new_stream",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_send_message",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_send_initial_metadata",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_send_trailing_metadata",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_retry_send_ping",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_continue_pings",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_goaway_sent",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_rst_stream",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_close_from_api",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_stream_flow_control",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_transport_flow_control",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_send_settings",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_bdp_estimator_ping",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_flow_control_unstalled_by_setting",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_flow_control_unstalled_by_update",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_application_ping",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_keepalive_ping",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_transport_flow_control_unstalled",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_ping_response",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_http2_initiate_write_due_to_force_rst_stream",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_combiner_locks_initiated",

@ -80,7 +80,7 @@ def get_bqtest_data(limit=None):
SELECT
filtered_test_name,
SUM(result != 'PASSED' AND result != 'SKIPPED') > 0 as flaky,
MAX(cpu_measured) as cpu
MAX(cpu_measured) + 0.01 as cpu
FROM (
SELECT
REGEXP_REPLACE(test_name, r'/\d+', '') AS filtered_test_name,
@ -92,9 +92,7 @@ SELECT
AND platform = '"""+platform_string()+"""'
AND NOT REGEXP_MATCH(job_name, '.*portability.*') )
GROUP BY
filtered_test_name
HAVING
flaky OR cpu > 0"""
filtered_test_name"""
if limit:
query += " limit {}".format(limit)
query_job = big_query_utils.sync_query_job(bq, 'grpc-testing', query)

Loading…
Cancel
Save