From 19656b1d5923809a46f3e0075fdcd694607d07ca Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 1 Sep 2016 18:00:45 -0700 Subject: [PATCH 1/6] Add call_data's pollent to channel_data's interested_parties --- src/core/ext/client_config/client_channel.c | 27 ++++++++++++++----- src/core/ext/client_config/lb_policy.c | 3 +-- src/core/ext/client_config/lb_policy.h | 6 ++--- src/core/ext/lb_policy/grpclb/grpclb.c | 27 ++++++------------- .../ext/lb_policy/pick_first/pick_first.c | 13 --------- .../ext/lb_policy/round_robin/round_robin.c | 11 -------- 6 files changed, 32 insertions(+), 55 deletions(-) diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 61e012578e0..b5b6cc4df0b 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -456,10 +456,16 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - call_data *calld = arg; + grpc_call_element *elem = arg; + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; gpr_mu_lock(&calld->mu); GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); + // gpr_mu_lock(&chand->mu); + grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, + chand->interested_parties); + // gpr_mu_unlock(&chand->mu); calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; if (calld->connected_subchannel == NULL) { gpr_atm_no_barrier_store(&calld->subchannel_call, 1); @@ -536,7 +542,6 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GPR_TIMER_BEGIN("pick_subchannel", 0); channel_data *chand = elem->channel_data; - call_data *calld = elem->call_data; continue_picking_args *cpa; grpc_closure *closure; @@ -566,9 +571,9 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, int r; GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); gpr_mu_unlock(&chand->mu); - r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent, - initial_metadata, initial_metadata_flags, - connected_subchannel, on_ready); + r = grpc_lb_policy_pick(exec_ctx, lb_policy, initial_metadata, + initial_metadata_flags, connected_subchannel, + on_ready); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); GPR_TIMER_END("pick_subchannel", 0); return r; @@ -608,6 +613,7 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); /* try to (atomically) get the call */ grpc_subchannel_call *call = GET_CALL(calld); @@ -668,13 +674,22 @@ retry: calld->connected_subchannel == NULL && op->send_initial_metadata != NULL) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; - grpc_closure_init(&calld->next_step, subchannel_ready, calld); + grpc_closure_init(&calld->next_step, subchannel_ready, elem); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); + // grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, + // chand->interested_parties); if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, op->send_initial_metadata_flags, &calld->connected_subchannel, &calld->next_step)) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + // grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, + // chand->interested_parties); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); + } else { + // gpr_mu_lock(&chand->mu); + grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, + chand->interested_parties); + // gpr_mu_unlock(&chand->mu); } } /* if we've got a subchannel, then let's ask it to create a call */ diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c index 8b980b2cca2..0e02de04ed2 100644 --- a/src/core/ext/client_config/lb_policy.c +++ b/src/core/ext/client_config/lb_policy.c @@ -100,12 +100,11 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, } int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_polling_entity *pollent, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **target, grpc_closure *on_complete) { - return policy->vtable->pick(exec_ctx, policy, pollent, initial_metadata, + return policy->vtable->pick(exec_ctx, policy, initial_metadata, initial_metadata_flags, target, on_complete); } diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h index a2f5446fc6c..b92a73c09aa 100644 --- a/src/core/ext/client_config/lb_policy.h +++ b/src/core/ext/client_config/lb_policy.h @@ -35,7 +35,6 @@ #define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H #include "src/core/ext/client_config/subchannel.h" -#include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/transport/connectivity_state.h" /** A load balancing policy: specified by a vtable and a struct (which @@ -60,7 +59,6 @@ struct grpc_lb_policy_vtable { /** implement grpc_lb_policy_pick */ int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_polling_entity *pollent, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **target, grpc_closure *on_complete); @@ -127,9 +125,9 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, /** Given initial metadata in \a initial_metadata, find an appropriate target for this rpc, and 'return' it by calling \a on_complete after setting \a target. - Picking can be asynchronous. Any IO should be done under \a pollent. */ + Picking can be asynchronous. Any IO should be done under the pollset_set + interested_parties. */ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_polling_entity *pollent, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **target, diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index af913d8a9df..515d379dbb4 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -158,9 +158,6 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, typedef struct pending_pick { struct pending_pick *next; - /* polling entity for the pick()'s async notification */ - grpc_polling_entity *pollent; - /* the initial metadata for the pick. See grpc_lb_policy_pick() */ grpc_metadata_batch *initial_metadata; @@ -180,7 +177,7 @@ typedef struct pending_pick { wrapped_rr_closure_arg wrapped_on_complete_arg; } pending_pick; -static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent, +static void add_pending_pick(pending_pick **root, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **target, @@ -189,7 +186,6 @@ static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent, memset(pp, 0, sizeof(pending_pick)); memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg)); pp->next = *root; - pp->pollent = pollent; pp->target = target; pp->initial_metadata = initial_metadata; pp->initial_metadata_flags = initial_metadata_flags; @@ -359,9 +355,9 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } - grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent, - pp->initial_metadata, pp->initial_metadata_flags, - pp->target, &pp->wrapped_on_complete); + grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->initial_metadata, + pp->initial_metadata_flags, pp->target, + &pp->wrapped_on_complete); pp->wrapped_on_complete_arg.owning_pending_node = pp; } @@ -541,8 +537,6 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_polling_entity_del_from_pollset_set( - exec_ctx, pp->pollent, glb_policy->base.interested_parties); *target = NULL; grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_CANCELLED, NULL); @@ -572,8 +566,6 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - grpc_polling_entity_del_from_pollset_set( - exec_ctx, pp->pollent, glb_policy->base.interested_parties); grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_CANCELLED, NULL); gpr_free(pp); @@ -603,7 +595,6 @@ static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_polling_entity *pollent, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **target, @@ -623,8 +614,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, glb_policy->wc_arg.wrapped_closure = on_complete; grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure, &glb_policy->wc_arg); - r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent, - initial_metadata, initial_metadata_flags, target, + r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, initial_metadata, + initial_metadata_flags, target, &glb_policy->wrapped_on_complete); if (r != 0) { /* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR @@ -639,9 +630,7 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_ERROR_NONE, NULL); } } else { - grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent, - glb_policy->base.interested_parties); - add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata, + add_pending_pick(&glb_policy->pending_picks, initial_metadata, initial_metadata_flags, target, on_complete); if (!glb_policy->started_picking) { @@ -769,7 +758,7 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) { /* Note the following LB call progresses every time there's activity in \a * glb_policy->base.interested_parties, which is comprised of the polling - * entities passed to glb_pick(). */ + * entities from client_channel. */ lb_client->lb_call = grpc_channel_create_pollset_set_call( glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, "/BalanceLoad", diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 9decf70692a..a1a438c1df7 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -39,7 +39,6 @@ typedef struct pending_pick { struct pending_pick *next; - grpc_polling_entity *pollent; uint32_t initial_metadata_flags; grpc_connected_subchannel **target; grpc_closure *on_complete; @@ -119,8 +118,6 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, - p->base.interested_parties); grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); pp = next; @@ -137,8 +134,6 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, - p->base.interested_parties); *target = NULL; grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CREATE("Pick Cancelled"), NULL); @@ -164,8 +159,6 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, - p->base.interested_parties); grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CREATE("Pick Cancelled"), NULL); gpr_free(pp); @@ -199,7 +192,6 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_polling_entity *pollent, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **target, @@ -225,11 +217,8 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent, - p->base.interested_parties); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; - pp->pollent = pollent; pp->target = target; pp->initial_metadata_flags = initial_metadata_flags; pp->on_complete = on_complete; @@ -315,8 +304,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = selected; - grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, - p->base.interested_parties); grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 7bcf608ab9b..9b557f423e6 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -76,7 +76,6 @@ int grpc_lb_round_robin_trace = 0; * Once a pick is available, \a target is updated and \a on_complete called. */ typedef struct pending_pick { struct pending_pick *next; - grpc_polling_entity *pollent; uint32_t initial_metadata_flags; grpc_connected_subchannel **target; grpc_closure *on_complete; @@ -290,8 +289,6 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, - p->base.interested_parties); *target = NULL; grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); @@ -317,8 +314,6 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, - p->base.interested_parties); *pp->target = NULL; grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); @@ -361,7 +356,6 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_polling_entity *pollent, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **target, @@ -385,11 +379,8 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent, - p->base.interested_parties); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; - pp->pollent = pollent; pp->target = target; pp->on_complete = on_complete; pp->initial_metadata_flags = initial_metadata_flags; @@ -440,8 +431,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, - p->base.interested_parties); grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } From 9a0e63c1345711acb28288bd68ccd87cd032960b Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 1 Sep 2016 18:04:32 -0700 Subject: [PATCH 2/6] Remove unnecessary locks on channel_data's mutex --- src/core/ext/client_config/client_channel.c | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index b5b6cc4df0b..8f2311a6a8b 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -462,10 +462,8 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&calld->mu); GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); - // gpr_mu_lock(&chand->mu); grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); - // gpr_mu_unlock(&chand->mu); calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; if (calld->connected_subchannel == NULL) { gpr_atm_no_barrier_store(&calld->subchannel_call, 1); @@ -676,20 +674,14 @@ retry: calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; grpc_closure_init(&calld->next_step, subchannel_ready, elem); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); - // grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, - // chand->interested_parties); if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, op->send_initial_metadata_flags, &calld->connected_subchannel, &calld->next_step)) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; - // grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, - // chand->interested_parties); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } else { - // gpr_mu_lock(&chand->mu); grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); - // gpr_mu_unlock(&chand->mu); } } /* if we've got a subchannel, then let's ask it to create a call */ From 144ce657c6a2cc81104007509226c54817003b7c Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 1 Sep 2016 18:19:34 -0700 Subject: [PATCH 3/6] Add comments for pick_subchannel --- src/core/ext/client_config/client_channel.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 8f2311a6a8b..dfa9746004e 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -511,6 +511,9 @@ typedef struct { grpc_closure closure; } continue_picking_args; +/** Return true if subchannel is available immediately (in which case on_ready + should not be called), or false otherwise (in which case on_ready should be + called when the subchannel is available). */ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, @@ -674,6 +677,9 @@ retry: calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; grpc_closure_init(&calld->next_step, subchannel_ready, elem); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); + /* If a subchannel is not available immediately, the polling entity from + call_data should be provided to channel_data's interested_parties, so + that IO of the lb_policy and resolver could be done under it. */ if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, op->send_initial_metadata_flags, &calld->connected_subchannel, &calld->next_step)) { From b4291642758342a596a11146bef88b4aaf059a76 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 1 Sep 2016 19:17:14 -0700 Subject: [PATCH 4/6] Fix grpclb --- src/core/ext/lb_policy/grpclb/grpclb.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 515d379dbb4..2a23011e819 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -335,6 +335,9 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, (intptr_t)glb_policy->rr_policy); } GPR_ASSERT(glb_policy->rr_policy != NULL); + grpc_pollset_set_add_pollset_set(exec_ctx, + glb_policy->rr_policy->interested_parties, + glb_policy->base.interested_parties); glb_policy->rr_connectivity->state = grpc_lb_policy_check_connectivity( exec_ctx, glb_policy->rr_policy, &error); grpc_lb_policy_notify_on_state_change( From f7c45aea5367c1340b1c05b6011528f2f9e39d43 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 15 Sep 2016 13:40:32 -0700 Subject: [PATCH 5/6] Address review comments --- src/core/ext/client_config/lb_policy.h | 4 ++-- src/core/ext/lb_policy/grpclb/grpclb.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h index b92a73c09aa..a244b06ec66 100644 --- a/src/core/ext/client_config/lb_policy.h +++ b/src/core/ext/client_config/lb_policy.h @@ -125,8 +125,8 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, /** Given initial metadata in \a initial_metadata, find an appropriate target for this rpc, and 'return' it by calling \a on_complete after setting \a target. - Picking can be asynchronous. Any IO should be done under the pollset_set - interested_parties. */ + Picking can be asynchronous. Any IO should be done under the \a + interested_parties \a grpc_pollset_set in the \a grpc_lb_policy struct. */ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 2a23011e819..a9e329ed6d4 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -761,7 +761,7 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) { /* Note the following LB call progresses every time there's activity in \a * glb_policy->base.interested_parties, which is comprised of the polling - * entities from client_channel. */ + * entities from \a client_channel. */ lb_client->lb_call = grpc_channel_create_pollset_set_call( glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, "/BalanceLoad", From 6222926d682600e840cc5f421a07eb56ce8d4f00 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 15 Sep 2016 16:08:10 -0700 Subject: [PATCH 6/6] Remove pollent from grpc_lb_policy_pick_args --- src/core/ext/client_config/client_channel.c | 6 +++--- src/core/ext/client_config/lb_policy.h | 2 -- src/core/ext/lb_policy/grpclb/grpclb.c | 8 +++----- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 42050d7043e..c16e9a6765e 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -545,6 +545,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GPR_TIMER_BEGIN("pick_subchannel", 0); channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; continue_picking_args *cpa; grpc_closure *closure; @@ -574,9 +575,8 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, int r; GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); gpr_mu_unlock(&chand->mu); - const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata, - initial_metadata_flags, - &calld->lb_token_mdelem}; + const grpc_lb_policy_pick_args inputs = { + initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem}; r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h index ef412b7ca3f..ac387e4b80c 100644 --- a/src/core/ext/client_config/lb_policy.h +++ b/src/core/ext/client_config/lb_policy.h @@ -54,8 +54,6 @@ struct grpc_lb_policy { /** Extra arguments for an LB pick */ typedef struct grpc_lb_policy_pick_args { - /** Parties interested in the pick's progress */ - grpc_polling_entity *pollent; /** Initial metadata associated with the picking call. */ grpc_metadata_batch *initial_metadata; /** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */ diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index b44ca1b6b29..f213342ecb4 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -500,9 +500,9 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } - const grpc_lb_policy_pick_args pick_args = { - pp->pollent, pp->initial_metadata, pp->initial_metadata_flags, - pp->lb_token_mdelem_storage}; + const grpc_lb_policy_pick_args pick_args = {pp->initial_metadata, + pp->initial_metadata_flags, + pp->lb_token_mdelem_storage}; grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target, (void **)&pp->wrapped_on_complete_arg.lb_token, &pp->wrapped_on_complete); @@ -806,8 +806,6 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token)); } } else { - grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, - glb_policy->base.interested_parties); add_pending_pick(&glb_policy->pending_picks, pick_args, target, on_complete);