Merge pull request #8685 from dgquintas/glb_locks

Fixed locking for grpclb
pull/8698/head^2
David G. Quintas 8 years ago committed by GitHub
commit d5e53b043c
  1. 15
      src/core/ext/lb_policy/grpclb/grpclb.c

@ -186,6 +186,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
* addresses failed to connect). There won't be any user_data/token
* available */
if (wc_arg->target != NULL) {
GPR_ASSERT(wc_arg->lb_token != NULL);
initial_metadata_add_lb_token(wc_arg->initial_metadata,
wc_arg->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
@ -605,10 +606,10 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
* right grpclb status. */
rr_connectivity_data *rr_conn_data = arg;
glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
gpr_mu_lock(&glb_policy->mu);
if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN &&
!glb_policy->shutting_down) {
gpr_mu_lock(&glb_policy->mu);
/* RR not shutting down. Mimic the RR's policy state */
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
rr_conn_data->state, GRPC_ERROR_REF(error),
@ -617,12 +618,12 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
&rr_conn_data->state,
&rr_conn_data->on_change);
gpr_mu_unlock(&glb_policy->mu);
} else {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"rr_connectivity_cb");
gpr_free(rr_conn_data);
}
gpr_mu_unlock(&glb_policy->mu);
}
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
@ -1081,6 +1082,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
grpc_op ops[2];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
gpr_mu_lock(&glb_policy->mu);
if (glb_policy->lb_response_payload != NULL) {
gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
/* Received data from the LB server. Look inside
@ -1109,7 +1111,6 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
/* update serverlist */
if (serverlist->num_servers > 0) {
gpr_mu_lock(&glb_policy->mu);
if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) {
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO,
@ -1125,7 +1126,6 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
rr_handover_locked(exec_ctx, glb_policy, error);
}
gpr_mu_unlock(&glb_policy->mu);
} else {
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO,
@ -1153,9 +1153,11 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
&glb_policy->lb_on_response_received); /* loop */
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
gpr_mu_unlock(&glb_policy->mu);
} else { /* empty payload: call cancelled. */
/* dispose of the "lb_on_response_received" weak ref taken in
* query_for_backends_locked() and reused in every reception loop */
/* dispose of the "lb_on_response_received" weak ref taken in
* query_for_backends_locked() and reused in every reception loop */
gpr_mu_unlock(&glb_policy->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"lb_on_response_received_empty_payload");
}
@ -1175,7 +1177,6 @@ static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
query_for_backends_locked(exec_ctx, glb_policy);
}
gpr_mu_unlock(&glb_policy->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"grpclb_on_retry_timer");
}

Loading…
Cancel
Save