Merge master

pull/13058/head
Yash Tibrewal 7 years ago
commit 36cd68f0d5
  1. 5
      include/grpc++/server_builder.h
  2. 149
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  3. 209
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  4. 28
      src/core/lib/iomgr/timer_generic.cc

@ -202,10 +202,7 @@ class ServerBuilder {
struct SyncServerSettings { struct SyncServerSettings {
SyncServerSettings() SyncServerSettings()
: num_cqs(GPR_MAX(1, gpr_cpu_num_cores())), : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {}
min_pollers(1),
max_pollers(2),
cq_timeout_msec(10000) {}
/// Number of server completion queues to create to listen to incoming RPCs. /// Number of server completion queues to create to listen to incoming RPCs.
int num_cqs; int num_cqs;

@ -175,6 +175,10 @@ typedef struct wrapped_rr_closure_arg {
/* The RR instance related to the closure */ /* The RR instance related to the closure */
grpc_lb_policy* rr_policy; grpc_lb_policy* rr_policy;
/* The grpclb instance that created the wrapping. This instance is not owned,
* reference counts are untouched. It's used only for logging purposes. */
grpc_lb_policy* glb_policy;
/* heap memory to be freed upon closure execution. */ /* heap memory to be freed upon closure execution. */
void* free_when_done; void* free_when_done;
} wrapped_rr_closure_arg; } wrapped_rr_closure_arg;
@ -198,10 +202,11 @@ static void wrapped_rr_closure(void* arg, grpc_error* error) {
wc_arg->lb_token_mdelem_storage, wc_arg->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token)); GRPC_MDELEM_REF(wc_arg->lb_token));
} else { } else {
gpr_log(GPR_ERROR, gpr_log(
"No LB token for connected subchannel pick %p (from RR " GPR_ERROR,
"instance %p).", "[grpclb %p] No LB token for connected subchannel pick %p (from RR "
(void*)*wc_arg->target, (void*)wc_arg->rr_policy); "instance %p).",
wc_arg->glb_policy, *wc_arg->target, wc_arg->rr_policy);
abort(); abort();
} }
// Pass on client stats via context. Passes ownership of the reference. // Pass on client stats via context. Passes ownership of the reference.
@ -212,7 +217,8 @@ static void wrapped_rr_closure(void* arg, grpc_error* error) {
grpc_grpclb_client_stats_unref(wc_arg->client_stats); grpc_grpclb_client_stats_unref(wc_arg->client_stats);
} }
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Unreffing RR %p", (void*)wc_arg->rr_policy); gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", wc_arg->glb_policy,
wc_arg->rr_policy);
} }
GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "wrapped_rr_closure"); GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "wrapped_rr_closure");
} }
@ -618,8 +624,10 @@ static void update_lb_connectivity_status_locked(
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log( gpr_log(
GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.", GPR_INFO,
grpc_connectivity_state_name(rr_state), (void*)glb_policy->rr_policy); "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
glb_policy, grpc_connectivity_state_name(rr_state),
glb_policy->rr_policy);
} }
grpc_connectivity_state_set(&glb_policy->state_tracker, rr_state, grpc_connectivity_state_set(&glb_policy->state_tracker, rr_state,
rr_state_error, rr_state_error,
@ -646,8 +654,8 @@ static bool pick_from_internal_rr_locked(
if (server->drop) { if (server->drop) {
// Not using the RR policy, so unref it. // Not using the RR policy, so unref it.
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p for drop", glb_policy,
(intptr_t)wc_arg->rr_policy); wc_arg->rr_policy);
} }
GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync"); GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync");
// Update client load reporting stats to indicate the number of // Update client load reporting stats to indicate the number of
@ -655,6 +663,7 @@ static bool pick_from_internal_rr_locked(
// the client_load_reporting filter, because we do not create a // the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter) // subchannel call (and therefore no client_load_reporting filter)
// for dropped calls. // for dropped calls.
GPR_ASSERT(wc_arg->client_stats != NULL);
grpc_grpclb_client_stats_add_call_dropped_locked( grpc_grpclb_client_stats_add_call_dropped_locked(
server->load_balance_token, wc_arg->client_stats); server->load_balance_token, wc_arg->client_stats);
grpc_grpclb_client_stats_unref(wc_arg->client_stats); grpc_grpclb_client_stats_unref(wc_arg->client_stats);
@ -675,8 +684,8 @@ static bool pick_from_internal_rr_locked(
if (pick_done) { if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", glb_policy,
(intptr_t)wc_arg->rr_policy); wc_arg->rr_policy);
} }
GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync"); GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync");
/* add the load reporting initial metadata */ /* add the load reporting initial metadata */
@ -743,12 +752,13 @@ static void create_rr_locked(glb_lb_policy* glb_policy,
grpc_lb_policy* new_rr_policy = grpc_lb_policy_create("round_robin", args); grpc_lb_policy* new_rr_policy = grpc_lb_policy_create("round_robin", args);
if (new_rr_policy == NULL) { if (new_rr_policy == NULL) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"Failure creating a RoundRobin policy for serverlist update with " "[grpclb %p] Failure creating a RoundRobin policy for serverlist "
"%lu entries. The previous RR instance (%p), if any, will continue " "update with %" PRIuPTR
"to be used. Future updates from the LB will attempt to create new " " entries. The previous RR instance (%p), if any, will continue to "
"be used. Future updates from the LB will attempt to create new "
"instances.", "instances.",
(unsigned long)glb_policy->serverlist->num_servers, glb_policy, glb_policy->serverlist->num_servers,
(void*)glb_policy->rr_policy); glb_policy->rr_policy);
return; return;
} }
glb_policy->rr_policy = new_rr_policy; glb_policy->rr_policy = new_rr_policy;
@ -790,8 +800,9 @@ static void create_rr_locked(glb_lb_policy* glb_policy,
pp->wrapped_on_complete_arg.client_stats = pp->wrapped_on_complete_arg.client_stats =
grpc_grpclb_client_stats_ref(glb_policy->client_stats); grpc_grpclb_client_stats_ref(glb_policy->client_stats);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Pending pick about to (async) PICK from %p", gpr_log(GPR_INFO,
(void*)glb_policy->rr_policy); "[grpclb %p] Pending pick about to (async) PICK from RR %p",
glb_policy, glb_policy->rr_policy);
} }
pick_from_internal_rr_locked(glb_policy, &pp->pick_args, pick_from_internal_rr_locked(glb_policy, &pp->pick_args,
true /* force_async */, pp->target, true /* force_async */, pp->target,
@ -804,8 +815,8 @@ static void create_rr_locked(glb_lb_policy* glb_policy,
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy; pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "", gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
(intptr_t)glb_policy->rr_policy); glb_policy, glb_policy->rr_policy);
} }
grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, grpc_lb_policy_ping_one_locked(glb_policy->rr_policy,
&pping->wrapped_notify_arg.wrapper_closure); &pping->wrapped_notify_arg.wrapper_closure);
@ -819,15 +830,15 @@ static void rr_handover_locked(glb_lb_policy* glb_policy) {
GPR_ASSERT(args != NULL); GPR_ASSERT(args != NULL);
if (glb_policy->rr_policy != NULL) { if (glb_policy->rr_policy != NULL) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)", gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", glb_policy,
(void*)glb_policy->rr_policy); glb_policy->rr_policy);
} }
grpc_lb_policy_update_locked(glb_policy->rr_policy, args); grpc_lb_policy_update_locked(glb_policy->rr_policy, args);
} else { } else {
create_rr_locked(glb_policy, args); create_rr_locked(glb_policy, args);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_DEBUG, "Created new Round Robin policy (%p)", gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", glb_policy,
(void*)glb_policy->rr_policy); glb_policy->rr_policy);
} }
} }
lb_policy_args_destroy(args); lb_policy_args_destroy(args);
@ -1159,8 +1170,8 @@ static int glb_pick_locked(grpc_lb_policy* pol,
if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"grpclb %p NOT picking from from RR %p: RR conn state=%s", "[grpclb %p] NOT picking from from RR %p: RR conn state=%s",
(void*)glb_policy, (void*)glb_policy->rr_policy, glb_policy, glb_policy->rr_policy,
grpc_connectivity_state_name(rr_connectivity_state)); grpc_connectivity_state_name(rr_connectivity_state));
} }
add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
@ -1168,8 +1179,8 @@ static int glb_pick_locked(grpc_lb_policy* pol,
pick_done = false; pick_done = false;
} else { // RR not in shutdown } else { // RR not in shutdown
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p", gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy,
(void*)glb_policy, (void*)glb_policy->rr_policy); glb_policy->rr_policy);
} }
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
wrapped_rr_closure_arg* wc_arg = wrapped_rr_closure_arg* wc_arg =
@ -1186,15 +1197,15 @@ static int glb_pick_locked(grpc_lb_policy* pol,
wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
wc_arg->initial_metadata = pick_args->initial_metadata; wc_arg->initial_metadata = pick_args->initial_metadata;
wc_arg->free_when_done = wc_arg; wc_arg->free_when_done = wc_arg;
wc_arg->glb_policy = pol;
pick_done = pick_from_internal_rr_locked( pick_done = pick_from_internal_rr_locked(
glb_policy, pick_args, false /* force_async */, target, wc_arg); glb_policy, pick_args, false /* force_async */, target, wc_arg);
} }
} else { // glb_policy->rr_policy == NULL } else { // glb_policy->rr_policy == NULL
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"No RR policy in grpclb instance %p. Adding to grpclb's pending " "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
"picks", glb_policy);
(void*)(glb_policy));
} }
add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
on_complete); on_complete);
@ -1239,8 +1250,7 @@ static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
if (!glb_policy->shutting_down && glb_policy->lb_call == NULL && if (!glb_policy->shutting_down && glb_policy->lb_call == NULL &&
error == GRPC_ERROR_NONE) { error == GRPC_ERROR_NONE) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)", gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", glb_policy);
(void*)glb_policy);
} }
query_for_backends_locked(glb_policy); query_for_backends_locked(glb_policy);
} }
@ -1259,14 +1269,16 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
grpc_millis next_try = grpc_backoff_step(&glb_policy->lb_call_backoff_state) grpc_millis next_try = grpc_backoff_step(&glb_policy->lb_call_backoff_state)
.next_attempt_start_time; .next_attempt_start_time;
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...", gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
(void*)glb_policy); glb_policy);
grpc_millis timeout = next_try - grpc_exec_ctx_now(); grpc_millis timeout = next_try - grpc_exec_ctx_now();
if (timeout > 0) { if (timeout > 0) {
gpr_log(GPR_DEBUG, "... retry_timer_active in %" PRIdPTR "ms.", gpr_log(GPR_DEBUG,
timeout); "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.",
glb_policy, timeout);
} else { } else {
gpr_log(GPR_DEBUG, "... retry_timer_active immediately."); gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.",
glb_policy);
} }
} }
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
@ -1360,7 +1372,7 @@ static void send_client_load_report_locked(void* arg, grpc_error* error) {
grpc_call_error call_error = grpc_call_start_batch_and_execute( grpc_call_error call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure); glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure);
if (call_error != GRPC_CALL_OK) { if (call_error != GRPC_CALL_OK) {
gpr_log(GPR_ERROR, "call_error=%d", call_error); gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error); GPR_ASSERT(GRPC_CALL_OK == call_error);
} }
} }
@ -1449,9 +1461,8 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"Query for backends (grpclb: %p, lb_channel: %p, lb_call: %p)", "[grpclb %p] Query for backends (lb_channel: %p, lb_call: %p)",
(void*)glb_policy, (void*)glb_policy->lb_channel, glb_policy, glb_policy->lb_channel, glb_policy->lb_call);
(void*)glb_policy->lb_call);
} }
GPR_ASSERT(glb_policy->lb_call != NULL); GPR_ASSERT(glb_policy->lb_call != NULL);
@ -1540,9 +1551,9 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
&response->client_stats_report_interval)); &response->client_stats_report_interval));
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"received initial LB response message; " "[grpclb %p] Received initial LB response message; "
"client load reporting interval = %" PRIdPTR " milliseconds", "client load reporting interval = %" PRIdPTR " milliseconds",
glb_policy->client_stats_report_interval); glb_policy, glb_policy->client_stats_report_interval);
} }
/* take a weak ref (won't prevent calling of \a glb_shutdown() if the /* take a weak ref (won't prevent calling of \a glb_shutdown() if the
* strong ref count goes to zero) to be unref'd in * strong ref count goes to zero) to be unref'd in
@ -1552,8 +1563,9 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
schedule_next_client_load_report(glb_policy); schedule_next_client_load_report(glb_policy);
} else if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { } else if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"received initial LB response message; " "[grpclb %p] Received initial LB response message; client load "
"client load reporting NOT enabled"); "reporting NOT enabled",
glb_policy);
} }
grpc_grpclb_initial_response_destroy(response); grpc_grpclb_initial_response_destroy(response);
glb_policy->seen_initial_response = true; glb_policy->seen_initial_response = true;
@ -1563,14 +1575,16 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
if (serverlist != NULL) { if (serverlist != NULL) {
GPR_ASSERT(glb_policy->lb_call != NULL); GPR_ASSERT(glb_policy->lb_call != NULL);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Serverlist with %lu servers received", gpr_log(GPR_INFO,
(unsigned long)serverlist->num_servers); "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
glb_policy, serverlist->num_servers);
for (size_t i = 0; i < serverlist->num_servers; ++i) { for (size_t i = 0; i < serverlist->num_servers; ++i) {
grpc_resolved_address addr; grpc_resolved_address addr;
parse_server(serverlist->servers[i], &addr); parse_server(serverlist->servers[i], &addr);
char* ipport; char* ipport;
grpc_sockaddr_to_string(&ipport, &addr, false); grpc_sockaddr_to_string(&ipport, &addr, false);
gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport); gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
glb_policy, i, ipport);
gpr_free(ipport); gpr_free(ipport);
} }
} }
@ -1580,7 +1594,9 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
serverlist)) { serverlist)) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"Incoming server list identical to current, ignoring."); "[grpclb %p] Incoming server list identical to current, "
"ignoring.",
glb_policy);
} }
grpc_grpclb_destroy_serverlist(serverlist); grpc_grpclb_destroy_serverlist(serverlist);
} else { /* new serverlist */ } else { /* new serverlist */
@ -1605,12 +1621,16 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
} }
} else { } else {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Received empty server list, ignoring."); gpr_log(GPR_INFO,
"[grpclb %p] Received empty server list, ignoring.",
glb_policy);
} }
grpc_grpclb_destroy_serverlist(serverlist); grpc_grpclb_destroy_serverlist(serverlist);
} }
} else { /* serverlist == NULL */ } else { /* serverlist == NULL */
gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.", gpr_log(GPR_ERROR,
"[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
glb_policy,
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
} }
} }
@ -1649,8 +1669,8 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) { if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"Falling back to use backends from resolver (grpclb %p)", "[grpclb %p] Falling back to use backends from resolver",
(void*)glb_policy); glb_policy);
} }
GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL); GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL);
rr_handover_locked(glb_policy); rr_handover_locked(glb_policy);
@ -1666,10 +1686,10 @@ static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
char* status_details = char* status_details =
grpc_slice_to_c_string(glb_policy->lb_call_status_details); grpc_slice_to_c_string(glb_policy->lb_call_status_details);
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"Status from LB server received. Status = %d, Details = '%s', " "[grpclb %p] Status from LB server received. Status = %d, Details "
"(call: %p), error %p", "= '%s', (call: %p), error '%s'",
glb_policy->lb_call_status, status_details, glb_policy, glb_policy->lb_call_status, status_details,
(void*)glb_policy->lb_call, (void*)error); glb_policy->lb_call, grpc_error_string(error));
gpr_free(status_details); gpr_free(status_details);
} }
/* We need to perform cleanups no matter what. */ /* We need to perform cleanups no matter what. */
@ -1709,10 +1729,10 @@ static void glb_update_locked(grpc_lb_policy* policy,
"glb_update_missing"); "glb_update_missing");
} else { } else {
// otherwise, keep using the current LB channel (ignore this update). // otherwise, keep using the current LB channel (ignore this update).
gpr_log(GPR_ERROR, gpr_log(
"No valid LB addresses channel arg for grpclb %p update, " GPR_ERROR,
"ignoring.", "[grpclb %p] No valid LB addresses channel arg in update, ignoring.",
(void*)glb_policy); glb_policy);
} }
return; return;
} }
@ -1842,8 +1862,9 @@ static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
glb_policy->server_name = glb_policy->server_name =
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.", gpr_log(GPR_INFO,
glb_policy->server_name); "[grpclb %p] Will use '%s' as the server name for LB request.",
glb_policy, glb_policy->server_name);
} }
grpc_uri_destroy(uri); grpc_uri_destroy(uri);

@ -430,125 +430,108 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// for a subchannel in p->latest_pending_subchannel_list. The // for a subchannel in p->latest_pending_subchannel_list. The
// goal here is to find a subchannel from the update that we can // goal here is to find a subchannel from the update that we can
// select in place of the current one. // select in place of the current one.
if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE || switch (sd->curr_connectivity_state) {
sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { case GRPC_CHANNEL_READY: {
grpc_lb_subchannel_data_stop_connectivity_watch(sd); // Case 2. Promote p->latest_pending_subchannel_list to
} // p->subchannel_list.
while (true) { if (sd->subchannel_list == p->latest_pending_subchannel_list) {
switch (sd->curr_connectivity_state) { GPR_ASSERT(p->subchannel_list != NULL);
case GRPC_CHANNEL_READY: { grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
// Case 2. Promote p->latest_pending_subchannel_list to "finish_update");
// p->subchannel_list. p->subchannel_list = p->latest_pending_subchannel_list;
if (sd->subchannel_list == p->latest_pending_subchannel_list) { p->latest_pending_subchannel_list = NULL;
GPR_ASSERT(p->subchannel_list != NULL); }
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, // Cases 1 and 2.
"finish_update"); grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
p->subchannel_list = p->latest_pending_subchannel_list; GRPC_ERROR_NONE, "connecting_ready");
p->latest_pending_subchannel_list = NULL; sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
} grpc_subchannel_get_connected_subchannel(sd->subchannel),
// Cases 1 and 2. "connected");
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, p->selected = sd;
GRPC_ERROR_NONE, "connecting_ready"); if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p,
grpc_subchannel_get_connected_subchannel(sd->subchannel), (void*)sd->subchannel);
"connected"); }
p->selected = sd; // Drop all other subchannels, since we are now connected.
destroy_unselected_subchannels_locked(p);
// Update any calls that were waiting for a pick.
pending_pick* pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
p->selected->connected_subchannel, "picked");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, gpr_log(GPR_INFO,
(void*)sd->subchannel); "Servicing pending pick with selected subchannel %p",
} (void*)p->selected);
// Drop all other subchannels, since we are now connected.
destroy_unselected_subchannels_locked(p);
// Update any calls that were waiting for a pick.
pending_pick* pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
p->selected->connected_subchannel, "picked");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p",
(void*)p->selected);
}
GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
} }
// Renew notification. GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
grpc_lb_subchannel_data_start_connectivity_watch(sd); gpr_free(pp);
return;
} }
case GRPC_CHANNEL_TRANSIENT_FAILURE: { // Renew notification.
do { grpc_lb_subchannel_data_start_connectivity_watch(sd);
sd->subchannel_list->checking_subchannel = break;
(sd->subchannel_list->checking_subchannel + 1) % }
sd->subchannel_list->num_subchannels; case GRPC_CHANNEL_TRANSIENT_FAILURE: {
sd = &sd->subchannel_list grpc_lb_subchannel_data_stop_connectivity_watch(sd);
->subchannels[sd->subchannel_list->checking_subchannel]; do {
} while (sd->subchannel == NULL); sd->subchannel_list->checking_subchannel =
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried (sd->subchannel_list->checking_subchannel + 1) %
// all subchannels. sd->subchannel_list->num_subchannels;
if (sd->subchannel_list->checking_subchannel == 0 && sd = &sd->subchannel_list
sd->subchannel_list == p->subchannel_list) { ->subchannels[sd->subchannel_list->checking_subchannel];
grpc_connectivity_state_set( } while (sd->subchannel == NULL);
&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, // Case 1: Only set state to TRANSIENT_FAILURE if we've tried
GRPC_ERROR_REF(error), "connecting_transient_failure"); // all subchannels.
} if (sd->subchannel_list->checking_subchannel == 0 &&
sd->curr_connectivity_state = sd->subchannel_list == p->subchannel_list) {
grpc_subchannel_check_connectivity(sd->subchannel, &error); grpc_connectivity_state_set(
GRPC_ERROR_UNREF(error); &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { GRPC_ERROR_REF(error), "connecting_transient_failure");
// Reuses the connectivity refs from the previous watch.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
return;
}
break; // Go back to top of loop.
} }
case GRPC_CHANNEL_CONNECTING: // Reuses the connectivity refs from the previous watch.
case GRPC_CHANNEL_IDLE: { grpc_lb_subchannel_data_start_connectivity_watch(sd);
// Only update connectivity state in case 1. break;
if (sd->subchannel_list == p->subchannel_list) { }
grpc_connectivity_state_set( case GRPC_CHANNEL_CONNECTING:
&p->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_REF(error), case GRPC_CHANNEL_IDLE: {
"connecting_changed"); // Only update connectivity state in case 1.
} if (sd->subchannel_list == p->subchannel_list) {
// Renew notification. grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING,
grpc_lb_subchannel_data_start_connectivity_watch(sd); GRPC_ERROR_REF(error),
return; "connecting_changed");
} }
case GRPC_CHANNEL_SHUTDOWN: { // Renew notification.
grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown"); grpc_lb_subchannel_data_start_connectivity_watch(sd);
// Advance to next subchannel and check its state. break;
grpc_lb_subchannel_data* original_sd = sd; }
do { case GRPC_CHANNEL_SHUTDOWN: {
sd->subchannel_list->checking_subchannel = grpc_lb_subchannel_data_stop_connectivity_watch(sd);
(sd->subchannel_list->checking_subchannel + 1) % grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown");
sd->subchannel_list->num_subchannels; // Advance to next subchannel and check its state.
sd = &sd->subchannel_list grpc_lb_subchannel_data* original_sd = sd;
->subchannels[sd->subchannel_list->checking_subchannel]; do {
} while (sd->subchannel == NULL && sd != original_sd); sd->subchannel_list->checking_subchannel =
if (sd == original_sd) { (sd->subchannel_list->checking_subchannel + 1) %
grpc_lb_subchannel_list_unref_for_connectivity_watch( sd->subchannel_list->num_subchannels;
sd->subchannel_list, "pf_candidate_shutdown"); sd = &sd->subchannel_list
shutdown_locked(p, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( ->subchannels[sd->subchannel_list->checking_subchannel];
"Pick first exhausted channels", &error, 1)); } while (sd->subchannel == NULL && sd != original_sd);
return; if (sd == original_sd) {
} grpc_lb_subchannel_list_unref_for_connectivity_watch(
if (sd->subchannel_list == p->subchannel_list) { sd->subchannel_list, "pf_candidate_shutdown");
grpc_connectivity_state_set( shutdown_locked(p, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "Pick first exhausted channels", &error, 1));
GRPC_ERROR_REF(error), "subchannel_failed"); break;
} }
sd->curr_connectivity_state = if (sd->subchannel_list == p->subchannel_list) {
grpc_subchannel_check_connectivity(sd->subchannel, &error); grpc_connectivity_state_set(&p->state_tracker,
GRPC_ERROR_UNREF(error); GRPC_CHANNEL_TRANSIENT_FAILURE,
if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { GRPC_ERROR_REF(error), "subchannel_failed");
// Reuses the connectivity refs from the previous watch.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
return;
}
// For any other state, go back to top of loop.
// We will reuse the connectivity refs from the previous watch.
} }
// Reuses the connectivity refs from the previous watch.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
break;
} }
} }
} }

@ -25,6 +25,7 @@
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
@ -37,8 +38,6 @@
#define INVALID_HEAP_INDEX 0xffffffffu #define INVALID_HEAP_INDEX 0xffffffffu
#define LOG2_NUM_SHARDS 5
#define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
#define ADD_DEADLINE_SCALE 0.33 #define ADD_DEADLINE_SCALE 0.33
#define MIN_QUEUE_WINDOW_DURATION 0.01 #define MIN_QUEUE_WINDOW_DURATION 0.01
#define MAX_QUEUE_WINDOW_DURATION 1 #define MAX_QUEUE_WINDOW_DURATION 1
@ -74,14 +73,16 @@ typedef struct {
grpc_timer list; grpc_timer list;
} timer_shard; } timer_shard;
static size_t g_num_shards;
/* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address /* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
* is hashed to select the timer shard to add the timer to */ * is hashed to select the timer shard to add the timer to */
static timer_shard g_shards[NUM_SHARDS]; static timer_shard* g_shards;
/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e /* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
* the deadline of the next timer in each shard). * the deadline of the next timer in each shard).
* Access to this is protected by g_shared_mutables.mu */ * Access to this is protected by g_shared_mutables.mu */
static timer_shard* g_shard_queue[NUM_SHARDS]; static timer_shard** g_shard_queue;
#ifndef NDEBUG #ifndef NDEBUG
@ -240,6 +241,11 @@ static gpr_atm compute_min_deadline(timer_shard* shard) {
void grpc_timer_list_init() { void grpc_timer_list_init() {
uint32_t i; uint32_t i;
g_num_shards = GPR_MIN(1, 2 * gpr_cpu_num_cores());
g_shards = (timer_shard*)gpr_zalloc(g_num_shards * sizeof(*g_shards));
g_shard_queue =
(timer_shard**)gpr_zalloc(g_num_shards * sizeof(*g_shard_queue));
g_shared_mutables.initialized = true; g_shared_mutables.initialized = true;
g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER; g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
gpr_mu_init(&g_shared_mutables.mu); gpr_mu_init(&g_shared_mutables.mu);
@ -249,7 +255,7 @@ void grpc_timer_list_init() {
grpc_register_tracer(&grpc_timer_trace); grpc_register_tracer(&grpc_timer_trace);
grpc_register_tracer(&grpc_timer_check_trace); grpc_register_tracer(&grpc_timer_check_trace);
for (i = 0; i < NUM_SHARDS; i++) { for (i = 0; i < g_num_shards; i++) {
timer_shard* shard = &g_shards[i]; timer_shard* shard = &g_shards[i];
gpr_mu_init(&shard->mu); gpr_mu_init(&shard->mu);
grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1, grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
@ -266,17 +272,19 @@ void grpc_timer_list_init() {
} }
void grpc_timer_list_shutdown() { void grpc_timer_list_shutdown() {
int i; size_t i;
run_some_expired_timers( run_some_expired_timers(
GPR_ATM_MAX, NULL, GPR_ATM_MAX, NULL,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
for (i = 0; i < NUM_SHARDS; i++) { for (i = 0; i < g_num_shards; i++) {
timer_shard* shard = &g_shards[i]; timer_shard* shard = &g_shards[i];
gpr_mu_destroy(&shard->mu); gpr_mu_destroy(&shard->mu);
grpc_timer_heap_destroy(&shard->heap); grpc_timer_heap_destroy(&shard->heap);
} }
gpr_mu_destroy(&g_shared_mutables.mu); gpr_mu_destroy(&g_shared_mutables.mu);
gpr_tls_destroy(&g_last_seen_min_timer); gpr_tls_destroy(&g_last_seen_min_timer);
gpr_free(g_shards);
gpr_free(g_shard_queue);
g_shared_mutables.initialized = false; g_shared_mutables.initialized = false;
} }
@ -310,7 +318,7 @@ static void note_deadline_change(timer_shard* shard) {
g_shard_queue[shard->shard_queue_index - 1]->min_deadline) { g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
swap_adjacent_shards_in_queue(shard->shard_queue_index - 1); swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
} }
while (shard->shard_queue_index < NUM_SHARDS - 1 && while (shard->shard_queue_index < g_num_shards - 1 &&
shard->min_deadline > shard->min_deadline >
g_shard_queue[shard->shard_queue_index + 1]->min_deadline) { g_shard_queue[shard->shard_queue_index + 1]->min_deadline) {
swap_adjacent_shards_in_queue(shard->shard_queue_index); swap_adjacent_shards_in_queue(shard->shard_queue_index);
@ -322,7 +330,7 @@ void grpc_timer_init_unset(grpc_timer* timer) { timer->pending = false; }
void grpc_timer_init(grpc_timer* timer, grpc_millis deadline, void grpc_timer_init(grpc_timer* timer, grpc_millis deadline,
grpc_closure* closure) { grpc_closure* closure) {
int is_first_timer = 0; int is_first_timer = 0;
timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
timer->closure = closure; timer->closure = closure;
timer->deadline = deadline; timer->deadline = deadline;
@ -416,7 +424,7 @@ void grpc_timer_cancel(grpc_timer* timer) {
return; return;
} }
timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
gpr_mu_lock(&shard->mu); gpr_mu_lock(&shard->mu);
if (GRPC_TRACER_ON(grpc_timer_trace)) { if (GRPC_TRACER_ON(grpc_timer_trace)) {
gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer, gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,

Loading…
Cancel
Save