diff --git a/src/core/ext/client_channel/lb_policy.h b/src/core/ext/client_channel/lb_policy.h index 54ad7797920..120c641edc6 100644 --- a/src/core/ext/client_channel/lb_policy.h +++ b/src/core/ext/client_channel/lb_policy.h @@ -109,10 +109,16 @@ struct grpc_lb_policy_vtable { /*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/ #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG + +/* Strong references: the policy will shutdown when they reach zero */ #define GRPC_LB_POLICY_REF(p, r) \ grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) #define GRPC_LB_POLICY_UNREF(exec_ctx, p, r) \ grpc_lb_policy_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) + +/* Weak references: they don't prevent the shutdown of the LB policy. When no + * strong references are left but there are still weak ones, shutdown is called. + * Once the weak reference also reaches zero, the LB policy is destroyed. */ #define GRPC_LB_POLICY_WEAK_REF(p, r) \ grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r)) #define GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, p, r) \ diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index bfcb9e6418c..451fbd65de3 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -561,7 +561,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, rr_connectivity->state, GRPC_ERROR_REF(error), "rr_handover"); /* subscribe */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectiviby_cb"); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb"); grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, &rr_connectivity->on_change); @@ -609,15 +609,15 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, /* RR not shutting down. Mimic the RR's policy state */ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_conn_data->state, GRPC_ERROR_REF(error), - "glb_rr_connectivity_changed"); - /* resubscribe */ + "rr_connectivity_cb"); + /* resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, &rr_conn_data->state, &rr_conn_data->on_change); gpr_mu_unlock(&glb_policy->mu); } else { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "rr_connectiviby_cb"); + "rr_connectivity_cb"); gpr_free(rr_conn_data); } } @@ -1010,9 +1010,6 @@ static void lb_call_destroy(glb_lb_policy *glb_policy) { static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->lb_channel != NULL); - /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref - * count goes to zero) to be unref'd in lb_on_server_status_received */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "query_for_backends"); lb_call_init(glb_policy); if (grpc_lb_glb_trace) { @@ -1056,6 +1053,9 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op->flags = 0; op->reserved = NULL; op++; + /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref + * count goes to zero) to be unref'd in lb_on_server_status_received */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received"); call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_server_status_received); @@ -1067,6 +1067,8 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op->flags = 0; op->reserved = NULL; op++; + /* take another weak ref to be unref'd in lb_on_response_received */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received"); call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); @@ -1145,14 +1147,19 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, op->flags = 0; op->reserved = NULL; op++; + /* reuse the "lb_on_response_received" weak ref taken in + * query_for_backends_locked() */ const grpc_call_error call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); } - return; + } else { /* empty payload: call cancelled. */ + /* dispose of the "lb_on_response_received" weak ref taken in + * query_for_backends_locked() and reused in every reception loop */ + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "lb_on_response_received_empty_payload"); } - /* else, empty payload: call cancelled. */ } static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 9b20edec92c..4700267ba2a 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -270,7 +270,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { subchannel_data *sd = p->subchannels[i]; GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin_destroy"); if (sd->user_data != NULL) { - GPR_ASSERT(sd->user_data_vtable); + GPR_ASSERT(sd->user_data_vtable != NULL); sd->user_data_vtable->destroy(sd->user_data); } gpr_free(sd);