From a5c2d6b234144647d340546b08052e645da186c3 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 9 Nov 2017 10:27:00 -0800 Subject: [PATCH 1/9] Don't expect LB call to be NULL inside LB call retry timer. The timer callback runs independently of query_for_backends_locked() (which initializes the LB call). It's possible for the timer callback to fire right after query_for_backends_locked() has initialized the LB call. These changes makes the timer cb be a no-op in that scenario. --- .../ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 01b243bc3e4..6e4276ce3d1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1264,8 +1264,9 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx* exec_ctx, void* arg, gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)", (void*)glb_policy); } - GPR_ASSERT(glb_policy->lb_call == NULL); - query_for_backends_locked(exec_ctx, glb_policy); + if (glb_policy->lb_call == NULL) { + query_for_backends_locked(exec_ctx, glb_policy); + } } GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer"); } From a1c65909c90d750cfeb2a157663b97204254c7f2 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 9 Nov 2017 10:37:35 -0800 Subject: [PATCH 2/9] grpclb: Improved logging --- .../client_channel/lb_policy/grpclb/grpclb.cc | 146 ++++++++++-------- 1 file changed, 83 insertions(+), 63 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 01b243bc3e4..564979c677a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -175,6 +175,9 @@ typedef struct wrapped_rr_closure_arg { /* The RR instance related to the closure */ grpc_lb_policy* rr_policy; + /* The grpclb instance that created the wrapping */ + grpc_lb_policy* glb_policy; + /* heap memory to be freed upon closure execution. */ void* free_when_done; } wrapped_rr_closure_arg; @@ -199,10 +202,11 @@ static void wrapped_rr_closure(grpc_exec_ctx* exec_ctx, void* arg, wc_arg->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); } else { - gpr_log(GPR_ERROR, - "No LB token for connected subchannel pick %p (from RR " - "instance %p).", - (void*)*wc_arg->target, (void*)wc_arg->rr_policy); + gpr_log( + GPR_ERROR, + "[grpclb %p] No LB token for connected subchannel pick %p (from RR " + "instance %p).", + wc_arg->glb_policy, *wc_arg->target, wc_arg->rr_policy); abort(); } // Pass on client stats via context. Passes ownership of the reference. @@ -213,7 +217,8 @@ static void wrapped_rr_closure(grpc_exec_ctx* exec_ctx, void* arg, grpc_grpclb_client_stats_unref(wc_arg->client_stats); } if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Unreffing RR %p", (void*)wc_arg->rr_policy); + gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", wc_arg->glb_policy, + wc_arg->rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); } @@ -619,8 +624,10 @@ static void update_lb_connectivity_status_locked( if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log( - GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.", - grpc_connectivity_state_name(rr_state), (void*)glb_policy->rr_policy); + GPR_INFO, + "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.", + glb_policy, grpc_connectivity_state_name(rr_state), + glb_policy->rr_policy); } grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state, rr_state_error, @@ -647,7 +654,7 @@ static bool pick_from_internal_rr_locked( if (server->drop) { // Not using the RR policy, so unref it. if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", + gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p for drop", glb_policy, (intptr_t)wc_arg->rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); @@ -656,6 +663,7 @@ static bool pick_from_internal_rr_locked( // the client_load_reporting filter, because we do not create a // subchannel call (and therefore no client_load_reporting filter) // for dropped calls. + GPR_ASSERT(wc_arg->client_stats != NULL); grpc_grpclb_client_stats_add_call_dropped_locked( server->load_balance_token, wc_arg->client_stats); grpc_grpclb_client_stats_unref(wc_arg->client_stats); @@ -676,8 +684,8 @@ static bool pick_from_internal_rr_locked( if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", - (intptr_t)wc_arg->rr_policy); + gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", glb_policy, + wc_arg->rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); /* add the load reporting initial metadata */ @@ -748,12 +756,12 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, grpc_lb_policy_create(exec_ctx, "round_robin", args); if (new_rr_policy == NULL) { gpr_log(GPR_ERROR, - "Failure creating a RoundRobin policy for serverlist update with " - "%lu entries. The previous RR instance (%p), if any, will continue " - "to be used. Future updates from the LB will attempt to create new " - "instances.", - (unsigned long)glb_policy->serverlist->num_servers, - (void*)glb_policy->rr_policy); + "[grpclb %p] Failure creating a RoundRobin policy for serverlist " + "update with %lu entries. The previous RR instance (%p), if any, " + "will continue to be used. Future updates from the LB will attempt " + "to create new instances.", + glb_policy, (unsigned long)glb_policy->serverlist->num_servers, + glb_policy->rr_policy); return; } glb_policy->rr_policy = new_rr_policy; @@ -797,8 +805,9 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, pp->wrapped_on_complete_arg.client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Pending pick about to (async) PICK from %p", - (void*)glb_policy->rr_policy); + gpr_log(GPR_INFO, + "[grpclb %p] Pending pick about to (async) PICK from RR %p", + glb_policy, glb_policy->rr_policy); } pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args, true /* force_async */, pp->target, @@ -811,8 +820,8 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy; if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "", - (intptr_t)glb_policy->rr_policy); + gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p", + glb_policy, glb_policy->rr_policy); } grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, &pping->wrapped_notify_arg.wrapper_closure); @@ -827,15 +836,15 @@ static void rr_handover_locked(grpc_exec_ctx* exec_ctx, GPR_ASSERT(args != NULL); if (glb_policy->rr_policy != NULL) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)", - (void*)glb_policy->rr_policy); + gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", glb_policy, + glb_policy->rr_policy); } grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args); } else { create_rr_locked(exec_ctx, glb_policy, args); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_DEBUG, "Created new Round Robin policy (%p)", - (void*)glb_policy->rr_policy); + gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", glb_policy, + glb_policy->rr_policy); } } lb_policy_args_destroy(exec_ctx, args); @@ -1177,8 +1186,8 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, - "grpclb %p NOT picking from from RR %p: RR conn state=%s", - (void*)glb_policy, (void*)glb_policy->rr_policy, + "[grpclb %p] NOT picking from from RR %p: RR conn state=%s", + glb_policy, glb_policy->rr_policy, grpc_connectivity_state_name(rr_connectivity_state)); } add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, @@ -1186,8 +1195,8 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, pick_done = false; } else { // RR not in shutdown if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p", - (void*)glb_policy, (void*)glb_policy->rr_policy); + gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy, + glb_policy->rr_policy); } GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); wrapped_rr_closure_arg* wc_arg = @@ -1204,6 +1213,7 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; wc_arg->initial_metadata = pick_args->initial_metadata; wc_arg->free_when_done = wc_arg; + wc_arg->glb_policy = glb_policy; pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args, false /* force_async */, target, wc_arg); @@ -1211,9 +1221,8 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, } else { // glb_policy->rr_policy == NULL if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_DEBUG, - "No RR policy in grpclb instance %p. Adding to grpclb's pending " - "picks", - (void*)(glb_policy)); + "[grpclb %p] No RR policy. Adding to grpclb's pending picks", + glb_policy); } add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, on_complete); @@ -1261,8 +1270,7 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx* exec_ctx, void* arg, glb_policy->retry_timer_active = false; if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)", - (void*)glb_policy); + gpr_log(GPR_INFO, "[grpclb %p] Restaring call to LB server", glb_policy); } GPR_ASSERT(glb_policy->lb_call == NULL); query_for_backends_locked(exec_ctx, glb_policy); @@ -1284,14 +1292,16 @@ static void maybe_restart_lb_call(grpc_exec_ctx* exec_ctx, grpc_backoff_step(exec_ctx, &glb_policy->lb_call_backoff_state) .next_attempt_start_time; if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...", - (void*)glb_policy); + gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", + glb_policy); grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); if (timeout > 0) { - gpr_log(GPR_DEBUG, "... retry_timer_active in %" PRIdPTR "ms.", - timeout); + gpr_log(GPR_DEBUG, + "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.", + glb_policy, timeout); } else { - gpr_log(GPR_DEBUG, "... retry_timer_active immediately."); + gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.", + glb_policy); } } GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); @@ -1392,7 +1402,7 @@ static void send_client_load_report_locked(grpc_exec_ctx* exec_ctx, void* arg, exec_ctx, glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure); if (call_error != GRPC_CALL_OK) { - gpr_log(GPR_ERROR, "call_error=%d", call_error); + gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error); GPR_ASSERT(GRPC_CALL_OK == call_error); } } @@ -1486,9 +1496,8 @@ static void query_for_backends_locked(grpc_exec_ctx* exec_ctx, if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, - "Query for backends (grpclb: %p, lb_channel: %p, lb_call: %p)", - (void*)glb_policy, (void*)glb_policy->lb_channel, - (void*)glb_policy->lb_call); + "[grpclb %p] Query for backends (lb_channel: %p, lb_call: %p)", + glb_policy, glb_policy->lb_channel, glb_policy->lb_call); } GPR_ASSERT(glb_policy->lb_call != NULL); @@ -1578,9 +1587,9 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, &response->client_stats_report_interval)); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, - "received initial LB response message; " + "[grpclb %p] Received initial LB response message; " "client load reporting interval = %" PRIdPTR " milliseconds", - glb_policy->client_stats_report_interval); + glb_policy, glb_policy->client_stats_report_interval); } /* take a weak ref (won't prevent calling of \a glb_shutdown() if the * strong ref count goes to zero) to be unref'd in @@ -1590,8 +1599,9 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, schedule_next_client_load_report(exec_ctx, glb_policy); } else if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, - "received initial LB response message; " - "client load reporting NOT enabled"); + "[grpclb %p] Received initial LB response message; client load " + "reporting NOT enabled", + glb_policy); } grpc_grpclb_initial_response_destroy(response); glb_policy->seen_initial_response = true; @@ -1601,14 +1611,16 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, if (serverlist != NULL) { GPR_ASSERT(glb_policy->lb_call != NULL); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Serverlist with %lu servers received", - (unsigned long)serverlist->num_servers); + gpr_log(GPR_INFO, + "[grpclb %p] Serverlist with %" PRIuPTR " servers received", + glb_policy, serverlist->num_servers); for (size_t i = 0; i < serverlist->num_servers; ++i) { grpc_resolved_address addr; parse_server(serverlist->servers[i], &addr); char* ipport; grpc_sockaddr_to_string(&ipport, &addr, false); - gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport); + gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s", + glb_policy, i, ipport); gpr_free(ipport); } } @@ -1618,7 +1630,9 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, serverlist)) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, - "Incoming server list identical to current, ignoring."); + "[grpclb %p] Incoming server list identical to current, " + "ignoring.", + glb_policy); } grpc_grpclb_destroy_serverlist(serverlist); } else { /* new serverlist */ @@ -1644,12 +1658,16 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, } } else { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Received empty server list, ignoring."); + gpr_log(GPR_INFO, + "[grpclb %p] Received empty server list, ignoring.", + glb_policy); } grpc_grpclb_destroy_serverlist(serverlist); } } else { /* serverlist == NULL */ - gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.", + gpr_log(GPR_ERROR, + "[grpclb %p] Invalid LB response received: '%s'. Ignoring.", + glb_policy, grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); } } @@ -1689,8 +1707,8 @@ static void lb_on_fallback_timer_locked(grpc_exec_ctx* exec_ctx, void* arg, if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, - "Falling back to use backends from resolver (grpclb %p)", - (void*)glb_policy); + "[grpclb %p] Falling back to use backends from resolver", + glb_policy); } GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL); rr_handover_locked(exec_ctx, glb_policy); @@ -1708,10 +1726,11 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx* exec_ctx, char* status_details = grpc_slice_to_c_string(glb_policy->lb_call_status_details); gpr_log(GPR_INFO, - "Status from LB server received. Status = %d, Details = '%s', " + "[grpclb %p] Status from LB server received. Status = %d, Details " + "= '%s', " "(call: %p), error %p", - glb_policy->lb_call_status, status_details, - (void*)glb_policy->lb_call, (void*)error); + glb_policy, glb_policy->lb_call_status, status_details, + glb_policy->lb_call, error); gpr_free(status_details); } /* We need to perform cleanups no matter what. */ @@ -1752,10 +1771,10 @@ static void glb_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, "glb_update_missing"); } else { // otherwise, keep using the current LB channel (ignore this update). - gpr_log(GPR_ERROR, - "No valid LB addresses channel arg for grpclb %p update, " - "ignoring.", - (void*)glb_policy); + gpr_log( + GPR_ERROR, + "[grpclb %p] No valid LB addresses channel arg in update, ignoring.", + glb_policy); } return; } @@ -1887,8 +1906,9 @@ static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx, glb_policy->server_name = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.", - glb_policy->server_name); + gpr_log(GPR_INFO, + "[grpclb %p] Will use '%s' as the server name for LB request.", + glb_policy, glb_policy->server_name); } grpc_uri_destroy(uri); From b90cb3fc29230641fd749fdb7b204f3f6deea000 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 9 Nov 2017 13:58:00 -0800 Subject: [PATCH 3/9] pr comments --- .../ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 6e4276ce3d1..73a198c62be 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1259,14 +1259,13 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; glb_policy->retry_timer_active = false; - if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) { + if (!glb_policy->shutting_down && glb_policy->lb_call == NULL && + error == GRPC_ERROR_NONE) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)", (void*)glb_policy); } - if (glb_policy->lb_call == NULL) { - query_for_backends_locked(exec_ctx, glb_policy); - } + query_for_backends_locked(exec_ctx, glb_policy); } GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer"); } From 2b372e0176bc7e56f4ca768107a6c63c43570266 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 9 Nov 2017 14:15:59 -0800 Subject: [PATCH 4/9] pr comments --- .../client_channel/lb_policy/grpclb/grpclb.cc | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 564979c677a..93b7aede973 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -175,7 +175,8 @@ typedef struct wrapped_rr_closure_arg { /* The RR instance related to the closure */ grpc_lb_policy* rr_policy; - /* The grpclb instance that created the wrapping */ + /* The grpclb instance that created the wrapping. This instance is not owned, + * reference counts are untouched. Its used only for logging purposes. */ grpc_lb_policy* glb_policy; /* heap memory to be freed upon closure execution. */ @@ -655,7 +656,7 @@ static bool pick_from_internal_rr_locked( // Not using the RR policy, so unref it. if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p for drop", glb_policy, - (intptr_t)wc_arg->rr_policy); + wc_arg->rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); // Update client load reporting stats to indicate the number of @@ -757,10 +758,11 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, if (new_rr_policy == NULL) { gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy for serverlist " - "update with %lu entries. The previous RR instance (%p), if any, " - "will continue to be used. Future updates from the LB will attempt " - "to create new instances.", - glb_policy, (unsigned long)glb_policy->serverlist->num_servers, + "update with %" PRIuPTR + " entries. The previous RR instance (%p), if any, will continue to " + "be used. Future updates from the LB will attempt to create new " + "instances.", + glb_policy, glb_policy->serverlist->num_servers, glb_policy->rr_policy); return; } @@ -1270,7 +1272,7 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx* exec_ctx, void* arg, glb_policy->retry_timer_active = false; if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "[grpclb %p] Restaring call to LB server", glb_policy); + gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", glb_policy); } GPR_ASSERT(glb_policy->lb_call == NULL); query_for_backends_locked(exec_ctx, glb_policy); @@ -1727,10 +1729,9 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx* exec_ctx, grpc_slice_to_c_string(glb_policy->lb_call_status_details); gpr_log(GPR_INFO, "[grpclb %p] Status from LB server received. Status = %d, Details " - "= '%s', " - "(call: %p), error %p", + "= '%s', (call: %p), error '%s'", glb_policy, glb_policy->lb_call_status, status_details, - glb_policy->lb_call, error); + glb_policy->lb_call, grpc_error_string(error)); gpr_free(status_details); } /* We need to perform cleanups no matter what. */ From 59607907a84982bbbada0a4c2c10e890de915493 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 9 Nov 2017 14:39:59 -0800 Subject: [PATCH 5/9] pr comments --- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 93b7aede973..fb12ec52aba 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -176,7 +176,7 @@ typedef struct wrapped_rr_closure_arg { grpc_lb_policy* rr_policy; /* The grpclb instance that created the wrapping. This instance is not owned, - * reference counts are untouched. Its used only for logging purposes. */ + * reference counts are untouched. It's used only for logging purposes. */ grpc_lb_policy* glb_policy; /* heap memory to be freed upon closure execution. */ From 94eae043c723919442b5f837bc97e9c0763cc25b Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 9 Nov 2017 17:27:44 -0800 Subject: [PATCH 6/9] PF: don't unref errors when about to loop in pf_conn cb --- .../lb_policy/pick_first/pick_first.cc | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 125a4186aa2..e87911bc94d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -444,6 +444,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); } + bool updated_error = false; while (true) { switch (sd->curr_connectivity_state) { case GRPC_CHANNEL_READY: { @@ -486,6 +487,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + if (updated_error) GRPC_ERROR_UNREF(error); return; } case GRPC_CHANNEL_TRANSIENT_FAILURE: { @@ -506,11 +508,13 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } sd->curr_connectivity_state = grpc_subchannel_check_connectivity(sd->subchannel, &error); - GRPC_ERROR_UNREF(error); if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GRPC_ERROR_UNREF(error); // Reuses the connectivity refs from the previous watch. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); return; + } else { + updated_error = true; } break; // Go back to top of loop. } @@ -524,6 +528,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + if (updated_error) GRPC_ERROR_UNREF(error); return; } case GRPC_CHANNEL_SHUTDOWN: { @@ -544,6 +549,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, shutdown_locked(exec_ctx, p, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick first exhausted channels", &error, 1)); + if (updated_error) GRPC_ERROR_UNREF(error); return; } if (sd->subchannel_list == p->subchannel_list) { @@ -553,11 +559,13 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } sd->curr_connectivity_state = grpc_subchannel_check_connectivity(sd->subchannel, &error); - GRPC_ERROR_UNREF(error); if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GRPC_ERROR_UNREF(error); // Reuses the connectivity refs from the previous watch. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); return; + } else { + updated_error = true; } // For any other state, go back to top of loop. // We will reuse the connectivity refs from the previous watch. From b07d7a8051eaa7aef58cd45733b54f072a9e80f3 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Fri, 10 Nov 2017 10:37:09 -0800 Subject: [PATCH 7/9] Removed call to grpc_subchannel_check_connectivity and loop in pf_conn cb --- .../lb_policy/pick_first/pick_first.cc | 217 ++++++++---------- 1 file changed, 99 insertions(+), 118 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index e87911bc94d..c4a2fa0bfff 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -445,131 +445,112 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); } bool updated_error = false; - while (true) { - switch (sd->curr_connectivity_state) { - case GRPC_CHANNEL_READY: { - // Case 2. Promote p->latest_pending_subchannel_list to - // p->subchannel_list. - if (sd->subchannel_list == p->latest_pending_subchannel_list) { - GPR_ASSERT(p->subchannel_list != NULL); - grpc_lb_subchannel_list_shutdown_and_unref( - exec_ctx, p->subchannel_list, "finish_update"); - p->subchannel_list = p->latest_pending_subchannel_list; - p->latest_pending_subchannel_list = NULL; - } - // Cases 1 and 2. - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_READY, GRPC_ERROR_NONE, - "connecting_ready"); - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), - "connected"); - p->selected = sd; + switch (sd->curr_connectivity_state) { + case GRPC_CHANNEL_READY: { + // Case 2. Promote p->latest_pending_subchannel_list to + // p->subchannel_list. + if (sd->subchannel_list == p->latest_pending_subchannel_list) { + GPR_ASSERT(p->subchannel_list != NULL); + grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "finish_update"); + p->subchannel_list = p->latest_pending_subchannel_list; + p->latest_pending_subchannel_list = NULL; + } + // Cases 1 and 2. + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_READY, GRPC_ERROR_NONE, + "connecting_ready"); + sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(sd->subchannel), + "connected"); + p->selected = sd; + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, + (void*)sd->subchannel); + } + // Drop all other subchannels, since we are now connected. + destroy_unselected_subchannels_locked(exec_ctx, p); + // Update any calls that were waiting for a pick. + pending_pick* pp; + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( + p->selected->connected_subchannel, "picked"); if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, - (void*)sd->subchannel); - } - // Drop all other subchannels, since we are now connected. - destroy_unselected_subchannels_locked(exec_ctx, p); - // Update any calls that were waiting for a pick. - pending_pick* pp; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( - p->selected->connected_subchannel, "picked"); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, - "Servicing pending pick with selected subchannel %p", - (void*)p->selected); - } - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); + gpr_log(GPR_INFO, + "Servicing pending pick with selected subchannel %p", + (void*)p->selected); } - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); - if (updated_error) GRPC_ERROR_UNREF(error); - return; + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + gpr_free(pp); } - case GRPC_CHANNEL_TRANSIENT_FAILURE: { - do { - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; - } while (sd->subchannel == NULL); - // Case 1: Only set state to TRANSIENT_FAILURE if we've tried - // all subchannels. - if (sd->subchannel_list->checking_subchannel == 0 && - sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "connecting_transient_failure"); - } - sd->curr_connectivity_state = - grpc_subchannel_check_connectivity(sd->subchannel, &error); - if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - GRPC_ERROR_UNREF(error); - // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); - return; - } else { - updated_error = true; - } - break; // Go back to top of loop. + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + if (updated_error) GRPC_ERROR_UNREF(error); + break; + } + case GRPC_CHANNEL_TRANSIENT_FAILURE: { + do { + sd->subchannel_list->checking_subchannel = + (sd->subchannel_list->checking_subchannel + 1) % + sd->subchannel_list->num_subchannels; + sd = &sd->subchannel_list + ->subchannels[sd->subchannel_list->checking_subchannel]; + } while (sd->subchannel == NULL); + // Case 1: Only set state to TRANSIENT_FAILURE if we've tried + // all subchannels. + if (sd->subchannel_list->checking_subchannel == 0 && + sd->subchannel_list == p->subchannel_list) { + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "connecting_transient_failure"); } - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: { - // Only update connectivity state in case 1. - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, - GRPC_ERROR_REF(error), "connecting_changed"); - } - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + // Reuses the connectivity refs from the previous watch. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + break; + } + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE: { + // Only update connectivity state in case 1. + if (sd->subchannel_list == p->subchannel_list) { + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_REF(error), "connecting_changed"); + } + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + if (updated_error) GRPC_ERROR_UNREF(error); + break; + } + case GRPC_CHANNEL_SHUTDOWN: { + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, + "pf_candidate_shutdown"); + // Advance to next subchannel and check its state. + grpc_lb_subchannel_data* original_sd = sd; + do { + sd->subchannel_list->checking_subchannel = + (sd->subchannel_list->checking_subchannel + 1) % + sd->subchannel_list->num_subchannels; + sd = &sd->subchannel_list + ->subchannels[sd->subchannel_list->checking_subchannel]; + } while (sd->subchannel == NULL && sd != original_sd); + if (sd == original_sd) { + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "pf_candidate_shutdown"); + shutdown_locked(exec_ctx, p, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick first exhausted channels", &error, 1)); if (updated_error) GRPC_ERROR_UNREF(error); - return; + break; } - case GRPC_CHANNEL_SHUTDOWN: { - grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, - "pf_candidate_shutdown"); - // Advance to next subchannel and check its state. - grpc_lb_subchannel_data* original_sd = sd; - do { - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; - } while (sd->subchannel == NULL && sd != original_sd); - if (sd == original_sd) { - grpc_lb_subchannel_list_unref_for_connectivity_watch( - exec_ctx, sd->subchannel_list, "pf_candidate_shutdown"); - shutdown_locked(exec_ctx, p, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick first exhausted channels", &error, 1)); - if (updated_error) GRPC_ERROR_UNREF(error); - return; - } - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "subchannel_failed"); - } - sd->curr_connectivity_state = - grpc_subchannel_check_connectivity(sd->subchannel, &error); - if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - GRPC_ERROR_UNREF(error); - // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); - return; - } else { - updated_error = true; - } - // For any other state, go back to top of loop. - // We will reuse the connectivity refs from the previous watch. + if (sd->subchannel_list == p->subchannel_list) { + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "subchannel_failed"); } + // Reuses the connectivity refs from the previous watch. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + break; } } } From 0fa2c6bba8ac6ef9a299eb71d432a7995fa2753a Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Fri, 10 Nov 2017 11:28:33 -0800 Subject: [PATCH 8/9] Moar cleanups! --- .../client_channel/lb_policy/pick_first/pick_first.cc | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index c4a2fa0bfff..c79ee5687df 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -440,11 +440,6 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, // for a subchannel in p->latest_pending_subchannel_list. The // goal here is to find a subchannel from the update that we can // select in place of the current one. - if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE || - sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); - } - bool updated_error = false; switch (sd->curr_connectivity_state) { case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list to @@ -486,10 +481,10 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); - if (updated_error) GRPC_ERROR_UNREF(error); break; } case GRPC_CHANNEL_TRANSIENT_FAILURE: { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); do { sd->subchannel_list->checking_subchannel = (sd->subchannel_list->checking_subchannel + 1) % @@ -519,10 +514,10 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); - if (updated_error) GRPC_ERROR_UNREF(error); break; } case GRPC_CHANNEL_SHUTDOWN: { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_candidate_shutdown"); // Advance to next subchannel and check its state. @@ -540,7 +535,6 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, shutdown_locked(exec_ctx, p, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick first exhausted channels", &error, 1)); - if (updated_error) GRPC_ERROR_UNREF(error); break; } if (sd->subchannel_list == p->subchannel_list) { From 6712a7527801e73d592e3f5477231100a3c28f7f Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Fri, 10 Nov 2017 12:09:25 -0800 Subject: [PATCH 9/9] Fix wrong assignment --- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index fb12ec52aba..8cd68da35c0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1215,7 +1215,7 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; wc_arg->initial_metadata = pick_args->initial_metadata; wc_arg->free_when_done = wc_arg; - wc_arg->glb_policy = glb_policy; + wc_arg->glb_policy = pol; pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args, false /* force_async */, target, wc_arg);