Merge pull request #7962 from y-zeng/call_holder_add_pollent

Add calls' polling entities to the pollset_set of client_channel
pull/8350/merge
Yuchen Zeng 8 years ago committed by GitHub
commit f3d71d21b9
  1. 22
      src/core/ext/client_config/client_channel.c
  2. 6
      src/core/ext/client_config/lb_policy.h
  3. 11
      src/core/ext/lb_policy/grpclb/grpclb.c
  4. 12
      src/core/ext/lb_policy/pick_first/pick_first.c
  5. 12
      src/core/ext/lb_policy/round_robin/round_robin.c

@ -513,10 +513,14 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
call_data *calld = arg; grpc_call_element *elem = arg;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
gpr_mu_lock(&calld->mu); gpr_mu_lock(&calld->mu);
GPR_ASSERT(calld->creation_phase == GPR_ASSERT(calld->creation_phase ==
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
chand->interested_parties);
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
if (calld->connected_subchannel == NULL) { if (calld->connected_subchannel == NULL) {
gpr_atm_no_barrier_store(&calld->subchannel_call, 1); gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
@ -564,6 +568,9 @@ typedef struct {
grpc_closure closure; grpc_closure closure;
} continue_picking_args; } continue_picking_args;
/** Return true if subchannel is available immediately (in which case on_ready
should not be called), or false otherwise (in which case on_ready should be
called when the subchannel is available). */
static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata, grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags, uint32_t initial_metadata_flags,
@ -629,8 +636,8 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
gpr_mu_unlock(&chand->mu); gpr_mu_unlock(&chand->mu);
// TODO(dgq): make this deadline configurable somehow. // TODO(dgq): make this deadline configurable somehow.
const grpc_lb_policy_pick_args inputs = { const grpc_lb_policy_pick_args inputs = {
calld->pollent, initial_metadata, initial_metadata_flags, initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
&calld->lb_token_mdelem, gpr_inf_future(GPR_CLOCK_MONOTONIC)}; gpr_inf_future(GPR_CLOCK_MONOTONIC)};
r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel, r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
NULL, on_ready); NULL, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
@ -672,6 +679,7 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem, grpc_call_element *elem,
grpc_transport_stream_op *op) { grpc_transport_stream_op *op) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op); GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
/* try to (atomically) get the call */ /* try to (atomically) get the call */
@ -739,14 +747,20 @@ retry:
calld->connected_subchannel == NULL && calld->connected_subchannel == NULL &&
op->send_initial_metadata != NULL) { op->send_initial_metadata != NULL) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
grpc_closure_init(&calld->next_step, subchannel_ready, calld); grpc_closure_init(&calld->next_step, subchannel_ready, elem);
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
/* If a subchannel is not available immediately, the polling entity from
call_data should be provided to channel_data's interested_parties, so
that IO of the lb_policy and resolver could be done under it. */
if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata,
op->send_initial_metadata_flags, op->send_initial_metadata_flags,
&calld->connected_subchannel, &calld->next_step, &calld->connected_subchannel, &calld->next_step,
GRPC_ERROR_NONE)) { GRPC_ERROR_NONE)) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
} else {
grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
chand->interested_parties);
} }
} }
/* if we've got a subchannel, then let's ask it to create a call */ /* if we've got a subchannel, then let's ask it to create a call */

@ -35,7 +35,6 @@
#define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H #define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H
#include "src/core/ext/client_config/subchannel.h" #include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/connectivity_state.h"
/** A load balancing policy: specified by a vtable and a struct (which /** A load balancing policy: specified by a vtable and a struct (which
@ -55,8 +54,6 @@ struct grpc_lb_policy {
/** Extra arguments for an LB pick */ /** Extra arguments for an LB pick */
typedef struct grpc_lb_policy_pick_args { typedef struct grpc_lb_policy_pick_args {
/** Parties interested in the pick's progress */
grpc_polling_entity *pollent;
/** Initial metadata associated with the picking call. */ /** Initial metadata associated with the picking call. */
grpc_metadata_batch *initial_metadata; grpc_metadata_batch *initial_metadata;
/** Bitmask used for selective cancelling. See \a /** Bitmask used for selective cancelling. See \a
@ -153,7 +150,8 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
once the pick is complete with its error argument set to indicate once the pick is complete with its error argument set to indicate
success or failure. success or failure.
Any I/O should be done under \a pick_args->pollent. */ Any IO should be done under the \a interested_parties \a grpc_pollset_set
in the \a grpc_lb_policy struct. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_pick_args *pick_args, const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data, grpc_connected_subchannel **target, void **user_data,

@ -458,6 +458,9 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
(intptr_t)glb_policy->rr_policy); (intptr_t)glb_policy->rr_policy);
} }
GPR_ASSERT(glb_policy->rr_policy != NULL); GPR_ASSERT(glb_policy->rr_policy != NULL);
grpc_pollset_set_add_pollset_set(exec_ctx,
glb_policy->rr_policy->interested_parties,
glb_policy->base.interested_parties);
glb_policy->rr_connectivity->state = grpc_lb_policy_check_connectivity( glb_policy->rr_connectivity->state = grpc_lb_policy_check_connectivity(
exec_ctx, glb_policy->rr_policy, &error); exec_ctx, glb_policy->rr_policy, &error);
grpc_lb_policy_notify_on_state_change( grpc_lb_policy_notify_on_state_change(
@ -686,8 +689,6 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) { while (pp != NULL) {
pending_pick *next = pp->next; pending_pick *next = pp->next;
if (pp->target == target) { if (pp->target == target) {
grpc_polling_entity_del_from_pollset_set(
exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties);
*target = NULL; *target = NULL;
grpc_exec_ctx_sched( grpc_exec_ctx_sched(
exec_ctx, &pp->wrapped_on_complete, exec_ctx, &pp->wrapped_on_complete,
@ -719,8 +720,6 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next; pending_pick *next = pp->next;
if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) == if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) { initial_metadata_flags_eq) {
grpc_polling_entity_del_from_pollset_set(
exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties);
grpc_exec_ctx_sched( grpc_exec_ctx_sched(
exec_ctx, &pp->wrapped_on_complete, exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
@ -806,8 +805,6 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
} else { } else {
/* else, the pending pick will be registered and taken care of by the /* else, the pending pick will be registered and taken care of by the
* pending pick list inside the RR policy (glb_policy->rr_policy) */ * pending pick list inside the RR policy (glb_policy->rr_policy) */
grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
glb_policy->base.interested_parties);
add_pending_pick(&glb_policy->pending_picks, pick_args, target, add_pending_pick(&glb_policy->pending_picks, pick_args, target,
on_complete); on_complete);
@ -931,7 +928,7 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
/* Note the following LB call progresses every time there's activity in \a /* Note the following LB call progresses every time there's activity in \a
* glb_policy->base.interested_parties, which is comprised of the polling * glb_policy->base.interested_parties, which is comprised of the polling
* entities passed to glb_pick(). */ * entities from \a client_channel. */
lb_client->lb_call = grpc_channel_create_pollset_set_call( lb_client->lb_call = grpc_channel_create_pollset_set_call(
glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
glb_policy->base.interested_parties, glb_policy->base.interested_parties,

@ -39,7 +39,6 @@
typedef struct pending_pick { typedef struct pending_pick {
struct pending_pick *next; struct pending_pick *next;
grpc_polling_entity *pollent;
uint32_t initial_metadata_flags; uint32_t initial_metadata_flags;
grpc_connected_subchannel **target; grpc_connected_subchannel **target;
grpc_closure *on_complete; grpc_closure *on_complete;
@ -119,8 +118,6 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) { while (pp != NULL) {
pending_pick *next = pp->next; pending_pick *next = pp->next;
*pp->target = NULL; *pp->target = NULL;
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp); gpr_free(pp);
pp = next; pp = next;
@ -138,8 +135,6 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) { while (pp != NULL) {
pending_pick *next = pp->next; pending_pick *next = pp->next;
if (pp->target == target) { if (pp->target == target) {
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*target = NULL; *target = NULL;
grpc_exec_ctx_sched( grpc_exec_ctx_sched(
exec_ctx, pp->on_complete, exec_ctx, pp->on_complete,
@ -168,8 +163,6 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next; pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) { initial_metadata_flags_eq) {
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
grpc_exec_ctx_sched( grpc_exec_ctx_sched(
exec_ctx, pp->on_complete, exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
@ -229,11 +222,8 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) { if (!p->started_picking) {
start_picking(exec_ctx, p); start_picking(exec_ctx, p);
} }
grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp)); pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks; pp->next = p->pending_picks;
pp->pollent = pick_args->pollent;
pp->target = target; pp->target = target;
pp->initial_metadata_flags = pick_args->initial_metadata_flags; pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->on_complete = on_complete; pp->on_complete = on_complete;
@ -319,8 +309,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) { while ((pp = p->pending_picks)) {
p->pending_picks = pp->next; p->pending_picks = pp->next;
*pp->target = selected; *pp->target = selected;
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp); gpr_free(pp);
} }

@ -78,9 +78,6 @@ int grpc_lb_round_robin_trace = 0;
typedef struct pending_pick { typedef struct pending_pick {
struct pending_pick *next; struct pending_pick *next;
/* polling entity for the pick()'s async notification */
grpc_polling_entity *pollent;
/* output argument where to store the pick()ed user_data. It'll be NULL if no /* output argument where to store the pick()ed user_data. It'll be NULL if no
* such data is present or there's an error (the definite test for errors is * such data is present or there's an error (the definite test for errors is
* \a target being NULL). */ * \a target being NULL). */
@ -318,8 +315,6 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) { while (pp != NULL) {
pending_pick *next = pp->next; pending_pick *next = pp->next;
if (pp->target == target) { if (pp->target == target) {
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*target = NULL; *target = NULL;
grpc_exec_ctx_sched( grpc_exec_ctx_sched(
exec_ctx, pp->on_complete, exec_ctx, pp->on_complete,
@ -348,8 +343,6 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next; pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) { initial_metadata_flags_eq) {
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*pp->target = NULL; *pp->target = NULL;
grpc_exec_ctx_sched( grpc_exec_ctx_sched(
exec_ctx, pp->on_complete, exec_ctx, pp->on_complete,
@ -422,11 +415,8 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) { if (!p->started_picking) {
start_picking(exec_ctx, p); start_picking(exec_ctx, p);
} }
grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp)); pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks; pp->next = p->pending_picks;
pp->pollent = pick_args->pollent;
pp->target = target; pp->target = target;
pp->on_complete = on_complete; pp->on_complete = on_complete;
pp->initial_metadata_flags = pick_args->initial_metadata_flags; pp->initial_metadata_flags = pick_args->initial_metadata_flags;
@ -482,8 +472,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
(void *)selected->subchannel, (void *)selected); (void *)selected->subchannel, (void *)selected);
} }
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp); gpr_free(pp);
} }

Loading…
Cancel
Save