Merge github.com:grpc/grpc into pollset_kick_stats

pull/12370/head
Craig Tiller 7 years ago
commit 64f8b129dd
16 changed files:

  1. include/grpc++/support/channel_arguments.h (6 changed lines)
  2. include/grpc/impl/codegen/grpc_types.h (6 changed lines)
  3. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c (197 changed lines)
  4. src/core/ext/filters/client_channel/lb_policy_factory.c (2 changed lines)
  5. src/core/ext/filters/client_channel/lb_policy_factory.h (2 changed lines)
  6. src/core/lib/debug/stats_data_bq_schema.sql (82 changed lines)
  7. src/core/lib/iomgr/ev_epoll1_linux.c (142 changed lines)
  8. src/cpp/common/channel_arguments.cc (4 changed lines)
  9. test/cpp/end2end/grpclb_end2end_test.cc (74 changed lines)
  10. tools/codegen/core/gen_stats_data.py (2 changed lines)
  11. tools/internal_ci/helper_scripts/prepare_build_macos_rc (13 changed lines)
  12. tools/internal_ci/macos/grpc_basictests_dbg.cfg (2 changed lines)
  13. tools/internal_ci/macos/grpc_basictests_opt.cfg (2 changed lines)
  14. tools/internal_ci/macos/grpc_build_artifacts.sh (12 changed lines)
  15. tools/internal_ci/windows/grpc_build_artifacts.bat (2 changed lines)
  16. tools/run_tests/performance/massage_qps_stats_helpers.py (2 changed lines)

--- a/include/grpc++/support/channel_arguments.h
+++ b/include/grpc++/support/channel_arguments.h
@@ -64,6 +64,12 @@ class ChannelArguments {
   /// Set the compression algorithm for the channel.
   void SetCompressionAlgorithm(grpc_compression_algorithm algorithm);
 
+  /// Set the grpclb fallback timeout (in ms) for the channel. If this amount
+  /// of time has passed but we have not gotten any non-empty \a serverlist
+  /// from the balancer, we will fall back to using the backend address(es)
+  /// returned by the resolver.
+  void SetGrpclbFallbackTimeout(int fallback_timeout);
+
   /// Set the socket mutator for the channel.
   void SetSocketMutator(grpc_socket_mutator* mutator);
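
Usage sketch (not part of this commit): with the setter above, a client opts
into fallback as shown below. The target, credentials, and the 10-second value
are illustrative assumptions.

    #include <memory>

    #include <grpc++/create_channel.h>
    #include <grpc++/security/credentials.h>
    #include <grpc++/support/channel_arguments.h>

    std::shared_ptr<grpc::Channel> MakeGrpclbChannel() {
      grpc::ChannelArguments args;
      // Fall back to the resolver-returned backends if no serverlist
      // arrives from the balancer within 10 seconds.
      args.SetGrpclbFallbackTimeout(10000);
      return grpc::CreateCustomChannel(
          "dns:///service.example.com" /* placeholder target */,
          grpc::InsecureChannelCredentials(), args);
    }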

--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -287,7 +287,11 @@ typedef struct {
   "grpc.experimental.tcp_max_read_chunk_size"
 /* Timeout in milliseconds to use for calls to the grpclb load balancer.
    If 0 or unset, the balancer calls will have no deadline. */
-#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_timeout_ms"
+#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_call_timeout_ms"
+/* Timeout in milliseconds to wait for the serverlist from the grpclb load
+   balancer before using fallback backend addresses from the resolver.
+   If 0, fallback will never be used. */
+#define GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS "grpc.grpclb_fallback_timeout_ms"
 /** If non-zero, grpc server's cronet compression workaround will be enabled */
 #define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \
   "grpc.workaround.cronet_compression"

--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -122,6 +122,7 @@
 #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2
+#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
 
 grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb");
@@ -298,6 +299,10 @@ typedef struct glb_lb_policy {
   /** timeout in milliseconds for the LB call. 0 means no deadline. */
   int lb_call_timeout_ms;
 
+  /** timeout in milliseconds before using fallback backend addresses.
+   * 0 means not using fallback. */
+  int lb_fallback_timeout_ms;
+
   /** for communicating with the LB server */
   grpc_channel *lb_channel;
@@ -324,6 +329,9 @@ typedef struct glb_lb_policy {
    * Otherwise, we delegate to the RR policy. */
   size_t serverlist_index;
 
+  /** stores the backend addresses from the resolver */
+  grpc_lb_addresses *fallback_backend_addresses;
+
   /** list of picks that are waiting on RR's policy connectivity */
   pending_pick *pending_picks;
@@ -344,6 +352,9 @@ typedef struct glb_lb_policy {
   /** is \a lb_call_retry_timer active? */
   bool retry_timer_active;
 
+  /** is \a lb_fallback_timer active? */
+  bool fallback_timer_active;
+
   /** called upon changes to the LB channel's connectivity. */
   grpc_closure lb_channel_on_connectivity_changed;
@@ -366,6 +377,9 @@ typedef struct glb_lb_policy {
   /* LB call retry timer callback. */
   grpc_closure lb_on_call_retry;
 
+  /* LB fallback timer callback. */
+  grpc_closure lb_on_fallback;
+
   grpc_call *lb_call; /* streaming call to the LB server, */
   grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
@@ -389,6 +403,9 @@ typedef struct glb_lb_policy {
   /** LB call retry timer */
   grpc_timer lb_call_retry_timer;
 
+  /** LB fallback timer */
+  grpc_timer lb_fallback_timer;
+
   bool initial_request_sent;
   bool seen_initial_response;
@ -535,6 +552,32 @@ static grpc_lb_addresses *process_serverlist_locked(
return lb_addresses; return lb_addresses;
} }
/* Returns the backend addresses extracted from the given addresses */
static grpc_lb_addresses *extract_backend_addresses_locked(
grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses) {
/* first pass: count the number of backend addresses */
size_t num_backends = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (!addresses->addresses[i].is_balancer) {
++num_backends;
}
}
/* second pass: actually populate the addresses and (empty) LB tokens */
grpc_lb_addresses *backend_addresses =
grpc_lb_addresses_create(num_backends, &lb_token_vtable);
size_t num_copied = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) continue;
const grpc_resolved_address *addr = &addresses->addresses[i].address;
grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
addr->len, false /* is_balancer */,
NULL /* balancer_name */,
(void *)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
++num_copied;
}
return backend_addresses;
}
static void update_lb_connectivity_status_locked( static void update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
grpc_connectivity_state rr_state, grpc_error *rr_state_error) { grpc_connectivity_state rr_state, grpc_error *rr_state_error) {
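
extract_backend_addresses_locked() above uses the common count-then-copy
idiom: one pass to size the output so a single allocation suffices, a second
pass to populate it. A self-contained sketch of the same idiom; fake_address
is a hypothetical type invented for illustration, not a gRPC API:

    #include <stdlib.h>

    typedef struct {
      int is_balancer;
      int port;
    } fake_address; /* hypothetical type, for illustration only */

    /* Returns a newly allocated array holding only the non-balancer entries
     * of in[0..n); *num_out receives its length. */
    static fake_address *copy_backends(const fake_address *in, size_t n,
                                       size_t *num_out) {
      size_t num_backends = 0;
      for (size_t i = 0; i < n; ++i) { /* pass 1: count */
        if (!in[i].is_balancer) ++num_backends;
      }
      fake_address *out = (fake_address *)calloc(num_backends, sizeof(*out));
      size_t copied = 0;
      for (size_t i = 0; i < n; ++i) { /* pass 2: populate */
        if (in[i].is_balancer) continue;
        out[copied++] = in[i];
      }
      *num_out = num_backends;
      return out;
    }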
@@ -602,35 +645,38 @@ static bool pick_from_internal_rr_locked(
     grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
     const grpc_lb_policy_pick_args *pick_args, bool force_async,
     grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
-  // Look at the index into the serverlist to see if we should drop this call.
-  grpc_grpclb_server *server =
-      glb_policy->serverlist->servers[glb_policy->serverlist_index++];
-  if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
-    glb_policy->serverlist_index = 0;  // Wrap-around.
-  }
-  if (server->drop) {
-    // Not using the RR policy, so unref it.
-    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
-      gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
-              (intptr_t)wc_arg->rr_policy);
-    }
-    GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
-    // Update client load reporting stats to indicate the number of
-    // dropped calls.  Note that we have to do this here instead of in
-    // the client_load_reporting filter, because we do not create a
-    // subchannel call (and therefore no client_load_reporting filter)
-    // for dropped calls.
-    grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token,
-                                                     wc_arg->client_stats);
-    grpc_grpclb_client_stats_unref(wc_arg->client_stats);
-    if (force_async) {
-      GPR_ASSERT(wc_arg->wrapped_closure != NULL);
-      GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+  // Check for drops if we are not using fallback backend addresses.
+  if (glb_policy->serverlist != NULL) {
+    // Look at the index into the serverlist to see if we should drop this call.
+    grpc_grpclb_server *server =
+        glb_policy->serverlist->servers[glb_policy->serverlist_index++];
+    if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
+      glb_policy->serverlist_index = 0;  // Wrap-around.
+    }
+    if (server->drop) {
+      // Not using the RR policy, so unref it.
+      if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+        gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
+                (intptr_t)wc_arg->rr_policy);
+      }
+      GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
+      // Update client load reporting stats to indicate the number of
+      // dropped calls.  Note that we have to do this here instead of in
+      // the client_load_reporting filter, because we do not create a
+      // subchannel call (and therefore no client_load_reporting filter)
+      // for dropped calls.
+      grpc_grpclb_client_stats_add_call_dropped_locked(
+          server->load_balance_token, wc_arg->client_stats);
+      grpc_grpclb_client_stats_unref(wc_arg->client_stats);
+      if (force_async) {
+        GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+        GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+        gpr_free(wc_arg->free_when_done);
+        return false;
+      }
       gpr_free(wc_arg->free_when_done);
-      return false;
+      return true;
     }
-    gpr_free(wc_arg->free_when_done);
-    return true;
   }
   // Pick via the RR policy.
   const bool pick_done = grpc_lb_policy_pick_locked(
@@ -668,8 +714,18 @@ static bool pick_from_internal_rr_locked(
 static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx,
                                                   glb_lb_policy *glb_policy) {
-  grpc_lb_addresses *addresses =
-      process_serverlist_locked(exec_ctx, glb_policy->serverlist);
+  grpc_lb_addresses *addresses;
+  if (glb_policy->serverlist != NULL) {
+    GPR_ASSERT(glb_policy->serverlist->num_servers > 0);
+    addresses = process_serverlist_locked(exec_ctx, glb_policy->serverlist);
+  } else {
+    // If rr_handover_locked() is invoked when we haven't received any
+    // serverlist from the balancer, we use the fallback backends returned by
+    // the resolver. Note that the fallback backend list may be empty, in which
+    // case the new round_robin policy will keep the requested picks pending.
+    GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL);
+    addresses = grpc_lb_addresses_copy(glb_policy->fallback_backend_addresses);
+  }
   GPR_ASSERT(addresses != NULL);
   grpc_lb_policy_args *args = (grpc_lb_policy_args *)gpr_zalloc(sizeof(*args));
   args->client_channel_factory = glb_policy->cc_factory;
@@ -775,8 +831,6 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
 /* glb_policy->rr_policy may be NULL (initial handover) */
 static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
                                glb_lb_policy *glb_policy) {
-  GPR_ASSERT(glb_policy->serverlist != NULL &&
-             glb_policy->serverlist->num_servers > 0);
   if (glb_policy->shutting_down) return;
   grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy);
   GPR_ASSERT(args != NULL);
@@ -917,13 +971,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
 static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
                                   grpc_lb_policy_factory *factory,
                                   grpc_lb_policy_args *args) {
-  /* Count the number of gRPC-LB addresses. There must be at least one.
-   * TODO(roth): For now, we ignore non-balancer addresses, but in the
-   * future, we may change the behavior such that we fall back to using
-   * the non-balancer addresses if we cannot reach any balancers. In the
-   * fallback case, we should use the LB policy indicated by
-   * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is
-   * unset, we should default to pick_first). */
+  /* Count the number of gRPC-LB addresses. There must be at least one. */
   const grpc_arg *arg =
       grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
   if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
@@ -959,6 +1007,11 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
   glb_policy->lb_call_timeout_ms =
       grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX});
 
+  arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
+  glb_policy->lb_fallback_timeout_ms = grpc_channel_arg_get_integer(
+      arg, (grpc_integer_options){GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0,
+                                  INT_MAX});
+
   // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
   // since we use this to trigger the client_load_reporting filter.
   grpc_arg new_arg =
@@ -967,6 +1020,11 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
   glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
       args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
 
+  /* Extract the backend addresses (may be empty) from the resolver for
+   * fallback. */
+  glb_policy->fallback_backend_addresses =
+      extract_backend_addresses_locked(exec_ctx, addresses);
+
   /* Create a client channel over them to communicate with a LB service */
   glb_policy->response_generator =
       grpc_fake_resolver_response_generator_create();
@@ -1010,6 +1068,9 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   if (glb_policy->serverlist != NULL) {
     grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
   }
+  if (glb_policy->fallback_backend_addresses != NULL) {
+    grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
+  }
   grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
   if (glb_policy->pending_update_args != NULL) {
     grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args);
@@ -1150,10 +1211,28 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
   GRPC_ERROR_UNREF(error);
 }
 
+static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                                        grpc_error *error);
+
 static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
                                       glb_lb_policy *glb_policy);
 
 static void start_picking_locked(grpc_exec_ctx *exec_ctx,
                                  glb_lb_policy *glb_policy) {
+  /* start a timer to fall back */
+  if (glb_policy->lb_fallback_timeout_ms > 0 &&
+      glb_policy->serverlist == NULL) {
+    gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+    gpr_timespec deadline = gpr_time_add(
+        now,
+        gpr_time_from_millis(glb_policy->lb_fallback_timeout_ms, GPR_TIMESPAN));
+    GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer");
+    GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked,
+                      glb_policy,
+                      grpc_combiner_scheduler(glb_policy->base.combiner));
+    glb_policy->fallback_timer_active = true;
+    grpc_timer_init(exec_ctx, &glb_policy->lb_fallback_timer, deadline,
+                    &glb_policy->lb_on_fallback, now);
+  }
+
   glb_policy->started_picking = true;
   gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
   query_for_backends_locked(exec_ctx, glb_policy);
@@ -1607,6 +1686,15 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
         if (glb_policy->serverlist != NULL) {
           /* dispose of the old serverlist */
           grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
+        } else {
+          /* or dispose of the fallback */
+          grpc_lb_addresses_destroy(exec_ctx,
+                                    glb_policy->fallback_backend_addresses);
+          glb_policy->fallback_backend_addresses = NULL;
+          if (glb_policy->fallback_timer_active) {
+            grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer);
+            glb_policy->fallback_timer_active = false;
+          }
         }
         /* and update the copy in the glb_lb_policy instance. This
          * serverlist instance will be destroyed either upon the next
@@ -1617,9 +1705,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
         }
       } else {
         if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
-          gpr_log(GPR_INFO,
-                  "Received empty server list. Picks will stay pending until "
-                  "a response with > 0 servers is received");
+          gpr_log(GPR_INFO, "Received empty server list, ignoring.");
         }
         grpc_grpclb_destroy_serverlist(serverlist);
       }
@@ -1666,6 +1752,26 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
   GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer");
 }
 
+static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                                        grpc_error *error) {
+  glb_lb_policy *glb_policy = arg;
+  /* If we receive a serverlist after the timer fires but before this callback
+   * actually runs, don't do anything. */
+  if (glb_policy->serverlist != NULL) return;
+  glb_policy->fallback_timer_active = false;
+  if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
+    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+      gpr_log(GPR_INFO,
+              "Falling back to use backends from resolver (grpclb %p)",
+              (void *)glb_policy);
+    }
+    GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL);
+    rr_handover_locked(exec_ctx, glb_policy);
+  }
+  GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+                            "grpclb_fallback_timer");
+}
+
 static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
                                                 void *arg, grpc_error *error) {
   glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
@@ -1786,6 +1892,17 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
         &glb_policy->lb_channel_connectivity,
         &glb_policy->lb_channel_on_connectivity_changed, NULL);
   }
+
+  // Propagate update to fallback_backend_addresses if a non-empty serverlist
+  // hasn't been received from the balancer.
+  if (glb_policy->serverlist == NULL) {
+    grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
+    glb_policy->fallback_backend_addresses =
+        extract_backend_addresses_locked(exec_ctx, addresses);
+    if (glb_policy->rr_policy != NULL) {
+      rr_handover_locked(exec_ctx, glb_policy);
+    }
+  }
 }
 
 // Invoked as part of the update process. It continues watching the LB channel

--- a/src/core/ext/filters/client_channel/lb_policy_factory.c
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.c
@@ -56,7 +56,7 @@ grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses) {
 }
 
 void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
-                                   void* address, size_t address_len,
+                                   const void* address, size_t address_len,
                                    bool is_balancer, const char* balancer_name,
                                    void* user_data) {
   GPR_ASSERT(index < addresses->num_addresses);

--- a/src/core/ext/filters/client_channel/lb_policy_factory.h
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.h
@@ -73,7 +73,7 @@ grpc_lb_addresses *grpc_lb_addresses_copy(const grpc_lb_addresses *addresses);
  * \a address is a socket address of length \a address_len.
  * Takes ownership of \a balancer_name. */
 void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index,
-                                   void *address, size_t address_len,
+                                   const void *address, size_t address_len,
                                    bool is_balancer, const char *balancer_name,
                                    void *user_data);

--- a/src/core/lib/debug/stats_data_bq_schema.sql
+++ b/src/core/lib/debug/stats_data_bq_schema.sql
@@ -1,41 +1,41 @@
-client_calls_created_per_iteration:INTEGER,
-server_calls_created_per_iteration:INTEGER,
-syscall_poll_per_iteration:INTEGER,
-syscall_wait_per_iteration:INTEGER,
-pollset_kick_per_iteration:INTEGER,
-pollset_kicked_without_poller_per_iteration:INTEGER,
-pollset_kicked_again_per_iteration:INTEGER,
-pollset_kick_wakeup_fd_per_iteration:INTEGER,
-pollset_kick_wakeup_cv_per_iteration:INTEGER,
-pollset_kick_own_thread_per_iteration:INTEGER,
-histogram_slow_lookups_per_iteration:INTEGER,
-syscall_write_per_iteration:INTEGER,
-syscall_read_per_iteration:INTEGER,
-tcp_backup_pollers_created_per_iteration:INTEGER,
-tcp_backup_poller_polls_per_iteration:INTEGER,
-http2_op_batches_per_iteration:INTEGER,
-http2_op_cancel_per_iteration:INTEGER,
-http2_op_send_initial_metadata_per_iteration:INTEGER,
-http2_op_send_message_per_iteration:INTEGER,
-http2_op_send_trailing_metadata_per_iteration:INTEGER,
-http2_op_recv_initial_metadata_per_iteration:INTEGER,
-http2_op_recv_message_per_iteration:INTEGER,
-http2_op_recv_trailing_metadata_per_iteration:INTEGER,
-http2_settings_writes_per_iteration:INTEGER,
-http2_pings_sent_per_iteration:INTEGER,
-http2_writes_begun_per_iteration:INTEGER,
-http2_writes_offloaded_per_iteration:INTEGER,
-http2_writes_continued_per_iteration:INTEGER,
-http2_partial_writes_per_iteration:INTEGER,
-combiner_locks_initiated_per_iteration:INTEGER,
-combiner_locks_scheduled_items_per_iteration:INTEGER,
-combiner_locks_scheduled_final_items_per_iteration:INTEGER,
-combiner_locks_offloaded_per_iteration:INTEGER,
-executor_scheduled_short_items_per_iteration:INTEGER,
-executor_scheduled_long_items_per_iteration:INTEGER,
-executor_scheduled_to_self_per_iteration:INTEGER,
-executor_wakeup_initiated_per_iteration:INTEGER,
-executor_queue_drained_per_iteration:INTEGER,
-executor_push_retries_per_iteration:INTEGER,
-server_requested_calls_per_iteration:INTEGER,
-server_slowpath_requests_queued_per_iteration:INTEGER
+client_calls_created_per_iteration:FLOAT,
+server_calls_created_per_iteration:FLOAT,
+syscall_poll_per_iteration:FLOAT,
+syscall_wait_per_iteration:FLOAT,
+pollset_kick_per_iteration:FLOAT,
+pollset_kicked_without_poller_per_iteration:FLOAT,
+pollset_kicked_again_per_iteration:FLOAT,
+pollset_kick_wakeup_fd_per_iteration:FLOAT,
+pollset_kick_wakeup_cv_per_iteration:FLOAT,
+pollset_kick_own_thread_per_iteration:FLOAT,
+histogram_slow_lookups_per_iteration:FLOAT,
+syscall_write_per_iteration:FLOAT,
+syscall_read_per_iteration:FLOAT,
+tcp_backup_pollers_created_per_iteration:FLOAT,
+tcp_backup_poller_polls_per_iteration:FLOAT,
+http2_op_batches_per_iteration:FLOAT,
+http2_op_cancel_per_iteration:FLOAT,
+http2_op_send_initial_metadata_per_iteration:FLOAT,
+http2_op_send_message_per_iteration:FLOAT,
+http2_op_send_trailing_metadata_per_iteration:FLOAT,
+http2_op_recv_initial_metadata_per_iteration:FLOAT,
+http2_op_recv_message_per_iteration:FLOAT,
+http2_op_recv_trailing_metadata_per_iteration:FLOAT,
+http2_settings_writes_per_iteration:FLOAT,
+http2_pings_sent_per_iteration:FLOAT,
+http2_writes_begun_per_iteration:FLOAT,
+http2_writes_offloaded_per_iteration:FLOAT,
+http2_writes_continued_per_iteration:FLOAT,
+http2_partial_writes_per_iteration:FLOAT,
+combiner_locks_initiated_per_iteration:FLOAT,
+combiner_locks_scheduled_items_per_iteration:FLOAT,
+combiner_locks_scheduled_final_items_per_iteration:FLOAT,
+combiner_locks_offloaded_per_iteration:FLOAT,
+executor_scheduled_short_items_per_iteration:FLOAT,
+executor_scheduled_long_items_per_iteration:FLOAT,
+executor_scheduled_to_self_per_iteration:FLOAT,
+executor_wakeup_initiated_per_iteration:FLOAT,
+executor_queue_drained_per_iteration:FLOAT,
+executor_push_retries_per_iteration:FLOAT,
+server_requested_calls_per_iteration:FLOAT,
+server_slowpath_requests_queued_per_iteration:FLOAT

--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -160,18 +160,18 @@ struct grpc_pollset_worker {
     (worker)->kick_state_mutator = __LINE__; \
   } while (false)
 
-#define MAX_NEIGHBOURHOODS 1024
+#define MAX_NEIGHBORHOODS 1024
 
-typedef struct pollset_neighbourhood {
+typedef struct pollset_neighborhood {
   gpr_mu mu;
   grpc_pollset *active_root;
   char pad[GPR_CACHELINE_SIZE];
-} pollset_neighbourhood;
+} pollset_neighborhood;
 
 struct grpc_pollset {
   gpr_mu mu;
-  pollset_neighbourhood *neighbourhood;
-  bool reassigning_neighbourhood;
+  pollset_neighborhood *neighborhood;
+  bool reassigning_neighborhood;
   grpc_pollset_worker *root_worker;
   bool kicked_without_poller;
@@ -384,8 +384,8 @@ GPR_TLS_DECL(g_current_thread_worker);
 /* The designated poller */
 static gpr_atm g_active_poller;
 
-static pollset_neighbourhood *g_neighbourhoods;
-static size_t g_num_neighbourhoods;
+static pollset_neighborhood *g_neighborhoods;
+static size_t g_num_neighborhoods;
 
 /* Return true if first in list */
 static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
@@ -424,8 +424,8 @@ static worker_remove_result worker_remove(grpc_pollset *pollset,
   }
 }
 
-static size_t choose_neighbourhood(void) {
-  return (size_t)gpr_cpu_current_cpu() % g_num_neighbourhoods;
+static size_t choose_neighborhood(void) {
+  return (size_t)gpr_cpu_current_cpu() % g_num_neighborhoods;
 }
 
 static grpc_error *pollset_global_init(void) {
@@ -441,11 +441,11 @@ static grpc_error *pollset_global_init(void) {
                 &ev) != 0) {
     return GRPC_OS_ERROR(errno, "epoll_ctl");
   }
-  g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
-  g_neighbourhoods = (pollset_neighbourhood *)gpr_zalloc(
-      sizeof(*g_neighbourhoods) * g_num_neighbourhoods);
-  for (size_t i = 0; i < g_num_neighbourhoods; i++) {
-    gpr_mu_init(&g_neighbourhoods[i].mu);
+  g_num_neighborhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBORHOODS);
+  g_neighborhoods = (pollset_neighborhood *)gpr_zalloc(
+      sizeof(*g_neighborhoods) * g_num_neighborhoods);
+  for (size_t i = 0; i < g_num_neighborhoods; i++) {
+    gpr_mu_init(&g_neighborhoods[i].mu);
   }
   return GRPC_ERROR_NONE;
 }
@@ -454,17 +454,17 @@ static void pollset_global_shutdown(void) {
   gpr_tls_destroy(&g_current_thread_pollset);
   gpr_tls_destroy(&g_current_thread_worker);
   if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
-  for (size_t i = 0; i < g_num_neighbourhoods; i++) {
-    gpr_mu_destroy(&g_neighbourhoods[i].mu);
+  for (size_t i = 0; i < g_num_neighborhoods; i++) {
+    gpr_mu_destroy(&g_neighborhoods[i].mu);
   }
-  gpr_free(g_neighbourhoods);
+  gpr_free(g_neighborhoods);
 }
 
 static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
   gpr_mu_init(&pollset->mu);
   *mu = &pollset->mu;
-  pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
-  pollset->reassigning_neighbourhood = false;
+  pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
+  pollset->reassigning_neighborhood = false;
   pollset->root_worker = NULL;
   pollset->kicked_without_poller = false;
   pollset->seen_inactive = true;
@@ -477,26 +477,26 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
 static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
   gpr_mu_lock(&pollset->mu);
   if (!pollset->seen_inactive) {
-    pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
+    pollset_neighborhood *neighborhood = pollset->neighborhood;
     gpr_mu_unlock(&pollset->mu);
-  retry_lock_neighbourhood:
-    gpr_mu_lock(&neighbourhood->mu);
+  retry_lock_neighborhood:
+    gpr_mu_lock(&neighborhood->mu);
     gpr_mu_lock(&pollset->mu);
     if (!pollset->seen_inactive) {
-      if (pollset->neighbourhood != neighbourhood) {
-        gpr_mu_unlock(&neighbourhood->mu);
-        neighbourhood = pollset->neighbourhood;
+      if (pollset->neighborhood != neighborhood) {
+        gpr_mu_unlock(&neighborhood->mu);
+        neighborhood = pollset->neighborhood;
         gpr_mu_unlock(&pollset->mu);
-        goto retry_lock_neighbourhood;
+        goto retry_lock_neighborhood;
       }
       pollset->prev->next = pollset->next;
       pollset->next->prev = pollset->prev;
-      if (pollset == pollset->neighbourhood->active_root) {
-        pollset->neighbourhood->active_root =
+      if (pollset == pollset->neighborhood->active_root) {
+        pollset->neighborhood->active_root =
             pollset->next == pollset ? NULL : pollset->next;
       }
     }
-    gpr_mu_unlock(&pollset->neighbourhood->mu);
+    gpr_mu_unlock(&pollset->neighborhood->mu);
   }
   gpr_mu_unlock(&pollset->mu);
   gpr_mu_destroy(&pollset->mu);
@@ -682,16 +682,16 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
       // pollset has been observed to be inactive, we need to move back to the
       // active list
       bool is_reassigning = false;
-      if (!pollset->reassigning_neighbourhood) {
+      if (!pollset->reassigning_neighborhood) {
         is_reassigning = true;
-        pollset->reassigning_neighbourhood = true;
-        pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
+        pollset->reassigning_neighborhood = true;
+        pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
       }
-      pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
+      pollset_neighborhood *neighborhood = pollset->neighborhood;
       gpr_mu_unlock(&pollset->mu);
       // pollset unlocked: state may change (even worker->kick_state)
-    retry_lock_neighbourhood:
-      gpr_mu_lock(&neighbourhood->mu);
+    retry_lock_neighborhood:
+      gpr_mu_lock(&neighborhood->mu);
       gpr_mu_lock(&pollset->mu);
       if (GRPC_TRACER_ON(grpc_polling_trace)) {
         gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
@@ -699,17 +699,17 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                 is_reassigning);
       }
       if (pollset->seen_inactive) {
-        if (neighbourhood != pollset->neighbourhood) {
-          gpr_mu_unlock(&neighbourhood->mu);
-          neighbourhood = pollset->neighbourhood;
+        if (neighborhood != pollset->neighborhood) {
+          gpr_mu_unlock(&neighborhood->mu);
+          neighborhood = pollset->neighborhood;
           gpr_mu_unlock(&pollset->mu);
-          goto retry_lock_neighbourhood;
+          goto retry_lock_neighborhood;
         }
 
         /* In the brief time we released the pollset locks above, the worker MAY
            have been kicked. In this case, the worker should get out of this
            pollset ASAP and hence this should neither add the pollset to
-           neighbourhood nor mark the pollset as active.
+           neighborhood nor mark the pollset as active.
 
         On a side note, the only way a worker's kick state could have changed
         at this point is if it were "kicked specifically". Since the worker has
@ -717,25 +717,25 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
not visible in the "kick any" path yet */ not visible in the "kick any" path yet */
if (worker->kick_state == UNKICKED) { if (worker->kick_state == UNKICKED) {
pollset->seen_inactive = false; pollset->seen_inactive = false;
if (neighbourhood->active_root == NULL) { if (neighborhood->active_root == NULL) {
neighbourhood->active_root = pollset->next = pollset->prev = pollset; neighborhood->active_root = pollset->next = pollset->prev = pollset;
/* Make this the designated poller if there isn't one already */ /* Make this the designated poller if there isn't one already */
if (worker->kick_state == UNKICKED && if (worker->kick_state == UNKICKED &&
gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) { gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
SET_KICK_STATE(worker, DESIGNATED_POLLER); SET_KICK_STATE(worker, DESIGNATED_POLLER);
} }
} else { } else {
pollset->next = neighbourhood->active_root; pollset->next = neighborhood->active_root;
pollset->prev = pollset->next->prev; pollset->prev = pollset->next->prev;
pollset->next->prev = pollset->prev->next = pollset; pollset->next->prev = pollset->prev->next = pollset;
} }
} }
} }
if (is_reassigning) { if (is_reassigning) {
GPR_ASSERT(pollset->reassigning_neighbourhood); GPR_ASSERT(pollset->reassigning_neighborhood);
pollset->reassigning_neighbourhood = false; pollset->reassigning_neighborhood = false;
} }
gpr_mu_unlock(&neighbourhood->mu); gpr_mu_unlock(&neighborhood->mu);
} }
worker_insert(pollset, worker); worker_insert(pollset, worker);
@ -770,7 +770,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
} }
/* We release pollset lock in this function at a couple of places: /* We release pollset lock in this function at a couple of places:
* 1. Briefly when assigning pollset to a neighbourhood * 1. Briefly when assigning pollset to a neighborhood
* 2. When doing gpr_cv_wait() * 2. When doing gpr_cv_wait()
* It is possible that 'kicked_without_poller' was set to true during (1) and * It is possible that 'kicked_without_poller' was set to true during (1) and
* 'shutting_down' is set to true during (1) or (2). If either of them is * 'shutting_down' is set to true during (1) or (2). If either of them is
@ -788,12 +788,12 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down; return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
} }
static bool check_neighbourhood_for_available_poller( static bool check_neighborhood_for_available_poller(
grpc_exec_ctx *exec_ctx, pollset_neighbourhood *neighbourhood) { grpc_exec_ctx *exec_ctx, pollset_neighborhood *neighborhood) {
GPR_TIMER_BEGIN("check_neighbourhood_for_available_poller", 0); GPR_TIMER_BEGIN("check_neighborhood_for_available_poller", 0);
bool found_worker = false; bool found_worker = false;
do { do {
grpc_pollset *inspect = neighbourhood->active_root; grpc_pollset *inspect = neighborhood->active_root;
if (inspect == NULL) { if (inspect == NULL) {
break; break;
} }
@@ -839,8 +839,8 @@ static bool check_neighbourhood_for_available_poller(
         gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect);
       }
       inspect->seen_inactive = true;
-      if (inspect == neighbourhood->active_root) {
-        neighbourhood->active_root =
+      if (inspect == neighborhood->active_root) {
+        neighborhood->active_root =
             inspect->next == inspect ? NULL : inspect->next;
       }
       inspect->next->prev = inspect->prev;
@@ -849,7 +849,7 @@ static bool check_neighbourhood_for_available_poller(
     }
     gpr_mu_unlock(&inspect->mu);
   } while (!found_worker);
-  GPR_TIMER_END("check_neighbourhood_for_available_poller", 0);
+  GPR_TIMER_END("check_neighborhood_for_available_poller", 0);
   return found_worker;
 }
@@ -881,33 +881,33 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
       }
     } else {
       gpr_atm_no_barrier_store(&g_active_poller, 0);
-      size_t poller_neighbourhood_idx =
-          (size_t)(pollset->neighbourhood - g_neighbourhoods);
+      size_t poller_neighborhood_idx =
+          (size_t)(pollset->neighborhood - g_neighborhoods);
       gpr_mu_unlock(&pollset->mu);
       bool found_worker = false;
-      bool scan_state[MAX_NEIGHBOURHOODS];
-      for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
-        pollset_neighbourhood *neighbourhood =
-            &g_neighbourhoods[(poller_neighbourhood_idx + i) %
-                              g_num_neighbourhoods];
-        if (gpr_mu_trylock(&neighbourhood->mu)) {
+      bool scan_state[MAX_NEIGHBORHOODS];
+      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
+        pollset_neighborhood *neighborhood =
+            &g_neighborhoods[(poller_neighborhood_idx + i) %
+                             g_num_neighborhoods];
+        if (gpr_mu_trylock(&neighborhood->mu)) {
           found_worker =
-              check_neighbourhood_for_available_poller(exec_ctx, neighbourhood);
-          gpr_mu_unlock(&neighbourhood->mu);
+              check_neighborhood_for_available_poller(exec_ctx, neighborhood);
+          gpr_mu_unlock(&neighborhood->mu);
           scan_state[i] = true;
         } else {
           scan_state[i] = false;
         }
       }
-      for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
+      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
         if (scan_state[i]) continue;
-        pollset_neighbourhood *neighbourhood =
-            &g_neighbourhoods[(poller_neighbourhood_idx + i) %
-                              g_num_neighbourhoods];
-        gpr_mu_lock(&neighbourhood->mu);
+        pollset_neighborhood *neighborhood =
+            &g_neighborhoods[(poller_neighborhood_idx + i) %
+                             g_num_neighborhoods];
+        gpr_mu_lock(&neighborhood->mu);
         found_worker =
-            check_neighbourhood_for_available_poller(exec_ctx, neighbourhood);
-        gpr_mu_unlock(&neighbourhood->mu);
+            check_neighborhood_for_available_poller(exec_ctx, neighborhood);
+        gpr_mu_unlock(&neighborhood->mu);
       }
       grpc_exec_ctx_flush(exec_ctx);
       gpr_mu_lock(&pollset->mu);

--- a/src/cpp/common/channel_arguments.cc
+++ b/src/cpp/common/channel_arguments.cc
@@ -86,6 +86,10 @@ void ChannelArguments::SetCompressionAlgorithm(
   SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, algorithm);
 }
 
+void ChannelArguments::SetGrpclbFallbackTimeout(int fallback_timeout) {
+  SetInt(GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS, fallback_timeout);
+}
+
 void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) {
   if (!mutator) {
     return;

--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -368,8 +368,9 @@ class GrpclbEnd2endTest : public ::testing::Test {
     grpc_fake_resolver_response_generator_unref(response_generator_);
   }
 
-  void ResetStub() {
+  void ResetStub(int fallback_timeout = 0) {
     ChannelArguments args;
+    args.SetGrpclbFallbackTimeout(fallback_timeout);
     args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
                     response_generator_);
     std::ostringstream uri;
@@ -441,10 +442,10 @@ class GrpclbEnd2endTest : public ::testing::Test {
     grpc_exec_ctx_finish(&exec_ctx);
   }
 
-  const std::vector<int> GetBackendPorts() const {
+  const std::vector<int> GetBackendPorts(const size_t start_index = 0) const {
     std::vector<int> backend_ports;
-    for (const auto& bs : backend_servers_) {
-      backend_ports.push_back(bs.port_);
+    for (size_t i = start_index; i < backend_servers_.size(); ++i) {
+      backend_ports.push_back(backend_servers_[i].port_);
     }
     return backend_ports;
   }
@@ -615,6 +616,71 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
   EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
 }
 
+TEST_F(SingleBalancerTest, Fallback) {
+  const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
+  const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
+  const size_t kNumBackendInResolution = backends_.size() / 2;
+
+  ResetStub(kFallbackTimeoutMs);
+  std::vector<AddressData> addresses;
+  addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
+  for (size_t i = 0; i < kNumBackendInResolution; ++i) {
+    addresses.emplace_back(AddressData{backend_servers_[i].port_, false, ""});
+  }
+  SetNextResolution(addresses);
+
+  // Send non-empty serverlist only after kServerlistDelayMs.
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(
+             GetBackendPorts(kNumBackendInResolution /* start_index */), {}),
+      kServerlistDelayMs);
+
+  // The first request. The client will block while it's still trying to
+  // contact the balancer.
+  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
+  CheckRpcSendOk(kNumBackendInResolution);
+  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
+
+  // Fallback is used: each backend returned by the resolver should have
+  // gotten one request.
+  for (size_t i = 0; i < kNumBackendInResolution; ++i) {
+    EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
+  }
+  for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
+    EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
+  }
+
+  // Wait until update has been processed, as signaled by the backend returned
+  // by the balancer receiving a request.
+  do {
+    CheckRpcSendOk(1);
+  } while (
+      backend_servers_[kNumBackendInResolution].service_->request_count() == 0);
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    backend_servers_[i].service_->ResetCounters();
+  }
+
+  // Send out the second request.
+  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
+  CheckRpcSendOk(backends_.size() - kNumBackendInResolution);
+  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
+
+  // Serverlist is used: each backend returned by the balancer should
+  // have gotten one request.
+  for (size_t i = 0; i < kNumBackendInResolution; ++i) {
+    EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
+  }
+  for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
+    EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
+  }
+
+  balancers_[0]->NotifyDoneWithServerlists();
+  // The balancer got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+}
+
 TEST_F(SingleBalancerTest, BackendsRestart) {
   const size_t kNumRpcsPerAddress = 100;
   ScheduleResponseForBalancer(

--- a/tools/codegen/core/gen_stats_data.py
+++ b/tools/codegen/core/gen_stats_data.py
@@ -405,6 +405,6 @@ with open('tools/run_tests/performance/massage_qps_stats.py', 'w') as P:
 with open('src/core/lib/debug/stats_data_bq_schema.sql', 'w') as S:
   columns = []
   for counter in inst_map['Counter']:
-    columns.append(('%s_per_iteration' % counter.name, 'INTEGER'))
+    columns.append(('%s_per_iteration' % counter.name, 'FLOAT'))
   print >>S, ',\n'.join('%s:%s' % x for x in columns)

--- a/tools/internal_ci/helper_scripts/prepare_build_macos_rc
+++ b/tools/internal_ci/helper_scripts/prepare_build_macos_rc
@@ -31,23 +31,16 @@ ulimit -a
 pip install google-api-python-client --user python
 
 export GOOGLE_APPLICATION_CREDENTIALS=${KOKORO_GFILE_DIR}/GrpcTesting-d0eeee2db331.json
 
-# required to build protobuf
-brew install gflags
-
 set +ex  # rvm script is very verbose and exits with errorcode
 source $HOME/.rvm/scripts/rvm
 set -e  # rvm commands are very verbose
-rvm install ruby-2.4
+rvm use ruby-2.4
 rvm osx-ssl-certs status all
 rvm osx-ssl-certs update all
 set -ex
 
-gem install bundler
-
 # cocoapods
 export LANG=en_US.UTF-8
-gem install cocoapods
-gem install xcpretty
 pod repo update # needed by python
 
 # python
@@ -56,10 +49,6 @@ pip install virtualenv --user python
 pip install -U six tox setuptools --user python
 export PYTHONPATH=/Library/Python/3.4/site-packages
 
-# python 3.4
-wget -q https://www.python.org/ftp/python/3.4.4/python-3.4.4-macosx10.6.pkg
-sudo installer -pkg python-3.4.4-macosx10.6.pkg -target /
-
 # set xcode version for Obj-C tests
 sudo xcode-select -switch /Applications/Xcode_8.2.1.app/Contents/Developer

--- a/tools/internal_ci/macos/grpc_basictests_dbg.cfg
+++ b/tools/internal_ci/macos/grpc_basictests_dbg.cfg
@@ -27,5 +27,5 @@ action {
 env_vars {
   key: "RUN_TESTS_FLAGS"
-  value: "-f basictests macos dbg --internal_ci -j 1 --inner_jobs 4"
+  value: "-f basictests macos dbg --internal_ci -j 1 --inner_jobs 4 --bq_result_table aggregate_results"
 }

--- a/tools/internal_ci/macos/grpc_basictests_opt.cfg
+++ b/tools/internal_ci/macos/grpc_basictests_opt.cfg
@@ -27,5 +27,5 @@ action {
 env_vars {
   key: "RUN_TESTS_FLAGS"
-  value: "-f basictests macos opt --internal_ci -j 1 --inner_jobs 4"
+  value: "-f basictests macos opt --internal_ci -j 1 --inner_jobs 4 --bq_result_table aggregate_results"
 }

--- a/tools/internal_ci/macos/grpc_build_artifacts.sh
+++ b/tools/internal_ci/macos/grpc_build_artifacts.sh
@@ -20,25 +20,13 @@ cd $(dirname $0)/../../..
 source tools/internal_ci/helper_scripts/prepare_build_macos_rc
 
-# python 3.5
-wget -q https://www.python.org/ftp/python/3.5.2/python-3.5.2-macosx10.6.pkg
-sudo installer -pkg python-3.5.2-macosx10.6.pkg -target /
-
 # install cython for all python versions
 python2.7 -m pip install cython setuptools wheel
 python3.4 -m pip install cython setuptools wheel
 python3.5 -m pip install cython setuptools wheel
 python3.6 -m pip install cython setuptools wheel
 
-# node-gyp (needed for node artifacts)
-npm install -g node-gyp
-
-# php dependencies: pecl
-curl -O http://pear.php.net/go-pear.phar
-sudo php -d detect_unicode=0 go-pear.phar
-
 # needed to build ruby artifacts
-gem install rake-compiler
 wget https://raw.githubusercontent.com/grpc/grpc/master/tools/distrib/build_ruby_environment_macos.sh
 bash build_ruby_environment_macos.sh

--- a/tools/internal_ci/windows/grpc_build_artifacts.bat
+++ b/tools/internal_ci/windows/grpc_build_artifacts.bat
@@ -19,8 +19,6 @@ rename C:\Python34_32bit Python34_32bits
 rename C:\Python35_32bit Python35_32bits
 rename C:\Python36_32bit Python36_32bits
 
-pacman -S --noconfirm mingw64/mingw-w64-x86_64-gcc mingw32/mingw-w64-i686-gcc
-
 @rem enter repo root
 cd /d %~dp0\..\..\..

--- a/tools/run_tests/performance/massage_qps_stats_helpers.py
+++ b/tools/run_tests/performance/massage_qps_stats_helpers.py
@@ -23,7 +23,7 @@ def _threshold_for_count_below(buckets, boundaries, count_below):
     if count_so_far == count_below:
       # this bucket hits the threshold exactly... we should be midway through
      # any run of zero values following the bucket
-      for upper_idx in range(lower_idx + 1, num_buckets):
+      for upper_idx in range(lower_idx + 1, len(buckets)):
        if buckets[upper_idx] != 0:
          break
       return (boundaries[lower_idx] + boundaries[upper_idx]) / 2.0
