Process updates immediately instead of waiting for a previous one to complete.

pull/12736/head
Mark D. Roth 7 years ago
parent 9078c82dcb
commit 97b6e5db9f
  1. 55
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc

@ -345,9 +345,6 @@ typedef struct glb_lb_policy {
/** are we currently updating lb_call? */
bool updating_lb_call;
/** are we currently updating lb_channel? */
bool updating_lb_channel;
/** are we already watching the LB channel's connectivity? */
bool watching_lb_channel;
@ -360,9 +357,6 @@ typedef struct glb_lb_policy {
/** called upon changes to the LB channel's connectivity. */
grpc_closure lb_channel_on_connectivity_changed;
/** args from the latest update received while already updating, or NULL */
grpc_lb_policy_args *pending_update_args;
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
@ -982,10 +976,6 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
grpc_subchannel_index_unref();
if (glb_policy->pending_update_args != NULL) {
grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args);
gpr_free(glb_policy->pending_update_args);
}
gpr_free(glb_policy);
}
@ -1752,45 +1742,22 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
}
const grpc_lb_addresses *addresses =
(const grpc_lb_addresses *)arg->value.pointer.p;
// If a non-empty serverlist hasn't been received from the balancer,
// propagate the update to fallback_backend_addresses.
if (glb_policy->serverlist == NULL) {
// If a non-empty serverlist hasn't been received from the balancer,
// propagate the update to fallback_backend_addresses.
fallback_update_locked(exec_ctx, glb_policy, addresses);
} else if (glb_policy->updating_lb_channel) {
// If we have recieved serverlist from the balancer, we need to defer update
// when there is an in-progress one.
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"Update already in progress for grpclb %p. Deferring update.",
(void *)glb_policy);
}
if (glb_policy->pending_update_args != NULL) {
grpc_channel_args_destroy(exec_ctx,
glb_policy->pending_update_args->args);
gpr_free(glb_policy->pending_update_args);
}
glb_policy->pending_update_args = (grpc_lb_policy_args *)gpr_zalloc(
sizeof(*glb_policy->pending_update_args));
glb_policy->pending_update_args->client_channel_factory =
args->client_channel_factory;
glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args);
glb_policy->pending_update_args->combiner = args->combiner;
return;
}
glb_policy->updating_lb_channel = true;
GPR_ASSERT(glb_policy->lb_channel != NULL);
// Propagate updates to the LB channel (pick_first) through the fake
// resolver.
grpc_channel_args *lb_channel_args = build_lb_channel_args(
exec_ctx, addresses, glb_policy->response_generator, args->args);
/* Propagate updates to the LB channel (pick first) through the fake resolver
*/
grpc_fake_resolver_response_generator_set_response(
exec_ctx, glb_policy->response_generator, lb_channel_args);
grpc_channel_args_destroy(exec_ctx, lb_channel_args);
// Start watching the LB channel connectivity for connection, if not
// already doing so.
if (!glb_policy->watching_lb_channel) {
// Watch the LB channel connectivity for connection.
glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state(
glb_policy->lb_channel, true /* try to connect */);
grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
@ -1842,18 +1809,10 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
/* fallthrough */
case GRPC_CHANNEL_READY:
if (glb_policy->lb_call != NULL) {
glb_policy->updating_lb_channel = false;
glb_policy->updating_lb_call = true;
grpc_call_cancel(glb_policy->lb_call, NULL);
// lb_on_server_status_received will pick up the cancel and reinit
// lb_on_server_status_received() will pick up the cancel and reinit
// lb_call.
if (glb_policy->pending_update_args != NULL) {
grpc_lb_policy_args *args = glb_policy->pending_update_args;
glb_policy->pending_update_args = NULL;
glb_update_locked(exec_ctx, &glb_policy->base, args);
grpc_channel_args_destroy(exec_ctx, args->args);
gpr_free(args);
}
} else if (glb_policy->started_picking && !glb_policy->shutting_down) {
if (glb_policy->retry_timer_active) {
grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);

Loading…
Cancel
Save