From aba0a0a54412fe59bc2090334ae26c99bb148097 Mon Sep 17 00:00:00 2001 From: Juanli Shen Date: Sun, 10 Sep 2017 18:28:03 -0700 Subject: [PATCH 1/7] Add fallback (use backends from resolver if can't reach balancer) to grpclb. --- include/grpc++/support/channel_arguments.h | 6 + include/grpc/impl/codegen/grpc_types.h | 6 +- .../client_channel/lb_policy/grpclb/grpclb.c | 197 ++++++++++++++---- .../client_channel/lb_policy_factory.c | 2 +- .../client_channel/lb_policy_factory.h | 2 +- src/cpp/common/channel_arguments.cc | 4 + test/cpp/end2end/grpclb_end2end_test.cc | 74 ++++++- 7 files changed, 244 insertions(+), 47 deletions(-) diff --git a/include/grpc++/support/channel_arguments.h b/include/grpc++/support/channel_arguments.h index 7b6befeaf1e..9dc505f0082 100644 --- a/include/grpc++/support/channel_arguments.h +++ b/include/grpc++/support/channel_arguments.h @@ -64,6 +64,12 @@ class ChannelArguments { /// Set the compression algorithm for the channel. void SetCompressionAlgorithm(grpc_compression_algorithm algorithm); + /// Set the grpclb fallback timeout (in ms) for the channel. If this amount + /// of time has passed but we have not gotten any non-empty \a serverlist from + /// the balancer, we will fall back to use the backend address(es) returned by + /// the resolver. + void SetGrpclbFallbackTimeout(int fallback_timeout); + /// Set the socket mutator for the channel. void SetSocketMutator(grpc_socket_mutator* mutator); diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 59b90af03a3..4745fddd71c 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -287,7 +287,11 @@ typedef struct { "grpc.experimental.tcp_max_read_chunk_size" /* Timeout in milliseconds to use for calls to the grpclb load balancer. If 0 or unset, the balancer calls will have no deadline. 
*/ -#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_timeout_ms" +#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_call_timeout_ms" +/* Timeout in milliseconds to wait for the serverlist from the grpclb load + balancer before using fallback backend addresses from the resolver. + If 0, fallback will never be used. */ +#define GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS "grpc.grpclb_fallback_timeout_ms" /** If non-zero, grpc server's cronet compression workaround will be enabled */ #define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \ "grpc.workaround.cronet_compression" diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index 5aafed1374d..18979829bd2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -122,6 +122,7 @@ #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2 +#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000 grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb"); @@ -298,6 +299,10 @@ typedef struct glb_lb_policy { /** timeout in milliseconds for the LB call. 0 means no deadline. */ int lb_call_timeout_ms; + /** timeout in milliseconds for before using fallback backend addresses. + * 0 means not using fallback. */ + int lb_fallback_timeout_ms; + /** for communicating with the LB server */ grpc_channel *lb_channel; @@ -324,6 +329,9 @@ typedef struct glb_lb_policy { * Otherwise, we delegate to the RR policy. */ size_t serverlist_index; + /** stores the backend addresses from the resolver */ + grpc_lb_addresses *fallback_backend_addresses; + /** list of picks that are waiting on RR's policy connectivity */ pending_pick *pending_picks; @@ -344,6 +352,9 @@ typedef struct glb_lb_policy { /** is \a lb_call_retry_timer active? 
*/ bool retry_timer_active; + /** is \a lb_fallback_timer active? */ + bool fallback_timer_active; + /** called upon changes to the LB channel's connectivity. */ grpc_closure lb_channel_on_connectivity_changed; @@ -366,6 +377,9 @@ typedef struct glb_lb_policy { /* LB call retry timer callback. */ grpc_closure lb_on_call_retry; + /* LB fallback timer callback. */ + grpc_closure lb_on_fallback; + grpc_call *lb_call; /* streaming call to the LB server, */ grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */ @@ -389,6 +403,9 @@ typedef struct glb_lb_policy { /** LB call retry timer */ grpc_timer lb_call_retry_timer; + /** LB fallback timer */ + grpc_timer lb_fallback_timer; + bool initial_request_sent; bool seen_initial_response; @@ -535,6 +552,32 @@ static grpc_lb_addresses *process_serverlist_locked( return lb_addresses; } +/* Returns the backend addresses extracted from the given addresses */ +static grpc_lb_addresses *extract_backend_addresses_locked( + grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses) { + /* first pass: count the number of backend addresses */ + size_t num_backends = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (!addresses->addresses[i].is_balancer) { + ++num_backends; + } + } + /* second pass: actually populate the addresses and (empty) LB tokens */ + grpc_lb_addresses *backend_addresses = + grpc_lb_addresses_create(num_backends, &lb_token_vtable); + size_t num_copied = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (addresses->addresses[i].is_balancer) continue; + const grpc_resolved_address *addr = &addresses->addresses[i].address; + grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr, + addr->len, false /* is_balancer */, + NULL /* balancer_name */, + (void *)GRPC_MDELEM_LB_TOKEN_EMPTY.payload); + ++num_copied; + } + return backend_addresses; +} + static void update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, 
grpc_connectivity_state rr_state, grpc_error *rr_state_error) { @@ -602,35 +645,38 @@ static bool pick_from_internal_rr_locked( grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, const grpc_lb_policy_pick_args *pick_args, bool force_async, grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { - // Look at the index into the serverlist to see if we should drop this call. - grpc_grpclb_server *server = - glb_policy->serverlist->servers[glb_policy->serverlist_index++]; - if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) { - glb_policy->serverlist_index = 0; // Wrap-around. - } - if (server->drop) { - // Not using the RR policy, so unref it. - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", - (intptr_t)wc_arg->rr_policy); + // Check for drops if we are not using fallback backend addresses. + if (glb_policy->serverlist != NULL) { + // Look at the index into the serverlist to see if we should drop this call. + grpc_grpclb_server *server = + glb_policy->serverlist->servers[glb_policy->serverlist_index++]; + if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) { + glb_policy->serverlist_index = 0; // Wrap-around. } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); - // Update client load reporting stats to indicate the number of - // dropped calls. Note that we have to do this here instead of in - // the client_load_reporting filter, because we do not create a - // subchannel call (and therefore no client_load_reporting filter) - // for dropped calls. - grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token, - wc_arg->client_stats); - grpc_grpclb_client_stats_unref(wc_arg->client_stats); - if (force_async) { - GPR_ASSERT(wc_arg->wrapped_closure != NULL); - GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + if (server->drop) { + // Not using the RR policy, so unref it. 
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", + (intptr_t)wc_arg->rr_policy); + } + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); + // Update client load reporting stats to indicate the number of + // dropped calls. Note that we have to do this here instead of in + // the client_load_reporting filter, because we do not create a + // subchannel call (and therefore no client_load_reporting filter) + // for dropped calls. + grpc_grpclb_client_stats_add_call_dropped_locked( + server->load_balance_token, wc_arg->client_stats); + grpc_grpclb_client_stats_unref(wc_arg->client_stats); + if (force_async) { + GPR_ASSERT(wc_arg->wrapped_closure != NULL); + GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + gpr_free(wc_arg->free_when_done); + return false; + } gpr_free(wc_arg->free_when_done); - return false; + return true; } - gpr_free(wc_arg->free_when_done); - return true; } // Pick via the RR policy. const bool pick_done = grpc_lb_policy_pick_locked( @@ -668,8 +714,18 @@ static bool pick_from_internal_rr_locked( static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { - grpc_lb_addresses *addresses = - process_serverlist_locked(exec_ctx, glb_policy->serverlist); + grpc_lb_addresses *addresses; + if (glb_policy->serverlist != NULL) { + GPR_ASSERT(glb_policy->serverlist->num_servers > 0); + addresses = process_serverlist_locked(exec_ctx, glb_policy->serverlist); + } else { + // If rr_handover_locked() is invoked when we haven't received any + // serverlist from the balancer, we use the fallback backends returned by + // the resolver. Note that the fallback backend list may be empty, in which + // case the new round_robin policy will keep the requested picks pending. 
+ GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL); + addresses = grpc_lb_addresses_copy(glb_policy->fallback_backend_addresses); + } GPR_ASSERT(addresses != NULL); grpc_lb_policy_args *args = (grpc_lb_policy_args *)gpr_zalloc(sizeof(*args)); args->client_channel_factory = glb_policy->cc_factory; @@ -775,8 +831,6 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, /* glb_policy->rr_policy may be NULL (initial handover) */ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { - GPR_ASSERT(glb_policy->serverlist != NULL && - glb_policy->serverlist->num_servers > 0); if (glb_policy->shutting_down) return; grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy); GPR_ASSERT(args != NULL); @@ -917,13 +971,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - /* Count the number of gRPC-LB addresses. There must be at least one. - * TODO(roth): For now, we ignore non-balancer addresses, but in the - * future, we may change the behavior such that we fall back to using - * the non-balancer addresses if we cannot reach any balancers. In the - * fallback case, we should use the LB policy indicated by - * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is - * unset, we should default to pick_first). */ + /* Count the number of gRPC-LB addresses. There must be at least one. 
*/ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { @@ -959,6 +1007,11 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, glb_policy->lb_call_timeout_ms = grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX}); + arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS); + glb_policy->lb_fallback_timeout_ms = grpc_channel_arg_get_integer( + arg, (grpc_integer_options){GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, + INT_MAX}); + // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, // since we use this to trigger the client_load_reporting filter. grpc_arg new_arg = @@ -967,6 +1020,11 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, glb_policy->args = grpc_channel_args_copy_and_add_and_remove( args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); + /* Extract the backend addresses (may be empty) from the resolver for + * fallback. 
*/ + glb_policy->fallback_backend_addresses = + extract_backend_addresses_locked(exec_ctx, addresses); + /* Create a client channel over them to communicate with a LB service */ glb_policy->response_generator = grpc_fake_resolver_response_generator_create(); @@ -1010,6 +1068,9 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (glb_policy->serverlist != NULL) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } + if (glb_policy->fallback_backend_addresses != NULL) { + grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses); + } grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); if (glb_policy->pending_update_args != NULL) { grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args); @@ -1150,10 +1211,28 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } +static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy); static void start_picking_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { + /* start a timer to fall back */ + if (glb_policy->lb_fallback_timeout_ms > 0 && + glb_policy->serverlist == NULL) { + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec deadline = gpr_time_add( + now, + gpr_time_from_millis(glb_policy->lb_fallback_timeout_ms, GPR_TIMESPAN)); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer"); + GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked, + glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner)); + glb_policy->fallback_timer_active = true; + grpc_timer_init(exec_ctx, &glb_policy->lb_fallback_timer, deadline, + &glb_policy->lb_on_fallback, now); + } + glb_policy->started_picking = true; gpr_backoff_reset(&glb_policy->lb_call_backoff_state); query_for_backends_locked(exec_ctx, glb_policy); @@ -1607,6 
+1686,15 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, if (glb_policy->serverlist != NULL) { /* dispose of the old serverlist */ grpc_grpclb_destroy_serverlist(glb_policy->serverlist); + } else { + /* or dispose of the fallback */ + grpc_lb_addresses_destroy(exec_ctx, + glb_policy->fallback_backend_addresses); + glb_policy->fallback_backend_addresses = NULL; + if (glb_policy->fallback_timer_active) { + grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer); + glb_policy->fallback_timer_active = false; + } } /* and update the copy in the glb_lb_policy instance. This * serverlist instance will be destroyed either upon the next @@ -1617,9 +1705,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, } } else { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "Received empty server list. Picks will stay pending until " - "a response with > 0 servers is received"); + gpr_log(GPR_INFO, "Received empty server list, ignoring."); } grpc_grpclb_destroy_serverlist(serverlist); } @@ -1666,6 +1752,26 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer"); } +static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = (glb_lb_policy *)arg; + glb_policy->fallback_timer_active = false; + /* If we receive a serverlist after the timer fires but before this callback + * actually runs, don't fall back, but still drop the timer's weak ref. 
*/ + if (glb_policy->serverlist == NULL && !glb_policy->shutting_down && + error == GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, + "Falling back to use backends from resolver (grpclb %p)", + (void *)glb_policy); + } + GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL); + rr_handover_locked(exec_ctx, glb_policy); + } + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "grpclb_fallback_timer"); +} + static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)arg; @@ -1786,6 +1892,17 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, &glb_policy->lb_channel_connectivity, &glb_policy->lb_channel_on_connectivity_changed, NULL); } + + // Propagate update to fallback_backend_addresses if a non-empty serverlist + // hasn't been received from the balancer. + if (glb_policy->serverlist == NULL) { + grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses); + glb_policy->fallback_backend_addresses = + extract_backend_addresses_locked(exec_ctx, addresses); + if (glb_policy->rr_policy != NULL) { + rr_handover_locked(exec_ctx, glb_policy); + } + } } // Invoked as part of the update process. 
It continues watching the LB channel diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.c b/src/core/ext/filters/client_channel/lb_policy_factory.c index cdcaf17544b..918bab745c6 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.c +++ b/src/core/ext/filters/client_channel/lb_policy_factory.c @@ -56,7 +56,7 @@ grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses) { } void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index, - void* address, size_t address_len, + const void* address, size_t address_len, bool is_balancer, const char* balancer_name, void* user_data) { GPR_ASSERT(index < addresses->num_addresses); diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h index 9d9fb143df5..cf0f8cb6157 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.h +++ b/src/core/ext/filters/client_channel/lb_policy_factory.h @@ -73,7 +73,7 @@ grpc_lb_addresses *grpc_lb_addresses_copy(const grpc_lb_addresses *addresses); * \a address is a socket address of length \a address_len. * Takes ownership of \a balancer_name. 
*/ void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index, - void *address, size_t address_len, + const void *address, size_t address_len, bool is_balancer, const char *balancer_name, void *user_data); diff --git a/src/cpp/common/channel_arguments.cc b/src/cpp/common/channel_arguments.cc index f130aecd4b5..f89f5f1f03d 100644 --- a/src/cpp/common/channel_arguments.cc +++ b/src/cpp/common/channel_arguments.cc @@ -86,6 +86,10 @@ void ChannelArguments::SetCompressionAlgorithm( SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, algorithm); } +void ChannelArguments::SetGrpclbFallbackTimeout(int fallback_timeout) { + SetInt(GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS, fallback_timeout); +} + void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) { if (!mutator) { return; diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 570a3d10677..17a094f7a21 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -368,8 +368,9 @@ class GrpclbEnd2endTest : public ::testing::Test { grpc_fake_resolver_response_generator_unref(response_generator_); } - void ResetStub() { + void ResetStub(int fallback_timeout = 0) { ChannelArguments args; + args.SetGrpclbFallbackTimeout(fallback_timeout); args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_); std::ostringstream uri; @@ -441,10 +442,10 @@ class GrpclbEnd2endTest : public ::testing::Test { grpc_exec_ctx_finish(&exec_ctx); } - const std::vector GetBackendPorts() const { + const std::vector GetBackendPorts(const size_t start_index = 0) const { std::vector backend_ports; - for (const auto& bs : backend_servers_) { - backend_ports.push_back(bs.port_); + for (size_t i = start_index; i < backend_servers_.size(); ++i) { + backend_ports.push_back(backend_servers_[i].port_); } return backend_ports; } @@ -615,6 +616,71 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { EXPECT_EQ("grpclb", 
channel_->GetLoadBalancingPolicyName()); } +TEST_F(SingleBalancerTest, Fallback) { + const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor(); + const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); + const size_t kNumBackendInResolution = backends_.size() / 2; + + ResetStub(kFallbackTimeoutMs); + std::vector addresses; + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + for (size_t i = 0; i < kNumBackendInResolution; ++i) { + addresses.emplace_back(AddressData{backend_servers_[i].port_, false, ""}); + } + SetNextResolution(addresses); + + // Send non-empty serverlist only after kServerlistDelayMs + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends( + GetBackendPorts(kNumBackendInResolution /* start_index */), {}), + kServerlistDelayMs); + + // The first request. The client will block while it's still trying to + // contact the balancer. + gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); + CheckRpcSendOk(kNumBackendInResolution); + gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); + + // Fallback is used: each backend returned by the resolver should have + // gotten one request. + for (size_t i = 0; i < kNumBackendInResolution; ++i) { + EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); + } + for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) { + EXPECT_EQ(0U, backend_servers_[i].service_->request_count()); + } + + // Wait until update has been processed, as signaled by the backend returned + // by the balancer receiving a request. + do { + CheckRpcSendOk(1); + } while ( + backend_servers_[kNumBackendInResolution].service_->request_count() == 0); + for (size_t i = 0; i < backends_.size(); ++i) { + backend_servers_[i].service_->ResetCounters(); + } + + // Send out the second request. 
+ gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); + CheckRpcSendOk(backends_.size() - kNumBackendInResolution); + gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); + + // Serverlist is used: each backend returned by the balancer should + // have gotten one request. + for (size_t i = 0; i < kNumBackendInResolution; ++i) { + EXPECT_EQ(0U, backend_servers_[i].service_->request_count()); + } + for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) { + EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); + } + + balancers_[0]->NotifyDoneWithServerlists(); + // The balancer got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); +} + TEST_F(SingleBalancerTest, BackendsRestart) { const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( From 4b7ef4d52546e05a1b33aac1eeb218409c4b0a3a Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 11 Sep 2017 23:09:22 -0700 Subject: [PATCH 2/7] Build a wall and make 'u' pay for it --- src/core/lib/iomgr/ev_epoll1_linux.c | 143 +++++++++++++-------------- 1 file changed, 71 insertions(+), 72 deletions(-) diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index 5bc7e878de0..4efd705fa81 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -160,18 +160,18 @@ struct grpc_pollset_worker { (worker)->kick_state_mutator = __LINE__; \ } while (false) -#define MAX_NEIGHBOURHOODS 1024 +#define MAX_NEIGHBORHOODS 1024 -typedef struct pollset_neighbourhood { +typedef struct pollset_neighborhood { gpr_mu mu; grpc_pollset *active_root; char pad[GPR_CACHELINE_SIZE]; -} pollset_neighbourhood; +} pollset_neighborhood; struct grpc_pollset { gpr_mu mu; - pollset_neighbourhood *neighbourhood; - bool reassigning_neighbourhood; + pollset_neighborhood *neighborhood; + bool 
reassigning_neighborhood; grpc_pollset_worker *root_worker; bool kicked_without_poller; @@ -384,8 +384,8 @@ GPR_TLS_DECL(g_current_thread_worker); /* The designated poller */ static gpr_atm g_active_poller; -static pollset_neighbourhood *g_neighbourhoods; -static size_t g_num_neighbourhoods; +static pollset_neighborhood *g_neighborhoods; +static size_t g_num_neighborhoods; /* Return true if first in list */ static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) { @@ -424,8 +424,8 @@ static worker_remove_result worker_remove(grpc_pollset *pollset, } } -static size_t choose_neighbourhood(void) { - return (size_t)gpr_cpu_current_cpu() % g_num_neighbourhoods; +static size_t choose_neighborhood(void) { + return (size_t)gpr_cpu_current_cpu() % g_num_neighborhoods; } static grpc_error *pollset_global_init(void) { @@ -441,11 +441,11 @@ static grpc_error *pollset_global_init(void) { &ev) != 0) { return GRPC_OS_ERROR(errno, "epoll_ctl"); } - g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS); - g_neighbourhoods = (pollset_neighbourhood *)gpr_zalloc( - sizeof(*g_neighbourhoods) * g_num_neighbourhoods); - for (size_t i = 0; i < g_num_neighbourhoods; i++) { - gpr_mu_init(&g_neighbourhoods[i].mu); + g_num_neighborhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBORHOODS); + g_neighborhoods = (pollset_neighborhood *)gpr_zalloc( + sizeof(*g_neighborhoods) * g_num_neighborhoods); + for (size_t i = 0; i < g_num_neighborhoods; i++) { + gpr_mu_init(&g_neighborhoods[i].mu); } return GRPC_ERROR_NONE; } @@ -454,17 +454,17 @@ static void pollset_global_shutdown(void) { gpr_tls_destroy(&g_current_thread_pollset); gpr_tls_destroy(&g_current_thread_worker); if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd); - for (size_t i = 0; i < g_num_neighbourhoods; i++) { - gpr_mu_destroy(&g_neighbourhoods[i].mu); + for (size_t i = 0; i < g_num_neighborhoods; i++) { + gpr_mu_destroy(&g_neighborhoods[i].mu); } - 
gpr_free(g_neighbourhoods); + gpr_free(g_neighborhoods); } static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->mu); *mu = &pollset->mu; - pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()]; - pollset->reassigning_neighbourhood = false; + pollset->neighborhood = &g_neighborhoods[choose_neighborhood()]; + pollset->reassigning_neighborhood = false; pollset->root_worker = NULL; pollset->kicked_without_poller = false; pollset->seen_inactive = true; @@ -477,26 +477,26 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { gpr_mu_lock(&pollset->mu); if (!pollset->seen_inactive) { - pollset_neighbourhood *neighbourhood = pollset->neighbourhood; + pollset_neighborhood *neighborhood = pollset->neighborhood; gpr_mu_unlock(&pollset->mu); - retry_lock_neighbourhood: - gpr_mu_lock(&neighbourhood->mu); + retry_lock_neighborhood: + gpr_mu_lock(&neighborhood->mu); gpr_mu_lock(&pollset->mu); if (!pollset->seen_inactive) { - if (pollset->neighbourhood != neighbourhood) { - gpr_mu_unlock(&neighbourhood->mu); - neighbourhood = pollset->neighbourhood; + if (pollset->neighborhood != neighborhood) { + gpr_mu_unlock(&neighborhood->mu); + neighborhood = pollset->neighborhood; gpr_mu_unlock(&pollset->mu); - goto retry_lock_neighbourhood; + goto retry_lock_neighborhood; } pollset->prev->next = pollset->next; pollset->next->prev = pollset->prev; - if (pollset == pollset->neighbourhood->active_root) { - pollset->neighbourhood->active_root = + if (pollset == pollset->neighborhood->active_root) { + pollset->neighborhood->active_root = pollset->next == pollset ? 
NULL : pollset->next; } } - gpr_mu_unlock(&pollset->neighbourhood->mu); + gpr_mu_unlock(&pollset->neighborhood->mu); } gpr_mu_unlock(&pollset->mu); gpr_mu_destroy(&pollset->mu); @@ -675,16 +675,16 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, // pollset has been observed to be inactive, we need to move back to the // active list bool is_reassigning = false; - if (!pollset->reassigning_neighbourhood) { + if (!pollset->reassigning_neighborhood) { is_reassigning = true; - pollset->reassigning_neighbourhood = true; - pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()]; + pollset->reassigning_neighborhood = true; + pollset->neighborhood = &g_neighborhoods[choose_neighborhood()]; } - pollset_neighbourhood *neighbourhood = pollset->neighbourhood; + pollset_neighborhood *neighborhood = pollset->neighborhood; gpr_mu_unlock(&pollset->mu); // pollset unlocked: state may change (even worker->kick_state) - retry_lock_neighbourhood: - gpr_mu_lock(&neighbourhood->mu); + retry_lock_neighborhood: + gpr_mu_lock(&neighborhood->mu); gpr_mu_lock(&pollset->mu); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d", @@ -692,17 +692,17 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, is_reassigning); } if (pollset->seen_inactive) { - if (neighbourhood != pollset->neighbourhood) { - gpr_mu_unlock(&neighbourhood->mu); - neighbourhood = pollset->neighbourhood; + if (neighborhood != pollset->neighborhood) { + gpr_mu_unlock(&neighborhood->mu); + neighborhood = pollset->neighborhood; gpr_mu_unlock(&pollset->mu); - goto retry_lock_neighbourhood; + goto retry_lock_neighborhood; } /* In the brief time we released the pollset locks above, the worker MAY have been kicked. In this case, the worker should get out of this pollset ASAP and hence this should neither add the pollset to - neighbourhood nor mark the pollset as active. 
+ neighborhood nor mark the pollset as active. On a side note, the only way a worker's kick state could have changed at this point is if it were "kicked specifically". Since the worker has @@ -710,25 +710,25 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, not visible in the "kick any" path yet */ if (worker->kick_state == UNKICKED) { pollset->seen_inactive = false; - if (neighbourhood->active_root == NULL) { - neighbourhood->active_root = pollset->next = pollset->prev = pollset; + if (neighborhood->active_root == NULL) { + neighborhood->active_root = pollset->next = pollset->prev = pollset; /* Make this the designated poller if there isn't one already */ if (worker->kick_state == UNKICKED && gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) { SET_KICK_STATE(worker, DESIGNATED_POLLER); } } else { - pollset->next = neighbourhood->active_root; + pollset->next = neighborhood->active_root; pollset->prev = pollset->next->prev; pollset->next->prev = pollset->prev->next = pollset; } } } if (is_reassigning) { - GPR_ASSERT(pollset->reassigning_neighbourhood); - pollset->reassigning_neighbourhood = false; + GPR_ASSERT(pollset->reassigning_neighborhood); + pollset->reassigning_neighborhood = false; } - gpr_mu_unlock(&neighbourhood->mu); + gpr_mu_unlock(&neighborhood->mu); } worker_insert(pollset, worker); @@ -763,7 +763,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, } /* We release pollset lock in this function at a couple of places: - * 1. Briefly when assigning pollset to a neighbourhood + * 1. Briefly when assigning pollset to a neighborhood * 2. When doing gpr_cv_wait() * It is possible that 'kicked_without_poller' was set to true during (1) and * 'shutting_down' is set to true during (1) or (2). 
If either of them is @@ -781,12 +781,12 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down; } -static bool check_neighbourhood_for_available_poller( - pollset_neighbourhood *neighbourhood) { - GPR_TIMER_BEGIN("check_neighbourhood_for_available_poller", 0); +static bool check_neighborhood_for_available_poller( + pollset_neighborhood *neighborhood) { + GPR_TIMER_BEGIN("check_neighborhood_for_available_poller", 0); bool found_worker = false; do { - grpc_pollset *inspect = neighbourhood->active_root; + grpc_pollset *inspect = neighborhood->active_root; if (inspect == NULL) { break; } @@ -831,8 +831,8 @@ static bool check_neighbourhood_for_available_poller( gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect); } inspect->seen_inactive = true; - if (inspect == neighbourhood->active_root) { - neighbourhood->active_root = + if (inspect == neighborhood->active_root) { + neighborhood->active_root = inspect->next == inspect ? 
NULL : inspect->next; } inspect->next->prev = inspect->prev; @@ -841,7 +841,7 @@ static bool check_neighbourhood_for_available_poller( } gpr_mu_unlock(&inspect->mu); } while (!found_worker); - GPR_TIMER_END("check_neighbourhood_for_available_poller", 0); + GPR_TIMER_END("check_neighborhood_for_available_poller", 0); return found_worker; } @@ -873,32 +873,31 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } else { gpr_atm_no_barrier_store(&g_active_poller, 0); - size_t poller_neighbourhood_idx = - (size_t)(pollset->neighbourhood - g_neighbourhoods); + size_t poller_neighborhood_idx = + (size_t)(pollset->neighborhood - g_neighborhoods); gpr_mu_unlock(&pollset->mu); bool found_worker = false; - bool scan_state[MAX_NEIGHBOURHOODS]; - for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) { - pollset_neighbourhood *neighbourhood = - &g_neighbourhoods[(poller_neighbourhood_idx + i) % - g_num_neighbourhoods]; - if (gpr_mu_trylock(&neighbourhood->mu)) { - found_worker = - check_neighbourhood_for_available_poller(neighbourhood); - gpr_mu_unlock(&neighbourhood->mu); + bool scan_state[MAX_NEIGHBORHOODS]; + for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) { + pollset_neighborhood *neighborhood = + &g_neighborhoods[(poller_neighborhood_idx + i) % + g_num_neighborhoods]; + if (gpr_mu_trylock(&neighborhood->mu)) { + found_worker = check_neighborhood_for_available_poller(neighborhood); + gpr_mu_unlock(&neighborhood->mu); scan_state[i] = true; } else { scan_state[i] = false; } } - for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) { + for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) { if (scan_state[i]) continue; - pollset_neighbourhood *neighbourhood = - &g_neighbourhoods[(poller_neighbourhood_idx + i) % - g_num_neighbourhoods]; - gpr_mu_lock(&neighbourhood->mu); - found_worker = check_neighbourhood_for_available_poller(neighbourhood); - gpr_mu_unlock(&neighbourhood->mu); + 
pollset_neighborhood *neighborhood = + &g_neighborhoods[(poller_neighborhood_idx + i) % + g_num_neighborhoods]; + gpr_mu_lock(&neighborhood->mu); + found_worker = check_neighborhood_for_available_poller(neighborhood); + gpr_mu_unlock(&neighborhood->mu); } grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&pollset->mu); From 707145c78a7e1d65ddf14836adbbb7032172a3a0 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 13 Sep 2017 10:02:58 +0200 Subject: [PATCH 3/7] mingw already installed on win kokoro workers --- tools/internal_ci/windows/grpc_build_artifacts.bat | 2 -- 1 file changed, 2 deletions(-) diff --git a/tools/internal_ci/windows/grpc_build_artifacts.bat b/tools/internal_ci/windows/grpc_build_artifacts.bat index 17d9571d43e..c6e523697ca 100644 --- a/tools/internal_ci/windows/grpc_build_artifacts.bat +++ b/tools/internal_ci/windows/grpc_build_artifacts.bat @@ -19,8 +19,6 @@ rename C:\Python34_32bit Python34_32bits rename C:\Python35_32bit Python35_32bits rename C:\Python36_32bit Python36_32bits -pacman -S --noconfirm mingw64/mingw-w64-x86_64-gcc mingw32/mingw-w64-i686-gcc - @rem enter repo root cd /d %~dp0\..\..\.. 
From e97bc51d3966aaa6d8bbcd6ec9238063bba8dad2 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 13 Sep 2017 12:26:32 +0200 Subject: [PATCH 4/7] already installed on mac workers --- .../helper_scripts/prepare_build_macos_rc | 13 +------------ tools/internal_ci/macos/grpc_build_artifacts.sh | 12 ------------ 2 files changed, 1 insertion(+), 24 deletions(-) diff --git a/tools/internal_ci/helper_scripts/prepare_build_macos_rc b/tools/internal_ci/helper_scripts/prepare_build_macos_rc index f7fbec93ffa..dd2741e595b 100644 --- a/tools/internal_ci/helper_scripts/prepare_build_macos_rc +++ b/tools/internal_ci/helper_scripts/prepare_build_macos_rc @@ -31,23 +31,16 @@ ulimit -a pip install google-api-python-client --user python export GOOGLE_APPLICATION_CREDENTIALS=${KOKORO_GFILE_DIR}/GrpcTesting-d0eeee2db331.json -# required to build protobuf -brew install gflags - set +ex # rvm script is very verbose and exits with errorcode source $HOME/.rvm/scripts/rvm set -e # rvm commands are very verbose -rvm install ruby-2.4 +rvm use ruby-2.4 rvm osx-ssl-certs status all rvm osx-ssl-certs update all set -ex -gem install bundler - # cocoapods export LANG=en_US.UTF-8 -gem install cocoapods -gem install xcpretty pod repo update # needed by python # python @@ -56,10 +49,6 @@ pip install virtualenv --user python pip install -U six tox setuptools --user python export PYTHONPATH=/Library/Python/3.4/site-packages -# python 3.4 -wget -q https://www.python.org/ftp/python/3.4.4/python-3.4.4-macosx10.6.pkg -sudo installer -pkg python-3.4.4-macosx10.6.pkg -target / - # set xcode version for Obj-C tests sudo xcode-select -switch /Applications/Xcode_8.2.1.app/Contents/Developer diff --git a/tools/internal_ci/macos/grpc_build_artifacts.sh b/tools/internal_ci/macos/grpc_build_artifacts.sh index 603c15f210f..09784e3bb47 100755 --- a/tools/internal_ci/macos/grpc_build_artifacts.sh +++ b/tools/internal_ci/macos/grpc_build_artifacts.sh @@ -20,25 +20,13 @@ cd $(dirname $0)/../../.. 
source tools/internal_ci/helper_scripts/prepare_build_macos_rc -# python 3.5 -wget -q https://www.python.org/ftp/python/3.5.2/python-3.5.2-macosx10.6.pkg -sudo installer -pkg python-3.5.2-macosx10.6.pkg -target / - # install cython for all python versions python2.7 -m pip install cython setuptools wheel python3.4 -m pip install cython setuptools wheel python3.5 -m pip install cython setuptools wheel python3.6 -m pip install cython setuptools wheel -# node-gyp (needed for node artifacts) -npm install -g node-gyp - -# php dependencies: pecl -curl -O http://pear.php.net/go-pear.phar -sudo php -d detect_unicode=0 go-pear.phar - # needed to build ruby artifacts -gem install rake-compiler wget https://raw.githubusercontent.com/grpc/grpc/master/tools/distrib/build_ruby_environment_macos.sh bash build_ruby_environment_macos.sh From 70bbb6b1176626346efc7c7549fb027896f8d8c2 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 13 Sep 2017 09:04:30 -0700 Subject: [PATCH 5/7] Fix schema generation --- src/core/lib/debug/stats_data_bq_schema.sql | 70 ++++++++++----------- tools/codegen/core/gen_stats_data.py | 2 +- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql index 7291bbf0709..53547692918 100644 --- a/src/core/lib/debug/stats_data_bq_schema.sql +++ b/src/core/lib/debug/stats_data_bq_schema.sql @@ -1,35 +1,35 @@ -client_calls_created_per_iteration:INTEGER, -server_calls_created_per_iteration:INTEGER, -syscall_poll_per_iteration:INTEGER, -syscall_wait_per_iteration:INTEGER, -histogram_slow_lookups_per_iteration:INTEGER, -syscall_write_per_iteration:INTEGER, -syscall_read_per_iteration:INTEGER, -tcp_backup_pollers_created_per_iteration:INTEGER, -tcp_backup_poller_polls_per_iteration:INTEGER, -http2_op_batches_per_iteration:INTEGER, -http2_op_cancel_per_iteration:INTEGER, -http2_op_send_initial_metadata_per_iteration:INTEGER, -http2_op_send_message_per_iteration:INTEGER, 
-http2_op_send_trailing_metadata_per_iteration:INTEGER, -http2_op_recv_initial_metadata_per_iteration:INTEGER, -http2_op_recv_message_per_iteration:INTEGER, -http2_op_recv_trailing_metadata_per_iteration:INTEGER, -http2_settings_writes_per_iteration:INTEGER, -http2_pings_sent_per_iteration:INTEGER, -http2_writes_begun_per_iteration:INTEGER, -http2_writes_offloaded_per_iteration:INTEGER, -http2_writes_continued_per_iteration:INTEGER, -http2_partial_writes_per_iteration:INTEGER, -combiner_locks_initiated_per_iteration:INTEGER, -combiner_locks_scheduled_items_per_iteration:INTEGER, -combiner_locks_scheduled_final_items_per_iteration:INTEGER, -combiner_locks_offloaded_per_iteration:INTEGER, -executor_scheduled_short_items_per_iteration:INTEGER, -executor_scheduled_long_items_per_iteration:INTEGER, -executor_scheduled_to_self_per_iteration:INTEGER, -executor_wakeup_initiated_per_iteration:INTEGER, -executor_queue_drained_per_iteration:INTEGER, -executor_push_retries_per_iteration:INTEGER, -server_requested_calls_per_iteration:INTEGER, -server_slowpath_requests_queued_per_iteration:INTEGER +client_calls_created_per_iteration:FLOAT, +server_calls_created_per_iteration:FLOAT, +syscall_poll_per_iteration:FLOAT, +syscall_wait_per_iteration:FLOAT, +histogram_slow_lookups_per_iteration:FLOAT, +syscall_write_per_iteration:FLOAT, +syscall_read_per_iteration:FLOAT, +tcp_backup_pollers_created_per_iteration:FLOAT, +tcp_backup_poller_polls_per_iteration:FLOAT, +http2_op_batches_per_iteration:FLOAT, +http2_op_cancel_per_iteration:FLOAT, +http2_op_send_initial_metadata_per_iteration:FLOAT, +http2_op_send_message_per_iteration:FLOAT, +http2_op_send_trailing_metadata_per_iteration:FLOAT, +http2_op_recv_initial_metadata_per_iteration:FLOAT, +http2_op_recv_message_per_iteration:FLOAT, +http2_op_recv_trailing_metadata_per_iteration:FLOAT, +http2_settings_writes_per_iteration:FLOAT, +http2_pings_sent_per_iteration:FLOAT, +http2_writes_begun_per_iteration:FLOAT, 
+http2_writes_offloaded_per_iteration:FLOAT, +http2_writes_continued_per_iteration:FLOAT, +http2_partial_writes_per_iteration:FLOAT, +combiner_locks_initiated_per_iteration:FLOAT, +combiner_locks_scheduled_items_per_iteration:FLOAT, +combiner_locks_scheduled_final_items_per_iteration:FLOAT, +combiner_locks_offloaded_per_iteration:FLOAT, +executor_scheduled_short_items_per_iteration:FLOAT, +executor_scheduled_long_items_per_iteration:FLOAT, +executor_scheduled_to_self_per_iteration:FLOAT, +executor_wakeup_initiated_per_iteration:FLOAT, +executor_queue_drained_per_iteration:FLOAT, +executor_push_retries_per_iteration:FLOAT, +server_requested_calls_per_iteration:FLOAT, +server_slowpath_requests_queued_per_iteration:FLOAT diff --git a/tools/codegen/core/gen_stats_data.py b/tools/codegen/core/gen_stats_data.py index 877a5d9d430..8359734c848 100755 --- a/tools/codegen/core/gen_stats_data.py +++ b/tools/codegen/core/gen_stats_data.py @@ -405,6 +405,6 @@ with open('tools/run_tests/performance/massage_qps_stats.py', 'w') as P: with open('src/core/lib/debug/stats_data_bq_schema.sql', 'w') as S: columns = [] for counter in inst_map['Counter']: - columns.append(('%s_per_iteration' % counter.name, 'INTEGER')) + columns.append(('%s_per_iteration' % counter.name, 'FLOAT')) print >>S, ',\n'.join('%s:%s' % x for x in columns) From f2cbd6c81eab267fb8c04121ff76ffbbe689dfa1 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 13 Sep 2017 09:31:16 -0700 Subject: [PATCH 6/7] Fix qps_test histogram python code --- tools/run_tests/performance/massage_qps_stats_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/run_tests/performance/massage_qps_stats_helpers.py b/tools/run_tests/performance/massage_qps_stats_helpers.py index 400a0c82bfd..a2fe4ae6c34 100644 --- a/tools/run_tests/performance/massage_qps_stats_helpers.py +++ b/tools/run_tests/performance/massage_qps_stats_helpers.py @@ -23,7 +23,7 @@ def _threshold_for_count_below(buckets, boundaries, 
count_below): if count_so_far == count_below: # this bucket hits the threshold exactly... we should be midway through # any run of zero values following the bucket - for upper_idx in range(lower_idx + 1, num_buckets): + for upper_idx in range(lower_idx + 1, len(buckets)): if buckets[upper_idx] != 0: break return (boundaries[lower_idx] + boundaries[upper_idx]) / 2.0 From 5160155005c7d0f1ee9b855dfbe6aa45d8602bf1 Mon Sep 17 00:00:00 2001 From: Matt Kwong Date: Tue, 12 Sep 2017 09:31:07 -0700 Subject: [PATCH 7/7] Upload Kokoro MacOS master results --- tools/internal_ci/macos/grpc_basictests_dbg.cfg | 2 +- tools/internal_ci/macos/grpc_basictests_opt.cfg | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/internal_ci/macos/grpc_basictests_dbg.cfg b/tools/internal_ci/macos/grpc_basictests_dbg.cfg index f058f0c7e47..53bda1ff0a6 100644 --- a/tools/internal_ci/macos/grpc_basictests_dbg.cfg +++ b/tools/internal_ci/macos/grpc_basictests_dbg.cfg @@ -27,5 +27,5 @@ action { env_vars { key: "RUN_TESTS_FLAGS" - value: "-f basictests macos dbg --internal_ci -j 1 --inner_jobs 4" + value: "-f basictests macos dbg --internal_ci -j 1 --inner_jobs 4 --bq_result_table aggregate_results" } diff --git a/tools/internal_ci/macos/grpc_basictests_opt.cfg b/tools/internal_ci/macos/grpc_basictests_opt.cfg index 5048baaf483..d359eb601a0 100644 --- a/tools/internal_ci/macos/grpc_basictests_opt.cfg +++ b/tools/internal_ci/macos/grpc_basictests_opt.cfg @@ -27,5 +27,5 @@ action { env_vars { key: "RUN_TESTS_FLAGS" - value: "-f basictests macos opt --internal_ci -j 1 --inner_jobs 4" + value: "-f basictests macos opt --internal_ci -j 1 --inner_jobs 4 --bq_result_table aggregate_results" }