|
|
|
@ -367,11 +367,11 @@ typedef struct glb_lb_policy { |
|
|
|
|
/** are we already watching the LB channel's connectivity? */ |
|
|
|
|
bool watching_lb_channel; |
|
|
|
|
|
|
|
|
|
/** is \a lb_call_retry_timer active? */ |
|
|
|
|
bool retry_timer_active; |
|
|
|
|
/** is the callback associated with \a lb_call_retry_timer pending? */ |
|
|
|
|
bool retry_timer_callback_pending; |
|
|
|
|
|
|
|
|
|
/** is \a lb_fallback_timer active? */ |
|
|
|
|
bool fallback_timer_active; |
|
|
|
|
/** is the callback associated with \a lb_fallback_timer pending? */ |
|
|
|
|
bool fallback_timer_callback_pending; |
|
|
|
|
|
|
|
|
|
/** called upon changes to the LB channel's connectivity. */ |
|
|
|
|
grpc_closure lb_channel_on_connectivity_changed; |
|
|
|
@ -430,7 +430,7 @@ typedef struct glb_lb_policy { |
|
|
|
|
/* Interval and timer for next client load report. */ |
|
|
|
|
grpc_millis client_stats_report_interval; |
|
|
|
|
grpc_timer client_load_report_timer; |
|
|
|
|
bool client_load_report_timer_pending; |
|
|
|
|
bool client_load_report_timer_callback_pending; |
|
|
|
|
bool last_client_load_report_counters_were_zero; |
|
|
|
|
/* Closure used for either the load report timer or the callback for
|
|
|
|
|
* completion of sending the load report. */ |
|
|
|
@ -1025,13 +1025,11 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) { |
|
|
|
|
grpc_call_cancel(lb_call, nullptr); |
|
|
|
|
/* lb_on_server_status_received will pick up the cancel and clean up */ |
|
|
|
|
} |
|
|
|
|
if (glb_policy->retry_timer_active) { |
|
|
|
|
if (glb_policy->retry_timer_callback_pending) { |
|
|
|
|
grpc_timer_cancel(&glb_policy->lb_call_retry_timer); |
|
|
|
|
glb_policy->retry_timer_active = false; |
|
|
|
|
} |
|
|
|
|
if (glb_policy->fallback_timer_active) { |
|
|
|
|
if (glb_policy->fallback_timer_callback_pending) { |
|
|
|
|
grpc_timer_cancel(&glb_policy->lb_fallback_timer); |
|
|
|
|
glb_policy->fallback_timer_active = false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pending_pick* pp = glb_policy->pending_picks; |
|
|
|
@ -1160,14 +1158,15 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy); |
|
|
|
|
static void start_picking_locked(glb_lb_policy* glb_policy) { |
|
|
|
|
/* start a timer to fall back */ |
|
|
|
|
if (glb_policy->lb_fallback_timeout_ms > 0 && |
|
|
|
|
glb_policy->serverlist == nullptr && !glb_policy->fallback_timer_active) { |
|
|
|
|
glb_policy->serverlist == nullptr && |
|
|
|
|
!glb_policy->fallback_timer_callback_pending) { |
|
|
|
|
grpc_millis deadline = |
|
|
|
|
grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms; |
|
|
|
|
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer"); |
|
|
|
|
GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked, |
|
|
|
|
glb_policy, |
|
|
|
|
grpc_combiner_scheduler(glb_policy->base.combiner)); |
|
|
|
|
glb_policy->fallback_timer_active = true; |
|
|
|
|
glb_policy->fallback_timer_callback_pending = true; |
|
|
|
|
grpc_timer_init(&glb_policy->lb_fallback_timer, deadline, |
|
|
|
|
&glb_policy->lb_on_fallback); |
|
|
|
|
} |
|
|
|
@ -1288,7 +1287,7 @@ static void glb_notify_on_state_change_locked(grpc_lb_policy* pol, |
|
|
|
|
|
|
|
|
|
static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) { |
|
|
|
|
glb_lb_policy* glb_policy = (glb_lb_policy*)arg; |
|
|
|
|
glb_policy->retry_timer_active = false; |
|
|
|
|
glb_policy->retry_timer_callback_pending = false; |
|
|
|
|
if (!glb_policy->shutting_down && glb_policy->lb_call == nullptr && |
|
|
|
|
error == GRPC_ERROR_NONE) { |
|
|
|
|
if (grpc_lb_glb_trace.enabled()) { |
|
|
|
@ -1301,7 +1300,7 @@ static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) { |
|
|
|
|
|
|
|
|
|
static void maybe_restart_lb_call(glb_lb_policy* glb_policy) { |
|
|
|
|
if (glb_policy->started_picking && glb_policy->updating_lb_call) { |
|
|
|
|
if (glb_policy->retry_timer_active) { |
|
|
|
|
if (glb_policy->retry_timer_callback_pending) { |
|
|
|
|
grpc_timer_cancel(&glb_policy->lb_call_retry_timer); |
|
|
|
|
} |
|
|
|
|
if (!glb_policy->shutting_down) start_picking_locked(glb_policy); |
|
|
|
@ -1315,10 +1314,10 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) { |
|
|
|
|
grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); |
|
|
|
|
if (timeout > 0) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.", |
|
|
|
|
"[grpclb %p] ... retry LB call after %" PRIuPTR "ms.", |
|
|
|
|
glb_policy, timeout); |
|
|
|
|
} else { |
|
|
|
|
gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.", |
|
|
|
|
gpr_log(GPR_DEBUG, "[grpclb %p] ... retry LB call immediately.", |
|
|
|
|
glb_policy); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1326,7 +1325,7 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) { |
|
|
|
|
GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry, |
|
|
|
|
lb_call_on_retry_timer_locked, glb_policy, |
|
|
|
|
grpc_combiner_scheduler(glb_policy->base.combiner)); |
|
|
|
|
glb_policy->retry_timer_active = true; |
|
|
|
|
glb_policy->retry_timer_callback_pending = true; |
|
|
|
|
grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try, |
|
|
|
|
&glb_policy->lb_on_call_retry); |
|
|
|
|
} |
|
|
|
@ -1353,7 +1352,7 @@ static void client_load_report_done_locked(void* arg, grpc_error* error) { |
|
|
|
|
grpc_byte_buffer_destroy(glb_policy->client_load_report_payload); |
|
|
|
|
glb_policy->client_load_report_payload = nullptr; |
|
|
|
|
if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) { |
|
|
|
|
glb_policy->client_load_report_timer_pending = false; |
|
|
|
|
glb_policy->client_load_report_timer_callback_pending = false; |
|
|
|
|
GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report"); |
|
|
|
|
if (glb_policy->lb_call == nullptr) { |
|
|
|
|
maybe_restart_lb_call(glb_policy); |
|
|
|
@ -1394,7 +1393,7 @@ static bool load_report_counters_are_zero(grpc_grpclb_request* request) { |
|
|
|
|
static void send_client_load_report_locked(void* arg, grpc_error* error) { |
|
|
|
|
glb_lb_policy* glb_policy = (glb_lb_policy*)arg; |
|
|
|
|
if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) { |
|
|
|
|
glb_policy->client_load_report_timer_pending = false; |
|
|
|
|
glb_policy->client_load_report_timer_callback_pending = false; |
|
|
|
|
GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report"); |
|
|
|
|
if (glb_policy->lb_call == nullptr) { |
|
|
|
|
maybe_restart_lb_call(glb_policy); |
|
|
|
@ -1505,7 +1504,7 @@ static void lb_call_destroy_locked(glb_lb_policy* glb_policy) { |
|
|
|
|
grpc_byte_buffer_destroy(glb_policy->lb_request_payload); |
|
|
|
|
grpc_slice_unref_internal(glb_policy->lb_call_status_details); |
|
|
|
|
|
|
|
|
|
if (glb_policy->client_load_report_timer_pending) { |
|
|
|
|
if (glb_policy->client_load_report_timer_callback_pending) { |
|
|
|
|
grpc_timer_cancel(&glb_policy->client_load_report_timer); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1635,7 +1634,7 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) { |
|
|
|
|
/* take a weak ref (won't prevent calling of \a glb_shutdown() if the
|
|
|
|
|
* strong ref count goes to zero) to be unref'd in |
|
|
|
|
* send_client_load_report_locked() */ |
|
|
|
|
glb_policy->client_load_report_timer_pending = true; |
|
|
|
|
glb_policy->client_load_report_timer_callback_pending = true; |
|
|
|
|
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report"); |
|
|
|
|
schedule_next_client_load_report(glb_policy); |
|
|
|
|
} else if (grpc_lb_glb_trace.enabled()) { |
|
|
|
@ -1684,9 +1683,8 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) { |
|
|
|
|
/* or dispose of the fallback */ |
|
|
|
|
grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses); |
|
|
|
|
glb_policy->fallback_backend_addresses = nullptr; |
|
|
|
|
if (glb_policy->fallback_timer_active) { |
|
|
|
|
if (glb_policy->fallback_timer_callback_pending) { |
|
|
|
|
grpc_timer_cancel(&glb_policy->lb_fallback_timer); |
|
|
|
|
glb_policy->fallback_timer_active = false; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
/* and update the copy in the glb_lb_policy instance. This
|
|
|
|
@ -1739,7 +1737,7 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) { |
|
|
|
|
|
|
|
|
|
static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) { |
|
|
|
|
glb_lb_policy* glb_policy = (glb_lb_policy*)arg; |
|
|
|
|
glb_policy->fallback_timer_active = false; |
|
|
|
|
glb_policy->fallback_timer_callback_pending = false; |
|
|
|
|
/* If we receive a serverlist after the timer fires but before this callback
|
|
|
|
|
* actually runs, don't fall back. */ |
|
|
|
|
if (glb_policy->serverlist == nullptr) { |
|
|
|
@ -1774,7 +1772,7 @@ static void lb_on_server_status_received_locked(void* arg, grpc_error* error) { |
|
|
|
|
// If the load report timer is still pending, we wait for it to be
|
|
|
|
|
// called before restarting the call. Otherwise, we restart the call
|
|
|
|
|
// here.
|
|
|
|
|
if (!glb_policy->client_load_report_timer_pending) { |
|
|
|
|
if (!glb_policy->client_load_report_timer_callback_pending) { |
|
|
|
|
maybe_restart_lb_call(glb_policy); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1884,9 +1882,8 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg, |
|
|
|
|
// lb_on_server_status_received() will pick up the cancel and reinit
|
|
|
|
|
// lb_call.
|
|
|
|
|
} else if (glb_policy->started_picking) { |
|
|
|
|
if (glb_policy->retry_timer_active) { |
|
|
|
|
if (glb_policy->retry_timer_callback_pending) { |
|
|
|
|
grpc_timer_cancel(&glb_policy->lb_call_retry_timer); |
|
|
|
|
glb_policy->retry_timer_active = false; |
|
|
|
|
} |
|
|
|
|
start_picking_locked(glb_policy); |
|
|
|
|
} |
|
|
|
|