Merge remote-tracking branch 'origin/lb_registry' into rr_with_registry

pull/3320/head
David Garcia Quintas 9 years ago
commit 3924fcb1c0
  1. 13
      src/core/client_config/lb_policies/pick_first.c
  2. 6
      src/core/client_config/lb_policy_factory.c
  3. 11
      src/core/client_config/lb_policy_factory.h
  4. 5
      src/core/client_config/lb_policy_registry.c
  5. 3
      src/core/client_config/lb_policy_registry.h
  6. 6
      src/core/client_config/resolvers/dns_resolver.c
  7. 5
      src/core/client_config/resolvers/sockaddr_resolver.c

@ -320,18 +320,17 @@ static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
grpc_subchannel **subchannels, grpc_lb_policy_args *args) {
size_t num_subchannels) {
pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
GPR_ASSERT(num_subchannels); GPR_ASSERT(args->num_subchannels > 0);
memset(p, 0, sizeof(*p)); memset(p, 0, sizeof(*p));
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels); p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
p->num_subchannels = num_subchannels; p->num_subchannels = args->num_subchannels;
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"pick_first"); "pick_first");
memcpy(p->subchannels, subchannels, memcpy(p->subchannels, args->subchannels,
sizeof(grpc_subchannel *) * num_subchannels); sizeof(grpc_subchannel *) * args->num_subchannels);
grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
gpr_mu_init(&p->mu); gpr_mu_init(&p->mu);
return &p->base; return &p->base;

@ -41,9 +41,7 @@ void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory) {
} }
grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy( grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy(
grpc_lb_policy_factory *factory, grpc_subchannel **subchannels, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) {
size_t num_subchannels) {
if (factory == NULL) return NULL; if (factory == NULL) return NULL;
return factory->vtable->create_lb_policy(factory, subchannels, return factory->vtable->create_lb_policy(factory, args);
num_subchannels);
} }

@ -46,14 +46,18 @@ struct grpc_lb_policy_factory {
const grpc_lb_policy_factory_vtable *vtable; const grpc_lb_policy_factory_vtable *vtable;
}; };
typedef struct grpc_lb_policy_args {
grpc_subchannel **subchannels;
size_t num_subchannels;
} grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable { struct grpc_lb_policy_factory_vtable {
void (*ref)(grpc_lb_policy_factory *factory); void (*ref)(grpc_lb_policy_factory *factory);
void (*unref)(grpc_lb_policy_factory *factory); void (*unref)(grpc_lb_policy_factory *factory);
/** Implementation of grpc_lb_policy_factory_create_lb_policy */ /** Implementation of grpc_lb_policy_factory_create_lb_policy */
grpc_lb_policy *(*create_lb_policy)(grpc_lb_policy_factory *factory, grpc_lb_policy *(*create_lb_policy)(grpc_lb_policy_factory *factory,
grpc_subchannel **subchannels, grpc_lb_policy_args *args);
size_t num_subchannels);
/** Name for the LB policy this factory implements */ /** Name for the LB policy this factory implements */
const char *name; const char *name;
@ -64,7 +68,6 @@ void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory);
/** Create a lb_policy instance. */ /** Create a lb_policy instance. */
grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy( grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy(
grpc_lb_policy_factory *factory, grpc_subchannel **subchannels, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args);
size_t num_subchannels);
#endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_FACTORY_H */ #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_FACTORY_H */

@ -80,10 +80,9 @@ static grpc_lb_policy_factory *lookup_factory(const char* name) {
} }
grpc_lb_policy *grpc_lb_policy_create(const char *name, grpc_lb_policy *grpc_lb_policy_create(const char *name,
grpc_subchannel **subchannels, grpc_lb_policy_args *args) {
size_t num_subchannels) {
grpc_lb_policy_factory *factory = lookup_factory(name); grpc_lb_policy_factory *factory = lookup_factory(name);
grpc_lb_policy *lb_policy = grpc_lb_policy_factory_create_lb_policy( grpc_lb_policy *lb_policy = grpc_lb_policy_factory_create_lb_policy(
factory, subchannels, num_subchannels); factory, args);
return lb_policy; return lb_policy;
} }

@ -49,7 +49,6 @@ void grpc_register_lb_policy(grpc_lb_policy_factory *factory);
* If \a name is NULL, the default factory from \a grpc_lb_policy_registry_init * If \a name is NULL, the default factory from \a grpc_lb_policy_registry_init
* will be returned. */ * will be returned. */
grpc_lb_policy *grpc_lb_policy_create(const char *name, grpc_lb_policy *grpc_lb_policy_create(const char *name,
grpc_subchannel **subchannels, grpc_lb_policy_args *args);
size_t num_subchannels);
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */ #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */

@ -134,6 +134,7 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
grpc_lb_policy *lb_policy; grpc_lb_policy *lb_policy;
size_t i; size_t i;
if (addresses) { if (addresses) {
grpc_lb_policy_args lb_policy_args;
config = grpc_client_config_create(); config = grpc_client_config_create();
subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs);
for (i = 0; i < addresses->naddrs; i++) { for (i = 0; i < addresses->naddrs; i++) {
@ -143,8 +144,9 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
subchannels[i] = grpc_subchannel_factory_create_subchannel( subchannels[i] = grpc_subchannel_factory_create_subchannel(
r->subchannel_factory, &args); r->subchannel_factory, &args);
} }
lb_policy = grpc_lb_policy_create(r->lb_policy_name, subchannels, lb_policy_args.subchannels = subchannels;
addresses->naddrs); lb_policy_args.num_subchannels = addresses->naddrs;
lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args);
grpc_client_config_set_lb_policy(config, lb_policy); grpc_client_config_set_lb_policy(config, lb_policy);
GRPC_LB_POLICY_UNREF(lb_policy, "construction"); GRPC_LB_POLICY_UNREF(lb_policy, "construction");
grpc_resolved_addresses_destroy(addresses); grpc_resolved_addresses_destroy(addresses);

@ -121,6 +121,7 @@ static void sockaddr_next(grpc_resolver *resolver,
static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
grpc_client_config *cfg; grpc_client_config *cfg;
grpc_lb_policy *lb_policy; grpc_lb_policy *lb_policy;
grpc_lb_policy_args lb_policy_args;
grpc_subchannel **subchannels; grpc_subchannel **subchannels;
grpc_subchannel_args args; grpc_subchannel_args args;
@ -135,8 +136,10 @@ static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
subchannels[i] = grpc_subchannel_factory_create_subchannel( subchannels[i] = grpc_subchannel_factory_create_subchannel(
r->subchannel_factory, &args); r->subchannel_factory, &args);
} }
lb_policy_args.subchannels = subchannels;
lb_policy_args.num_subchannels = r->num_addrs;
lb_policy = lb_policy =
grpc_lb_policy_create(r->lb_policy_name, subchannels, r->num_addrs); grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args);
gpr_free(subchannels); gpr_free(subchannels);
grpc_client_config_set_lb_policy(cfg, lb_policy); grpc_client_config_set_lb_policy(cfg, lb_policy);
GRPC_LB_POLICY_UNREF(lb_policy, "unix"); GRPC_LB_POLICY_UNREF(lb_policy, "unix");

Loading…
Cancel
Save