PR comments, take three

pull/8568/head
David Garcia Quintas 8 years ago
parent cbc059eb7e
commit e224a76a6e
  1. 6
      src/core/ext/client_channel/lb_policy.h
  2. 25
      src/core/ext/lb_policy/grpclb/grpclb.c
  3. 2
      src/core/ext/lb_policy/round_robin/round_robin.c

@ -109,10 +109,16 @@ struct grpc_lb_policy_vtable {
/*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/ /*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
/* Strong references: the policy will shutdown when they reach zero */
#define GRPC_LB_POLICY_REF(p, r) \ #define GRPC_LB_POLICY_REF(p, r) \
grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) grpc_lb_policy_ref((p), __FILE__, __LINE__, (r))
#define GRPC_LB_POLICY_UNREF(exec_ctx, p, r) \ #define GRPC_LB_POLICY_UNREF(exec_ctx, p, r) \
grpc_lb_policy_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) grpc_lb_policy_unref((exec_ctx), (p), __FILE__, __LINE__, (r))
/* Weak references: they don't prevent the shutdown of the LB policy. When no
* strong references are left but there are still weak ones, shutdown is called.
* Once the weak reference also reaches zero, the LB policy is destroyed. */
#define GRPC_LB_POLICY_WEAK_REF(p, r) \ #define GRPC_LB_POLICY_WEAK_REF(p, r) \
grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r)) grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r))
#define GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, p, r) \ #define GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, p, r) \

@ -561,7 +561,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
rr_connectivity->state, GRPC_ERROR_REF(error), rr_connectivity->state, GRPC_ERROR_REF(error),
"rr_handover"); "rr_handover");
/* subscribe */ /* subscribe */
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectiviby_cb"); GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb");
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
&rr_connectivity->state, &rr_connectivity->state,
&rr_connectivity->on_change); &rr_connectivity->on_change);
@ -609,15 +609,15 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
/* RR not shutting down. Mimic the RR's policy state */ /* RR not shutting down. Mimic the RR's policy state */
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
rr_conn_data->state, GRPC_ERROR_REF(error), rr_conn_data->state, GRPC_ERROR_REF(error),
"glb_rr_connectivity_changed"); "rr_connectivity_cb");
/* resubscribe */ /* resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
&rr_conn_data->state, &rr_conn_data->state,
&rr_conn_data->on_change); &rr_conn_data->on_change);
gpr_mu_unlock(&glb_policy->mu); gpr_mu_unlock(&glb_policy->mu);
} else { } else {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"rr_connectiviby_cb"); "rr_connectivity_cb");
gpr_free(rr_conn_data); gpr_free(rr_conn_data);
} }
} }
@ -1010,9 +1010,6 @@ static void lb_call_destroy(glb_lb_policy *glb_policy) {
static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy) { glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->lb_channel != NULL); GPR_ASSERT(glb_policy->lb_channel != NULL);
/* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
* count goes to zero) to be unref'd in lb_on_server_status_received */
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "query_for_backends");
lb_call_init(glb_policy); lb_call_init(glb_policy);
if (grpc_lb_glb_trace) { if (grpc_lb_glb_trace) {
@ -1056,6 +1053,9 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
op->flags = 0; op->flags = 0;
op->reserved = NULL; op->reserved = NULL;
op++; op++;
/* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
* count goes to zero) to be unref'd in lb_on_server_status_received */
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received");
call_error = grpc_call_start_batch_and_execute( call_error = grpc_call_start_batch_and_execute(
exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_server_status_received); &glb_policy->lb_on_server_status_received);
@ -1067,6 +1067,8 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
op->flags = 0; op->flags = 0;
op->reserved = NULL; op->reserved = NULL;
op++; op++;
/* take another weak ref to be unref'd in lb_on_response_received */
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received");
call_error = grpc_call_start_batch_and_execute( call_error = grpc_call_start_batch_and_execute(
exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_response_received); &glb_policy->lb_on_response_received);
@ -1145,14 +1147,19 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
op->flags = 0; op->flags = 0;
op->reserved = NULL; op->reserved = NULL;
op++; op++;
/* reuse the "lb_on_response_received" weak ref taken in
* query_for_backends_locked() */
const grpc_call_error call_error = grpc_call_start_batch_and_execute( const grpc_call_error call_error = grpc_call_start_batch_and_execute(
exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_response_received); /* loop */ &glb_policy->lb_on_response_received); /* loop */
GPR_ASSERT(GRPC_CALL_OK == call_error); GPR_ASSERT(GRPC_CALL_OK == call_error);
} }
return; } else { /* empty payload: call cancelled. */
/* dispose of the "lb_on_response_received" weak ref taken in
* query_for_backends_locked() and reused in every reception loop */
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"lb_on_response_received_empty_payload");
} }
/* else, empty payload: call cancelled. */
} }
static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,

@ -270,7 +270,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
subchannel_data *sd = p->subchannels[i]; subchannel_data *sd = p->subchannels[i];
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin_destroy"); GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin_destroy");
if (sd->user_data != NULL) { if (sd->user_data != NULL) {
GPR_ASSERT(sd->user_data_vtable); GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(sd->user_data); sd->user_data_vtable->destroy(sd->user_data);
} }
gpr_free(sd); gpr_free(sd);

Loading…
Cancel
Save