Extract lb_calld from glb_policy

pull/13911/head
Juanli Shen 7 years ago
parent 633add8161
commit 8e4c9d308c
  1. src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc (14 lines changed)
  2. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc (687 lines changed)
  3. test/cpp/end2end/grpclb_end2end_test.cc (3 lines changed)

@@ -69,11 +69,13 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
call_data* calld = (call_data*)elem->call_data;
// Get stats object from context and take a ref.
GPR_ASSERT(args->context != nullptr);
GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr);
if (args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) {
calld->client_stats = grpc_grpclb_client_stats_ref(
(grpc_grpclb_client_stats*)args->context[GRPC_GRPCLB_CLIENT_STATS].value);
(grpc_grpclb_client_stats*)args->context[GRPC_GRPCLB_CLIENT_STATS]
.value);
// Record call started.
grpc_grpclb_client_stats_add_call_started(calld->client_stats);
}
return GRPC_ERROR_NONE;
}
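
The hunk above relaxes a hard invariant: a call may now carry no grpclb stats in its context (for example, when its pick did not go through grpclb), so the filter only refs the stats object, and only counts the call as started, when the context slot is populated. Below is a minimal, self-contained sketch of that ref/unref pairing; all names are hypothetical stand-ins for grpc_grpclb_client_stats and the filter entry points, not the gRPC API.

#include <atomic>

// Hypothetical stand-in for the refcounted client-stats object.
struct ClientStats {
  std::atomic<long> refs{1};  // creator owns the initial ref
  std::atomic<long> calls_started{0};
  std::atomic<long> calls_finished{0};
};

ClientStats* client_stats_ref(ClientStats* s) {
  s->refs.fetch_add(1, std::memory_order_relaxed);
  return s;
}

void client_stats_unref(ClientStats* s) {
  if (s->refs.fetch_sub(1, std::memory_order_acq_rel) == 1) delete s;
}

// init_call_elem analog: take a ref only if the context slot is populated.
void init_call(ClientStats* from_context, ClientStats** calld_stats) {
  if (from_context != nullptr) {
    *calld_stats = client_stats_ref(from_context);
    (*calld_stats)->calls_started.fetch_add(1, std::memory_order_relaxed);
  }
}

// destroy_call_elem analog: unref only if init_call took a ref.
void destroy_call(ClientStats* calld_stats) {
  if (calld_stats != nullptr) {
    calld_stats->calls_finished.fetch_add(1, std::memory_order_relaxed);
    client_stats_unref(calld_stats);
  }
}

int main() {
  ClientStats* stats = new ClientStats();  // owner's ref
  ClientStats* calld_stats = nullptr;
  init_call(stats, &calld_stats);  // filter takes its own ref
  destroy_call(calld_stats);       // filter drops its ref
  client_stats_unref(stats);       // owner drops the last ref: freed
}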
@@ -81,6 +83,7 @@ static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
call_data* calld = (call_data*)elem->call_data;
if (calld->client_stats != nullptr) {
// Record call finished, optionally setting client_failed_to_send and
// received.
grpc_grpclb_client_stats_add_call_finished(
@@ -89,17 +92,19 @@ static void destroy_call_elem(grpc_call_element* elem,
calld->client_stats);
// All done, so unref the stats object.
grpc_grpclb_client_stats_unref(calld->client_stats);
}
}
static void start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = (call_data*)elem->call_data;
GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0);
if (calld->client_stats != nullptr) {
// Intercept send_initial_metadata.
if (batch->send_initial_metadata) {
calld->original_on_complete_for_send = batch->on_complete;
GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send, calld,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send,
calld, grpc_schedule_on_exec_ctx);
batch->on_complete = &calld->on_complete_for_send;
}
// Intercept recv_initial_metadata.
@@ -112,6 +117,7 @@ static void start_transport_stream_op_batch(
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->recv_initial_metadata_ready;
}
}
// Chain to next filter.
grpc_call_next_op(elem, batch);
GPR_TIMER_END("clr_start_transport_stream_op_batch", 0);
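
The same null-guard now wraps the interception logic above: stats bookkeeping happens only when the call actually has a stats object. (From the next hunk onward the diff is in grpclb.cc, the second file listed at the top.) Here is a hedged sketch of the interception idiom itself: stash the batch's on_complete, substitute our own closure, then chain. The types are hypothetical, not grpc's.

#include <cstdio>
#include <functional>
#include <utility>

// Hypothetical analogs of the batch and per-call data.
struct Batch {
  bool send_initial_metadata = false;
  std::function<void()> on_complete;
};

struct CallData {
  std::function<void()> original_on_complete_for_send;
  long sends_recorded = 0;
};

void start_batch(CallData* calld, Batch* batch) {
  if (batch->send_initial_metadata) {
    // Intercept: remember the original completion, substitute ours.
    calld->original_on_complete_for_send = std::move(batch->on_complete);
    batch->on_complete = [calld] {
      calld->sends_recorded++;                 // our bookkeeping first...
      calld->original_on_complete_for_send();  // ...then chain to the original
    };
  }
  // ...then hand the batch to the next filter in the stack.
}

int main() {
  CallData calld;
  Batch batch;
  batch.send_initial_metadata = true;
  batch.on_complete = [] { std::puts("original on_complete"); };
  start_batch(&calld, &batch);
  batch.on_complete();  // transport completes the batch: ours runs, then chains
}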

@@ -169,25 +169,78 @@ struct pending_ping {
} // namespace
struct glb_lb_policy {
/** base policy: must be first */
typedef struct glb_lb_call_data {
struct glb_lb_policy* glb_policy;
// TODO: refactor
gpr_refcount refs;
/** The streaming call to the LB server. Always non-NULL. */
grpc_call* lb_call;
/** The initial metadata received from the LB server. */
grpc_metadata_array lb_initial_metadata_recv;
/** The message sent to the LB server. It's used to query for backends (the
* value may vary if the LB server indicates a redirect) or send client load
* report. */
grpc_byte_buffer* send_message_payload;
/** The callback after the initial request is sent. */
grpc_closure lb_on_sent_initial_request;
/** The response received from the LB server, if any. */
grpc_byte_buffer* recv_message_payload;
/** The callback to process the response received from the LB server. */
grpc_closure lb_on_response_received;
bool seen_initial_response;
/** The callback to process the status received from the LB server, which
* signals the end of the LB call. */
grpc_closure lb_on_server_status_received;
/** The trailing metadata from the LB server. */
grpc_metadata_array lb_trailing_metadata_recv;
/** The call status code and details. */
grpc_status_code lb_call_status;
grpc_slice lb_call_status_details;
/** The stats for client-side load reporting associated with this LB call.
* Created after the first serverlist is received. */
grpc_grpclb_client_stats* client_stats;
/** The interval and timer for next client load report. */
grpc_millis client_stats_report_interval;
grpc_timer client_load_report_timer;
bool client_load_report_timer_callback_pending;
bool last_client_load_report_counters_were_zero;
bool client_load_report_is_due;
/** The closure used for either the load report timer or the callback for
* completion of sending the load report. */
grpc_closure client_load_report_closure;
} glb_lb_call_data;
typedef struct glb_lb_policy {
/** Base policy: must be first. */
grpc_lb_policy base;
/** who the client is trying to communicate with */
/** Who the client is trying to communicate with. */
const char* server_name;
/** Channel related data that will be propagated to the internal RR policy. */
grpc_client_channel_factory* cc_factory;
grpc_channel_args* args;
/** timeout in milliseconds for the LB call. 0 means no deadline. */
int lb_call_timeout_ms;
/** timeout in milliseconds for before using fallback backend addresses.
/** Timeout in milliseconds before using fallback backend addresses.
* 0 means not using fallback. */
int lb_fallback_timeout_ms;
/** for communicating with the LB server */
/** The channel for communicating with the LB server. */
grpc_channel* lb_channel;
/** The data associated with the current LB call. It holds a ref to this LB
* policy. It's initialized every time we query for backends. It's reset to
* NULL whenever the current LB call is no longer needed (e.g., the LB policy
* is shutting down, or the LB call has ended). A non-NULL lb_calld always
* contains a non-NULL lb_call. */
glb_lb_call_data* lb_calld;
/** response generator to inject address updates into \a lb_channel */
grpc_fake_resolver_response_generator* response_generator;
@@ -225,9 +278,6 @@ struct glb_lb_policy {
bool shutting_down;
/** are we currently updating lb_call? */
bool updating_lb_call;
/** are we already watching the LB channel's connectivity? */
bool watching_lb_channel;
@@ -243,65 +293,70 @@ struct glb_lb_policy {
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
/* Finished sending initial request. */
grpc_closure lb_on_sent_initial_request;
/* Status from the LB server has been received. This signals the end of the LB
* call. */
grpc_closure lb_on_server_status_received;
/* A response from the LB server has been received. Process it */
grpc_closure lb_on_response_received;
/* LB call retry timer callback. */
grpc_closure lb_on_call_retry;
/* LB fallback timer callback. */
grpc_closure lb_on_fallback;
grpc_call* lb_call; /* streaming call to the LB server, */
grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
grpc_metadata_array
lb_trailing_metadata_recv; /* trailing MD from LB server */
/* what's being sent to the LB server. Note that its value may vary if the LB
* server indicates a redirect. */
grpc_byte_buffer* lb_request_payload;
/* response the LB server, if any. Processed in lb_on_response_received() */
grpc_byte_buffer* lb_response_payload;
/* call status code and details, set in lb_on_server_status_received() */
grpc_status_code lb_call_status;
grpc_slice lb_call_status_details;
/** LB call retry backoff state */
grpc_core::ManualConstructor<grpc_core::BackOff> lb_call_backoff;
/** timeout in milliseconds for the LB call. 0 means no deadline. */
int lb_call_timeout_ms;
/** LB call retry timer */
grpc_timer lb_call_retry_timer;
/** LB call retry timer callback */
grpc_closure lb_on_call_retry;
/** LB fallback timer */
grpc_timer lb_fallback_timer;
/** LB fallback timer callback */
grpc_closure lb_on_fallback;
} glb_lb_policy;
bool initial_request_sent;
bool seen_initial_response;
static void glb_lb_call_data_ref(glb_lb_call_data* lb_calld,
const char* reason) {
gpr_ref_non_zero(&lb_calld->refs);
if (grpc_lb_glb_trace.enabled()) {
const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count);
gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p REF %lu->%lu (%s)",
grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld,
(unsigned long)(count - 1), (unsigned long)count, reason);
}
}
/* Stats for client-side load reporting. Should be unreffed and
* recreated whenever lb_call is replaced. */
grpc_grpclb_client_stats* client_stats;
/* Interval and timer for next client load report. */
grpc_millis client_stats_report_interval;
grpc_timer client_load_report_timer;
bool client_load_report_timer_callback_pending;
bool last_client_load_report_counters_were_zero;
/* Closure used for either the load report timer or the callback for
* completion of sending the load report. */
grpc_closure client_load_report_closure;
/* Client load report message payload. */
grpc_byte_buffer* client_load_report_payload;
};
static void glb_lb_call_data_unref(glb_lb_call_data* lb_calld,
const char* reason) {
const bool done = gpr_unref(&lb_calld->refs);
if (grpc_lb_glb_trace.enabled()) {
const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count);
gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p UNREF %lu->%lu (%s)",
grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld,
(unsigned long)(count + 1), (unsigned long)count, reason);
}
if (done) {
GPR_ASSERT(lb_calld->lb_call != nullptr);
grpc_call_unref(lb_calld->lb_call);
grpc_metadata_array_destroy(&lb_calld->lb_initial_metadata_recv);
grpc_metadata_array_destroy(&lb_calld->lb_trailing_metadata_recv);
grpc_byte_buffer_destroy(lb_calld->send_message_payload);
grpc_byte_buffer_destroy(lb_calld->recv_message_payload);
grpc_slice_unref_internal(lb_calld->lb_call_status_details);
if (lb_calld->client_stats != nullptr) {
grpc_grpclb_client_stats_unref(lb_calld->client_stats);
}
GRPC_LB_POLICY_UNREF(&lb_calld->glb_policy->base, "lb_calld");
gpr_free(lb_calld);
}
}
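
Taken together, glb_lb_call_data_ref()/glb_lb_call_data_unref() implement the ownership rule the new struct depends on: every in-flight batch callback holds one ref, the initial ref belongs to the end-of-call (server status) callback, and the last unref tears down the call's resources and releases the policy ref. A runnable sketch of that discipline, simplified and with hypothetical names (no tracing, no combiner):

#include <atomic>
#include <cstdio>

struct LbCallData {
  std::atomic<long> refs{1};  // initial ref: owned by the end-of-call callback
};

void lb_calld_ref(LbCallData* d, const char* reason) {
  d->refs.fetch_add(1, std::memory_order_relaxed);
  std::printf("REF (%s)\n", reason);
}

void lb_calld_unref(LbCallData* d, const char* reason) {
  std::printf("UNREF (%s)\n", reason);
  if (d->refs.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    // Last ref gone: destroy call resources, then unref the owning policy.
    delete d;
  }
}

int main() {
  LbCallData* d = new LbCallData();  // created holding the initial ref
  lb_calld_ref(d, "lb_on_sent_initial_request_locked");  // one ref per batch
  lb_calld_ref(d, "lb_on_response_received_locked");
  lb_calld_unref(d, "lb_on_sent_initial_request_locked");
  lb_calld_unref(d, "lb_on_response_received_locked");
  lb_calld_unref(d, "lb_call_ended");  // initial ref: frees the object
}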
static void lb_call_data_shutdown(glb_lb_policy* glb_policy) {
GPR_ASSERT(glb_policy->lb_calld != nullptr);
GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr);
// lb_on_server_status_received will complete the cancellation and clean up.
grpc_call_cancel(glb_policy->lb_calld->lb_call, nullptr);
if (glb_policy->lb_calld->client_load_report_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_calld->client_load_report_timer);
}
glb_policy->lb_calld = nullptr;
}
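
Note that lb_call_data_shutdown() never frees anything directly: it cancels the call, cancels the load-report timer, and simply drops glb_policy's pointer. Every pending callback then discovers it has been orphaned by comparing itself against the owner's current pointer and releases its ref without touching policy state. A compact sketch of that idiom with hypothetical types:

#include <cstdio>

struct LbCallData;

struct Policy {
  LbCallData* lb_calld = nullptr;  // the currently-active LB call, if any
};

struct LbCallData {
  Policy* policy;
};

void on_lb_call_event(LbCallData* calld) {
  if (calld != calld->policy->lb_calld) {
    // Orphaned: the policy moved on (shutdown or LB channel update).
    // The real code releases its ref here and returns.
    std::printf("orphaned: ignore event\n");
    return;
  }
  std::printf("active: process event\n");
}

int main() {
  Policy policy;
  LbCallData calld{&policy};
  policy.lb_calld = &calld;
  on_lb_call_event(&calld);   // active: process event
  policy.lb_calld = nullptr;  // lb_call_data_shutdown analog: orphan it
  on_lb_call_event(&calld);   // orphaned: ignore event
}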
/* add lb_token of selected subchannel (address) to the call's initial
* metadata */
@@ -334,11 +389,12 @@ static void pending_pick_set_metadata_and_context(pending_pick* pp) {
abort();
}
// Pass on client stats via context. Passes ownership of the reference.
GPR_ASSERT(pp->client_stats != nullptr);
if (pp->client_stats != nullptr) {
pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
pp->client_stats;
pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
destroy_client_stats;
}
} else {
if (pp->client_stats != nullptr) {
grpc_grpclb_client_stats_unref(pp->client_stats);
@@ -605,9 +661,11 @@ static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy,
// the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
GPR_ASSERT(glb_policy->client_stats != nullptr);
if (glb_policy->lb_calld != nullptr &&
glb_policy->lb_calld->client_stats != nullptr) {
grpc_grpclb_client_stats_add_call_dropped_locked(
server->load_balance_token, glb_policy->client_stats);
server->load_balance_token, glb_policy->lb_calld->client_stats);
}
if (force_async) {
GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
@@ -618,7 +676,11 @@ static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy,
}
}
// Set client_stats and user_data.
pp->client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats);
if (glb_policy->lb_calld != nullptr &&
glb_policy->lb_calld->client_stats != nullptr) {
pp->client_stats =
grpc_grpclb_client_stats_ref(glb_policy->lb_calld->client_stats);
}
GPR_ASSERT(pp->pick->user_data == nullptr);
pp->pick->user_data = (void**)&pp->lb_token;
// Pick via the RR policy.
@@ -872,9 +934,6 @@ static void glb_destroy(grpc_lb_policy* pol) {
GPR_ASSERT(glb_policy->pending_pings == nullptr);
gpr_free((void*)glb_policy->server_name);
grpc_channel_args_destroy(glb_policy->args);
if (glb_policy->client_stats != nullptr) {
grpc_grpclb_client_stats_unref(glb_policy->client_stats);
}
grpc_connectivity_state_destroy(&glb_policy->state_tracker);
if (glb_policy->serverlist != nullptr) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
@@ -892,13 +951,8 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
glb_policy->shutting_down = true;
/* glb_policy->lb_call and this local lb_call must be consistent at this point
* because glb_policy->lb_call is only assigned in lb_call_init_locked as part
* of query_for_backends_locked, which can only be invoked while
* glb_policy->shutting_down is false. */
if (glb_policy->lb_call != nullptr) {
grpc_call_cancel(glb_policy->lb_call, nullptr);
/* lb_on_server_status_received will pick up the cancel and clean up */
if (glb_policy->lb_calld != nullptr) {
lb_call_data_shutdown(glb_policy);
}
if (glb_policy->retry_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
@@ -1048,7 +1102,6 @@ static void start_picking_locked(glb_lb_policy* glb_policy) {
grpc_timer_init(&glb_policy->lb_fallback_timer, deadline,
&glb_policy->lb_on_fallback);
}
glb_policy->started_picking = true;
glb_policy->lb_call_backoff->Reset();
query_for_backends_locked(glb_policy);
@@ -1089,7 +1142,6 @@ static int glb_pick_locked(grpc_lb_policy* pol,
gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy,
glb_policy->rr_policy);
}
GPR_ASSERT(glb_policy->client_stats != nullptr);
pick_done =
pick_from_internal_rr_locked(glb_policy, false /* force_async */, pp);
}
@@ -1139,8 +1191,8 @@ static void glb_notify_on_state_change_locked(grpc_lb_policy* pol,
static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
glb_policy->retry_timer_callback_pending = false;
if (!glb_policy->shutting_down && glb_policy->lb_call == nullptr &&
error == GRPC_ERROR_NONE) {
if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE &&
glb_policy->lb_calld == nullptr) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", glb_policy);
}
@@ -1149,15 +1201,7 @@ static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_retry_timer");
}
static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
if (glb_policy->started_picking && glb_policy->updating_lb_call) {
if (glb_policy->retry_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
}
if (!glb_policy->shutting_down) start_picking_locked(glb_policy);
glb_policy->updating_lb_call = false;
} else if (!glb_policy->shutting_down) {
/* if we aren't shutting down, restart the LB client call after some time */
static void start_lb_call_retry_timer_locked(glb_lb_policy* glb_policy) {
grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime();
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
@@ -1165,10 +1209,10 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
if (timeout > 0) {
gpr_log(GPR_DEBUG,
"[grpclb %p] ... retry LB call after %" PRIuPTR "ms.",
"[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.",
glb_policy, timeout);
} else {
gpr_log(GPR_DEBUG, "[grpclb %p] ... retry LB call immediately.",
gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.",
glb_policy);
}
}
@@ -1179,54 +1223,33 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
glb_policy->retry_timer_callback_pending = true;
grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try,
&glb_policy->lb_on_call_retry);
}
GRPC_LB_POLICY_UNREF(&glb_policy->base,
"lb_on_server_status_received_locked");
}
static void send_client_load_report_locked(void* arg, grpc_error* error);
static void maybe_send_client_load_report_locked(void* arg, grpc_error* error);
static void schedule_next_client_load_report(glb_lb_policy* glb_policy) {
static void schedule_next_client_load_report(glb_lb_call_data* lb_calld) {
const grpc_millis next_client_load_report_time =
grpc_core::ExecCtx::Get()->Now() +
glb_policy->client_stats_report_interval;
GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
send_client_load_report_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_timer_init(&glb_policy->client_load_report_timer,
grpc_core::ExecCtx::Get()->Now() + lb_calld->client_stats_report_interval;
GRPC_CLOSURE_INIT(
&lb_calld->client_load_report_closure,
maybe_send_client_load_report_locked, lb_calld,
grpc_combiner_scheduler(lb_calld->glb_policy->base.combiner));
grpc_timer_init(&lb_calld->client_load_report_timer,
next_client_load_report_time,
&glb_policy->client_load_report_closure);
&lb_calld->client_load_report_closure);
lb_calld->client_load_report_timer_callback_pending = true;
}
static void client_load_report_done_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
grpc_byte_buffer_destroy(glb_policy->client_load_report_payload);
glb_policy->client_load_report_payload = nullptr;
if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) {
glb_policy->client_load_report_timer_callback_pending = false;
GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
if (glb_policy->lb_call == nullptr) {
maybe_restart_lb_call(glb_policy);
}
glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
glb_lb_policy* glb_policy = lb_calld->glb_policy;
grpc_byte_buffer_destroy(lb_calld->send_message_payload);
lb_calld->send_message_payload = nullptr;
if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) {
glb_lb_call_data_unref(lb_calld, "client_load_report");
return;
}
schedule_next_client_load_report(glb_policy);
}
static void do_send_client_load_report_locked(glb_lb_policy* glb_policy) {
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message.send_message = glb_policy->client_load_report_payload;
GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
client_load_report_done_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_call_error call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure);
if (call_error != GRPC_CALL_OK) {
gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
schedule_next_client_load_report(lb_calld);
}
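
client_load_report_done_locked() closes the reporting loop: free the payload, bail out (dropping the ref) if the call was orphaned or errored, otherwise arm the timer for the next report. The whole cycle as a single-threaded sketch, with the timer plumbing elided and names hypothetical:

#include <cstdio>

struct LbCall {
  bool orphaned = false;        // lb_calld != glb_policy->lb_calld
  bool send_in_flight = false;  // send_message_payload != nullptr
  bool report_is_due = false;   // client_load_report_is_due
};

void schedule_next_report(LbCall* c);

void send_report(LbCall* c) {
  c->send_in_flight = true;
  std::printf("load report sent\n");
}

// Timer fired (maybe_send_client_load_report_locked analog).
void on_timer_fired(LbCall* c) {
  if (c->orphaned) return;  // stop the cycle; the real code drops its ref
  if (!c->send_in_flight) {
    send_report(c);
  } else {
    c->report_is_due = true;  // defer until the pending send completes
  }
}

// Send completed (client_load_report_done_locked analog).
void on_report_sent(LbCall* c) {
  c->send_in_flight = false;
  if (c->orphaned) return;  // stop the cycle; the real code drops its ref
  schedule_next_report(c);  // loop
}

void schedule_next_report(LbCall* c) {
  // Arm a timer for now + client_stats_report_interval; on expiry the
  // runtime invokes on_timer_fired(c).
  (void)c;
}

int main() {
  LbCall c;
  on_timer_fired(&c);  // nothing in flight: sends immediately
  on_report_sent(&c);  // schedules the next round
}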
static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
@@ -1241,258 +1264,256 @@ static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
(drop_entries == nullptr || drop_entries->num_entries == 0);
}
static void send_client_load_report_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) {
glb_policy->client_load_report_timer_callback_pending = false;
GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
if (glb_policy->lb_call == nullptr) {
maybe_restart_lb_call(glb_policy);
}
return;
}
static void send_client_load_report_locked(glb_lb_call_data* lb_calld) {
glb_lb_policy* glb_policy = lb_calld->glb_policy;
// Construct message payload.
GPR_ASSERT(glb_policy->client_load_report_payload == nullptr);
GPR_ASSERT(lb_calld->send_message_payload == nullptr);
grpc_grpclb_request* request =
grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats);
grpc_grpclb_load_report_request_create_locked(lb_calld->client_stats);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
if (load_report_counters_are_zero(request)) {
if (glb_policy->last_client_load_report_counters_were_zero) {
if (lb_calld->last_client_load_report_counters_were_zero) {
grpc_grpclb_request_destroy(request);
schedule_next_client_load_report(glb_policy);
schedule_next_client_load_report(lb_calld);
return;
}
glb_policy->last_client_load_report_counters_were_zero = true;
lb_calld->last_client_load_report_counters_were_zero = true;
} else {
glb_policy->last_client_load_report_counters_were_zero = false;
lb_calld->last_client_load_report_counters_were_zero = false;
}
grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
glb_policy->client_load_report_payload =
lb_calld->send_message_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
// Send the report.
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message.send_message = lb_calld->send_message_payload;
GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure,
client_load_report_done_locked, lb_calld,
grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_call_error call_error = grpc_call_start_batch_and_execute(
lb_calld->lb_call, &op, 1, &lb_calld->client_load_report_closure);
if (call_error != GRPC_CALL_OK) {
gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
}
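
The zero-suppression above sends at most one consecutive all-zero report: the first all-zero snapshot still goes out, so the balancer promptly sees activity stop, but repeats are skipped until a counter moves again. The decision in isolation, mirroring the logic above:

#include <cstdio>

// Returns true if this report should be sent; updates the "last report was
// all zero" flag in place.
bool should_send_report(bool counters_are_zero, bool* last_were_zero) {
  if (counters_are_zero) {
    if (*last_were_zero) return false;  // still zero: skip this round
    *last_were_zero = true;             // first zero snapshot: send it
  } else {
    *last_were_zero = false;
  }
  return true;
}

int main() {
  bool last_were_zero = false;
  std::printf("%d\n", should_send_report(true, &last_were_zero));   // 1: send
  std::printf("%d\n", should_send_report(true, &last_were_zero));   // 0: skip
  std::printf("%d\n", should_send_report(false, &last_were_zero));  // 1: send
  std::printf("%d\n", should_send_report(true, &last_were_zero));   // 1: send
}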
static void maybe_send_client_load_report_locked(void* arg, grpc_error* error) {
glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
glb_lb_policy* glb_policy = lb_calld->glb_policy;
lb_calld->client_load_report_timer_callback_pending = false;
if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) {
glb_lb_call_data_unref(lb_calld, "client_load_report");
return;
}
// If we've already sent the initial request, then we can go ahead and send
// the load report. Otherwise, we need to wait until the initial request has
// been sent to send this (see lb_on_sent_initial_request_locked() below).
if (glb_policy->initial_request_sent) {
do_send_client_load_report_locked(glb_policy);
// been sent to send this (see lb_on_sent_initial_request_locked()).
if (lb_calld->send_message_payload == nullptr) {
send_client_load_report_locked(lb_calld);
} else {
lb_calld->client_load_report_is_due = true;
}
}
static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error);
static void lb_on_server_status_received_locked(void* arg, grpc_error* error);
static void lb_on_response_received_locked(void* arg, grpc_error* error);
static void lb_call_init_locked(glb_lb_policy* glb_policy) {
static glb_lb_call_data* lb_call_data_create_locked(glb_lb_policy* glb_policy) {
GPR_ASSERT(!glb_policy->shutting_down);
// Init the LB call. Note that the LB call will progress every time there's
// activity in glb_policy->base.interested_parties, which is comprised of the
// polling entities from client_channel.
GPR_ASSERT(glb_policy->server_name != nullptr);
GPR_ASSERT(glb_policy->server_name[0] != '\0');
GPR_ASSERT(glb_policy->lb_call == nullptr);
GPR_ASSERT(!glb_policy->shutting_down);
/* Note the following LB call progresses every time there's activity in \a
* glb_policy->base.interested_parties, which is comprised of the polling
* entities from \a client_channel. */
grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name);
grpc_millis deadline =
glb_policy->lb_call_timeout_ms == 0
? GRPC_MILLIS_INF_FUTURE
: grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_call_timeout_ms;
glb_policy->lb_call = grpc_channel_create_pollset_set_call(
glb_lb_call_data* lb_calld = (glb_lb_call_data*)gpr_zalloc(sizeof(*lb_calld));
lb_calld->lb_call = grpc_channel_create_pollset_set_call(
glb_policy->lb_channel, nullptr, GRPC_PROPAGATE_DEFAULTS,
glb_policy->base.interested_parties,
GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
&host, deadline, nullptr);
grpc_slice_unref_internal(host);
if (glb_policy->client_stats != nullptr) {
grpc_grpclb_client_stats_unref(glb_policy->client_stats);
}
glb_policy->client_stats = grpc_grpclb_client_stats_create();
grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);
// Init the LB call request payload.
grpc_grpclb_request* request =
grpc_grpclb_request_create(glb_policy->server_name);
grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
glb_policy->lb_request_payload =
lb_calld->send_message_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request,
lb_on_sent_initial_request_locked, glb_policy,
// Init other data associated with the LB call.
lb_calld->glb_policy = glb_policy;
gpr_ref_init(&lb_calld->refs, 1);
grpc_metadata_array_init(&lb_calld->lb_initial_metadata_recv);
grpc_metadata_array_init(&lb_calld->lb_trailing_metadata_recv);
GRPC_CLOSURE_INIT(&lb_calld->lb_on_sent_initial_request,
lb_on_sent_initial_request_locked, lb_calld,
grpc_combiner_scheduler(glb_policy->base.combiner));
GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received,
lb_on_server_status_received_locked, glb_policy,
GRPC_CLOSURE_INIT(&lb_calld->lb_on_response_received,
lb_on_response_received_locked, lb_calld,
grpc_combiner_scheduler(glb_policy->base.combiner));
GRPC_CLOSURE_INIT(&glb_policy->lb_on_response_received,
lb_on_response_received_locked, glb_policy,
GRPC_CLOSURE_INIT(&lb_calld->lb_on_server_status_received,
lb_on_server_status_received_locked, lb_calld,
grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_core::BackOff::Options backoff_options;
backoff_options
.set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
.set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
.set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
glb_policy->lb_call_backoff.Init(backoff_options);
glb_policy->initial_request_sent = false;
glb_policy->seen_initial_response = false;
glb_policy->last_client_load_report_counters_were_zero = false;
}
static void lb_call_destroy_locked(glb_lb_policy* glb_policy) {
GPR_ASSERT(glb_policy->lb_call != nullptr);
grpc_call_unref(glb_policy->lb_call);
glb_policy->lb_call = nullptr;
grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv);
grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv);
grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
grpc_slice_unref_internal(glb_policy->lb_call_status_details);
if (glb_policy->client_load_report_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->client_load_report_timer);
}
// Hold a ref to the glb_policy.
GRPC_LB_POLICY_REF(&glb_policy->base, "lb_calld");
return lb_calld;
}
/*
* Auxiliary functions and LB client callbacks.
*/
static void query_for_backends_locked(glb_lb_policy* glb_policy) {
GPR_ASSERT(glb_policy->lb_channel != nullptr);
if (glb_policy->shutting_down) return;
lb_call_init_locked(glb_policy);
// Init the LB call data.
GPR_ASSERT(glb_policy->lb_calld == nullptr);
glb_policy->lb_calld = lb_call_data_create_locked(glb_policy);
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Query for backends (lb_channel: %p, lb_call: %p)",
glb_policy, glb_policy->lb_channel, glb_policy->lb_call);
"[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p, "
"lb_call: %p)",
glb_policy, glb_policy->lb_channel, glb_policy->lb_calld,
glb_policy->lb_calld->lb_call);
}
GPR_ASSERT(glb_policy->lb_call != nullptr);
GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr);
// Create the ops.
grpc_call_error call_error;
grpc_op ops[3];
memset(ops, 0, sizeof(ops));
// Op: send initial metadata.
grpc_op* op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = nullptr;
op++;
// Op: send request message.
GPR_ASSERT(glb_policy->lb_calld->send_message_payload != nullptr);
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message =
glb_policy->lb_calld->send_message_payload;
op->flags = 0;
op->reserved = nullptr;
op++;
glb_lb_call_data_ref(glb_policy->lb_calld,
"lb_on_sent_initial_request_locked");
call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_calld->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_calld->lb_on_sent_initial_request);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Op: recv initial metadata.
op = ops;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata =
&glb_policy->lb_initial_metadata_recv;
&glb_policy->lb_calld->lb_initial_metadata_recv;
op->flags = 0;
op->reserved = nullptr;
op++;
GPR_ASSERT(glb_policy->lb_request_payload != nullptr);
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = glb_policy->lb_request_payload;
// Op: recv response.
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message =
&glb_policy->lb_calld->recv_message_payload;
op->flags = 0;
op->reserved = nullptr;
op++;
/* take a ref to be released in lb_on_sent_initial_request_locked() */
GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_sent_initial_request_locked");
glb_lb_call_data_ref(glb_policy->lb_calld, "lb_on_response_received_locked");
call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_sent_initial_request);
glb_policy->lb_calld->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_calld->lb_on_response_received);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Op: recv server status.
op = ops;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata =
&glb_policy->lb_trailing_metadata_recv;
op->data.recv_status_on_client.status = &glb_policy->lb_call_status;
&glb_policy->lb_calld->lb_trailing_metadata_recv;
op->data.recv_status_on_client.status = &glb_policy->lb_calld->lb_call_status;
op->data.recv_status_on_client.status_details =
&glb_policy->lb_call_status_details;
op->flags = 0;
op->reserved = nullptr;
op++;
/* take a ref to be released in lb_on_server_status_received_locked() */
GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_server_status_received_locked");
call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_server_status_received);
GPR_ASSERT(GRPC_CALL_OK == call_error);
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &glb_policy->lb_response_payload;
&glb_policy->lb_calld->lb_call_status_details;
op->flags = 0;
op->reserved = nullptr;
op++;
/* take a ref to be unref'd/reused in lb_on_response_received_locked() */
GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_response_received_locked");
// This callback signals the end of the LB call, so it relies on the initial
// ref instead of a new ref. When it's invoked, it's the initial ref that is
// unreffed.
call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_response_received);
glb_policy->lb_calld->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_calld->lb_on_server_status_received);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
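
query_for_backends_locked() now starts three batches on the new call, and the comments above spell out which ref each completion consumes. Restated as a linear trace, a sketch that mirrors glb_lb_call_data_ref/unref rather than code from the tree:

#include <cassert>

int main() {
  int refs = 1;  // initial ref, reserved for lb_on_server_status_received
  refs++;        // batch 1: SEND_INITIAL_METADATA + SEND_MESSAGE
  refs++;        // batch 2: RECV_INITIAL_METADATA + RECV_MESSAGE
                 // batch 3: RECV_STATUS_ON_CLIENT rides on the initial ref
  refs--;        // lb_on_sent_initial_request_locked fires
  assert(refs == 2);
  refs--;        // lb_on_response_received_locked ends its recv loop
  assert(refs == 1);
  refs--;        // lb_on_server_status_received_locked: call is fully done
  assert(refs == 0);
  return 0;
}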
static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
glb_policy->initial_request_sent = true;
glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
grpc_byte_buffer_destroy(lb_calld->send_message_payload);
lb_calld->send_message_payload = nullptr;
// If we attempted to send a client load report before the initial request was
// sent, send the load report now.
if (glb_policy->client_load_report_payload != nullptr) {
do_send_client_load_report_locked(glb_policy);
// sent (and this lb_calld is still in use), send the load report now.
if (lb_calld->client_load_report_is_due &&
lb_calld == lb_calld->glb_policy->lb_calld) {
send_client_load_report_locked(lb_calld);
lb_calld->client_load_report_is_due = false;
}
GRPC_LB_POLICY_UNREF(&glb_policy->base, "lb_on_sent_initial_request_locked");
glb_lb_call_data_unref(lb_calld, "lb_on_sent_initial_request_locked");
}
static void lb_on_response_received_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
glb_lb_policy* glb_policy = lb_calld->glb_policy;
// Empty payload means the LB call was cancelled.
if (lb_calld != glb_policy->lb_calld ||
lb_calld->recv_message_payload == nullptr) {
glb_lb_call_data_unref(lb_calld, "lb_on_response_received_locked");
return;
}
grpc_op ops[2];
memset(ops, 0, sizeof(ops));
grpc_op* op = ops;
if (glb_policy->lb_response_payload != nullptr) {
glb_policy->lb_call_backoff->Reset();
/* Received data from the LB server. Look inside
* glb_policy->lb_response_payload, for a serverlist. */
grpc_byte_buffer_reader bbr;
grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload);
grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload);
grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
grpc_byte_buffer_reader_destroy(&bbr);
grpc_byte_buffer_destroy(glb_policy->lb_response_payload);
grpc_grpclb_initial_response* response = nullptr;
if (!glb_policy->seen_initial_response &&
(response = grpc_grpclb_initial_response_parse(response_slice)) !=
grpc_byte_buffer_destroy(lb_calld->recv_message_payload);
lb_calld->recv_message_payload = nullptr;
grpc_grpclb_initial_response* initial_response;
grpc_grpclb_serverlist* serverlist;
if (!lb_calld->seen_initial_response &&
(initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
nullptr) {
if (response->has_client_stats_report_interval) {
glb_policy->client_stats_report_interval = GPR_MAX(
// Have NOT seen initial response, look for initial response.
if (initial_response->has_client_stats_report_interval) {
lb_calld->client_stats_report_interval = GPR_MAX(
GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
&response->client_stats_report_interval));
&initial_response->client_stats_report_interval));
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Received initial LB response message; "
"client load reporting interval = %" PRIdPTR " milliseconds",
glb_policy, glb_policy->client_stats_report_interval);
glb_policy, lb_calld->client_stats_report_interval);
}
/* take a ref to be unref'd in send_client_load_report_locked() */
glb_policy->client_load_report_timer_callback_pending = true;
GRPC_LB_POLICY_REF(&glb_policy->base, "client_load_report");
schedule_next_client_load_report(glb_policy);
} else if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Received initial LB response message; client load "
"reporting NOT enabled",
glb_policy);
}
grpc_grpclb_initial_response_destroy(response);
glb_policy->seen_initial_response = true;
} else {
grpc_grpclb_serverlist* serverlist =
grpc_grpclb_response_parse_serverlist(response_slice);
if (serverlist != nullptr) {
GPR_ASSERT(glb_policy->lb_call != nullptr);
grpc_grpclb_initial_response_destroy(initial_response);
lb_calld->seen_initial_response = true;
} else if ((serverlist = grpc_grpclb_response_parse_serverlist(
response_slice)) != nullptr) {
// Have seen initial response, look for serverlist.
GPR_ASSERT(lb_calld->lb_call != nullptr);
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Serverlist with %" PRIuPTR " servers received",
@@ -1509,8 +1530,15 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
}
/* update serverlist */
if (serverlist->num_servers > 0) {
if (grpc_grpclb_serverlist_equals(glb_policy->serverlist,
serverlist)) {
// Start sending client load report only after we start using the
// serverlist returned from the current LB call.
if (lb_calld->client_stats_report_interval > 0 &&
lb_calld->client_stats == nullptr) {
lb_calld->client_stats = grpc_grpclb_client_stats_create();
glb_lb_call_data_ref(lb_calld, "client_load_report");
schedule_next_client_load_report(lb_calld);
}
if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Incoming server list identical to current, "
@@ -1528,6 +1556,7 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
glb_policy->fallback_backend_addresses = nullptr;
if (glb_policy->fallback_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_fallback_timer);
glb_policy->fallback_timer_callback_pending = false;
}
}
/* and update the copy in the glb_lb_policy instance. This
@@ -1539,43 +1568,73 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
}
} else {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Received empty server list, ignoring.",
gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.",
glb_policy);
}
grpc_grpclb_destroy_serverlist(serverlist);
}
} else { /* serverlist == nullptr */
} else {
// No valid initial response or serverlist found.
gpr_log(GPR_ERROR,
"[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
glb_policy,
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
}
}
grpc_slice_unref_internal(response_slice);
if (!glb_policy->shutting_down) {
/* keep listening for serverlist updates */
// Keep listening for serverlist updates.
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &glb_policy->lb_response_payload;
op->data.recv_message.recv_message = &lb_calld->recv_message_payload;
op->flags = 0;
op->reserved = nullptr;
op++;
/* reuse the "lb_on_response_received_locked" ref taken in
* query_for_backends_locked() */
// Reuse the "lb_on_response_received_locked" ref taken in
// query_for_backends_locked().
const grpc_call_error call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_response_received); /* loop */
lb_calld->lb_call, ops, (size_t)(op - ops),
&lb_calld->lb_on_response_received);
GPR_ASSERT(GRPC_CALL_OK == call_error);
} else {
GRPC_LB_POLICY_UNREF(&glb_policy->base,
"lb_on_response_received_locked_shutdown");
glb_lb_call_data_unref(lb_calld,
"lb_on_response_received_locked+glb_shutdown");
}
} else { /* empty payload: call cancelled. */
/* dispose of the "lb_on_response_received_locked" ref taken in
* query_for_backends_locked() and reused in every reception loop */
GRPC_LB_POLICY_UNREF(&glb_policy->base,
"lb_on_response_received_locked_empty_payload");
}
static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
glb_lb_policy* glb_policy = lb_calld->glb_policy;
GPR_ASSERT(lb_calld->lb_call != nullptr);
if (grpc_lb_glb_trace.enabled()) {
char* status_details =
grpc_slice_to_c_string(lb_calld->lb_call_status_details);
gpr_log(GPR_INFO,
"[grpclb %p] Status from LB server received. Status = %d, details "
"= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
lb_calld->glb_policy, lb_calld->lb_call_status, status_details,
lb_calld, lb_calld->lb_call, grpc_error_string(error));
gpr_free(status_details);
}
// If this lb_calld is still in use, this call ended because of a failure, so
// we want to retry connecting. Otherwise, we have deliberately ended this
// call and no further action is required.
if (lb_calld == glb_policy->lb_calld) {
glb_policy->lb_calld = nullptr;
if (lb_calld->client_load_report_timer_callback_pending) {
grpc_timer_cancel(&lb_calld->client_load_report_timer);
}
GPR_ASSERT(!glb_policy->shutting_down);
if (lb_calld->seen_initial_response) {
// If we lose connection to the LB server, reset the backoff and restart
// the LB call immediately.
glb_policy->lb_call_backoff->Reset();
query_for_backends_locked(glb_policy);
} else {
// If this LB call fails establishing any connection to the LB server,
// retry later.
start_lb_call_retry_timer_locked(glb_policy);
}
}
glb_lb_call_data_unref(lb_calld, "lb_call_ended");
}
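
The branch at the end of lb_on_server_status_received_locked() encodes the retry policy: a call that made it to an initial response proves the balancer was reachable, so reconnect immediately with the backoff reset; a call that never got that far backs off before retrying. In sketch form, with Backoff as a hypothetical stand-in for grpc_core::BackOff:

#include <cstdio>

// Hypothetical, minimal stand-in for grpc_core::BackOff.
struct Backoff {
  long next_delay_ms = 1000;
  void Reset() { next_delay_ms = 1000; }
  long NextAttemptDelayMs() { return next_delay_ms *= 2; }
};

void on_lb_call_ended(bool seen_initial_response, Backoff* backoff) {
  if (seen_initial_response) {
    // The balancer was reachable: reset backoff and reconnect right away
    // (query_for_backends_locked analog).
    backoff->Reset();
    std::printf("restart LB call now\n");
  } else {
    // Never reached the balancer: wait out the backoff first
    // (start_lb_call_retry_timer_locked analog).
    std::printf("retry in %ld ms\n", backoff->NextAttemptDelayMs());
  }
}

int main() {
  Backoff b;
  on_lb_call_ended(false, &b);  // retry in 2000 ms
  on_lb_call_ended(true, &b);   // restart LB call now
}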
static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
@@ -1597,29 +1656,6 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer");
}
static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
GPR_ASSERT(glb_policy->lb_call != nullptr);
if (grpc_lb_glb_trace.enabled()) {
char* status_details =
grpc_slice_to_c_string(glb_policy->lb_call_status_details);
gpr_log(GPR_INFO,
"[grpclb %p] Status from LB server received. Status = %d, Details "
"= '%s', (call: %p), error '%s'",
glb_policy, glb_policy->lb_call_status, status_details,
glb_policy->lb_call, grpc_error_string(error));
gpr_free(status_details);
}
/* We need to perform cleanups no matter what. */
lb_call_destroy_locked(glb_policy);
// If the load report timer is still pending, we wait for it to be
// called before restarting the call. Otherwise, we restart the call
// here.
if (!glb_policy->client_load_report_timer_callback_pending) {
maybe_restart_lb_call(glb_policy);
}
}
static void fallback_update_locked(glb_lb_policy* glb_policy,
const grpc_lb_addresses* addresses) {
GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
@@ -1701,7 +1737,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
switch (glb_policy->lb_channel_connectivity) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
/* resub. */
// Keep watching the LB channel.
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(glb_policy->lb_channel));
@@ -1714,29 +1750,26 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
&glb_policy->lb_channel_on_connectivity_changed, nullptr);
break;
}
// The LB channel may be IDLE because it was shut down before the update.
// Restart the LB call to kick the LB channel into gear.
case GRPC_CHANNEL_IDLE:
// lb channel inactive (probably shutdown prior to update). Restart lb
// call to kick the lb channel into gear.
/* fallthrough */
case GRPC_CHANNEL_READY:
if (glb_policy->lb_call != nullptr) {
glb_policy->updating_lb_call = true;
grpc_call_cancel(glb_policy->lb_call, nullptr);
// lb_on_server_status_received() will pick up the cancel and reinit
// lb_call.
} else if (glb_policy->started_picking) {
if (glb_policy->lb_calld != nullptr) {
lb_call_data_shutdown(glb_policy);
}
if (glb_policy->started_picking) {
if (glb_policy->retry_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
}
start_picking_locked(glb_policy);
glb_policy->lb_call_backoff->Reset();
query_for_backends_locked(glb_policy);
}
/* fallthrough */
// Fall through.
case GRPC_CHANNEL_SHUTDOWN:
done:
glb_policy->watching_lb_channel = false;
GRPC_LB_POLICY_UNREF(&glb_policy->base,
"watch_lb_channel_connectivity_cb_shutdown");
break;
}
}
@@ -1851,6 +1884,14 @@ static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
"grpclb");
// Init LB call backoff option.
grpc_core::BackOff::Options backoff_options;
backoff_options
.set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
.set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
.set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
glb_policy->lb_call_backoff.Init(backoff_options);
return &glb_policy->base;
}
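
The BackOff options moved here from lb_call_init_locked(), so backoff state now lives for the policy's lifetime rather than being rebuilt for every LB call, which is what lets lb_on_server_status_received_locked() reset it meaningfully. A hedged sketch of the schedule such options produce; the numeric constants (1s initial, 1.6x multiplier, 0.2 jitter, 120s cap) and the RNG choice are illustrative assumptions, not values read from this diff:

#include <algorithm>
#include <cstdio>
#include <random>

// Illustrative backoff: geometric growth with +/- jitter, capped at a max.
struct BackOff {
  BackOff(double init, double mult, double jit, double max)
      : initial_ms(init), multiplier(mult), jitter(jit), max_ms(max) {}

  double NextAttemptDelayMs() {
    current_ms = (current_ms == 0)
                     ? initial_ms
                     : std::min(current_ms * multiplier, max_ms);
    std::uniform_real_distribution<double> d(1 - jitter, 1 + jitter);
    return current_ms * d(rng);
  }
  void Reset() { current_ms = 0; }

  double initial_ms, multiplier, jitter, max_ms;
  double current_ms = 0;
  std::mt19937 rng{42};
};

int main() {
  BackOff b(/*init=*/1000, /*mult=*/1.6, /*jit=*/0.2, /*max=*/120000);
  for (int i = 1; i <= 5; ++i) {
    std::printf("retry #%d after %.0f ms\n", i, b.NextAttemptDelayMs());
  }
}

The remaining hunks below are from test/cpp/end2end/grpclb_end2end_test.cc, the third file in the list at the top.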

@@ -220,7 +220,7 @@ class BalancerServiceImpl : public BalancerService {
if (client_load_reporting_interval_seconds_ > 0) {
request.Clear();
stream->Read(&request);
if (stream->Read(&request)) {
gpr_log(GPR_INFO, "LB[%p]: recv client load report msg: '%s'", this,
request.DebugString().c_str());
GPR_ASSERT(request.has_client_stats());
@@ -245,6 +245,7 @@ class BalancerServiceImpl : public BalancerService {
load_report_ready_ = true;
load_report_cond_.notify_one();
}
}
done:
gpr_log(GPR_INFO, "LB[%p]: done", this);
return Status::OK;
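
The test change guards stream->Read(): with a synchronous server stream, Read() returns false once the client half-closes, so the old unconditional Read() could run the asserts below it against an empty request. A toy stand-in for that pattern (grpc's ServerReaderWriter::Read(R*) similarly returns bool):

#include <cstdio>
#include <queue>
#include <string>

// Toy stand-in for a sync server stream whose Read() returns false at
// end-of-stream.
struct Stream {
  std::queue<std::string> msgs;
  bool Read(std::string* out) {
    if (msgs.empty()) return false;  // client half-closed: nothing to read
    *out = msgs.front();
    msgs.pop();
    return true;
  }
};

int main() {
  Stream stream;  // empty: the client sent no load report
  std::string request;
  if (stream.Read(&request)) {
    std::printf("recv client load report msg: '%s'\n", request.c_str());
  } else {
    std::printf("stream closed without a load report\n");
  }
}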
