Pass channel args through resolver.

pull/8462/head
Mark D. Roth 8 years ago
parent 68469290d7
commit 98abfd3d64
  1. 6
      src/core/ext/client_config/client_channel.c
  2. 3
      src/core/ext/client_config/lb_policy_factory.h
  3. 5
      src/core/ext/client_config/resolver_factory.h
  4. 12
      src/core/ext/client_config/resolver_registry.c
  5. 3
      src/core/ext/client_config/resolver_registry.h
  6. 12
      src/core/ext/client_config/resolver_result.c
  7. 6
      src/core/ext/client_config/resolver_result.h
  8. 6
      src/core/ext/lb_policy/grpclb/grpclb.c
  9. 2
      src/core/ext/lb_policy/pick_first/pick_first.c
  10. 2
      src/core/ext/lb_policy/round_robin/round_robin.c
  11. 10
      src/core/ext/resolver/dns/native/dns_resolver.c
  12. 6
      src/core/ext/resolver/sockaddr/sockaddr_resolver.c
  13. 57
      src/core/ext/transport/chttp2/client/insecure/channel_create.c
  14. 66
      src/core/ext/transport/chttp2/client/secure/secure_channel_create.c

@ -188,8 +188,8 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_resolver_result_get_server_name(chand->resolver_result); grpc_resolver_result_get_server_name(chand->resolver_result);
lb_policy_args.addresses = lb_policy_args.addresses =
grpc_resolver_result_get_addresses(chand->resolver_result); grpc_resolver_result_get_addresses(chand->resolver_result);
lb_policy_args.additional_args = lb_policy_args.args =
grpc_resolver_result_get_lb_policy_args(chand->resolver_result); grpc_resolver_result_get_channel_args(chand->resolver_result);
lb_policy_args.client_channel_factory = chand->client_channel_factory; lb_policy_args.client_channel_factory = chand->client_channel_factory;
// Special case: If all of the addresses are balancer addresses, // Special case: If all of the addresses are balancer addresses,
@ -227,7 +227,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error); grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
} }
const grpc_arg *channel_arg = grpc_channel_args_find( const grpc_arg *channel_arg = grpc_channel_args_find(
lb_policy_args.additional_args, GRPC_ARG_SERVICE_CONFIG); lb_policy_args.args, GRPC_ARG_SERVICE_CONFIG);
if (channel_arg != NULL) { if (channel_arg != NULL) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
method_config_table = grpc_method_config_table_ref( method_config_table = grpc_method_config_table_ref(

@ -93,8 +93,7 @@ typedef struct grpc_lb_policy_args {
const char *server_name; const char *server_name;
grpc_lb_addresses *addresses; grpc_lb_addresses *addresses;
grpc_client_channel_factory *client_channel_factory; grpc_client_channel_factory *client_channel_factory;
/* Can be used to pass implementation-specific parameters to the LB policy. */ grpc_channel_args *args;
grpc_channel_args *additional_args;
} grpc_lb_policy_args; } grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable { struct grpc_lb_policy_factory_vtable {

@ -47,7 +47,10 @@ struct grpc_resolver_factory {
const grpc_resolver_factory_vtable *vtable; const grpc_resolver_factory_vtable *vtable;
}; };
typedef struct grpc_resolver_args { grpc_uri *uri; } grpc_resolver_args; typedef struct grpc_resolver_args {
grpc_uri *uri;
const grpc_channel_args *args;
} grpc_resolver_args;
struct grpc_resolver_factory_vtable { struct grpc_resolver_factory_vtable {
void (*ref)(grpc_resolver_factory *factory); void (*ref)(grpc_resolver_factory *factory);

@ -131,14 +131,16 @@ static grpc_resolver_factory *resolve_factory(const char *target,
return factory; return factory;
} }
grpc_resolver *grpc_resolver_create(const char *target) { grpc_resolver *grpc_resolver_create(const char *target,
const grpc_channel_args *args) {
grpc_uri *uri = NULL; grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri); grpc_resolver_factory *factory = resolve_factory(target, &uri);
grpc_resolver *resolver; grpc_resolver *resolver;
grpc_resolver_args args; grpc_resolver_args resolver_args;
memset(&args, 0, sizeof(args)); memset(&resolver_args, 0, sizeof(resolver_args));
args.uri = uri; resolver_args.uri = uri;
resolver = grpc_resolver_factory_create_resolver(factory, &args); resolver_args.args = args;
resolver = grpc_resolver_factory_create_resolver(factory, &resolver_args);
grpc_uri_destroy(uri); grpc_uri_destroy(uri);
return resolver; return resolver;
} }

@ -58,7 +58,8 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
If a resolver factory was found, use it to instantiate a resolver and If a resolver factory was found, use it to instantiate a resolver and
return it. return it.
If a resolver factory was not found, return NULL. */ If a resolver factory was not found, return NULL. */
grpc_resolver *grpc_resolver_create(const char *target); grpc_resolver *grpc_resolver_create(const char *target,
const grpc_channel_args* args);
/** Find a resolver factory given a name and return an (owned-by-the-caller) /** Find a resolver factory given a name and return an (owned-by-the-caller)
* reference to it */ * reference to it */

@ -43,19 +43,19 @@ struct grpc_resolver_result {
char* server_name; char* server_name;
grpc_lb_addresses* addresses; grpc_lb_addresses* addresses;
char* lb_policy_name; char* lb_policy_name;
grpc_channel_args* lb_policy_args; grpc_channel_args* channel_args;
}; };
grpc_resolver_result* grpc_resolver_result_create( grpc_resolver_result* grpc_resolver_result_create(
const char* server_name, grpc_lb_addresses* addresses, const char* server_name, grpc_lb_addresses* addresses,
const char* lb_policy_name, grpc_channel_args* lb_policy_args) { const char* lb_policy_name, grpc_channel_args* args) {
grpc_resolver_result* result = gpr_malloc(sizeof(*result)); grpc_resolver_result* result = gpr_malloc(sizeof(*result));
memset(result, 0, sizeof(*result)); memset(result, 0, sizeof(*result));
gpr_ref_init(&result->refs, 1); gpr_ref_init(&result->refs, 1);
result->server_name = gpr_strdup(server_name); result->server_name = gpr_strdup(server_name);
result->addresses = addresses; result->addresses = addresses;
result->lb_policy_name = gpr_strdup(lb_policy_name); result->lb_policy_name = gpr_strdup(lb_policy_name);
result->lb_policy_args = lb_policy_args; result->channel_args = args;
return result; return result;
} }
@ -69,7 +69,7 @@ void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
gpr_free(result->server_name); gpr_free(result->server_name);
grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */); grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */);
gpr_free(result->lb_policy_name); gpr_free(result->lb_policy_name);
grpc_channel_args_destroy(result->lb_policy_args); grpc_channel_args_destroy(result->channel_args);
gpr_free(result); gpr_free(result);
} }
} }
@ -88,7 +88,7 @@ const char* grpc_resolver_result_get_lb_policy_name(
return result->lb_policy_name; return result->lb_policy_name;
} }
grpc_channel_args* grpc_resolver_result_get_lb_policy_args( grpc_channel_args* grpc_resolver_result_get_channel_args(
grpc_resolver_result* result) { grpc_resolver_result* result) {
return result->lb_policy_args; return result->channel_args;
} }

@ -48,10 +48,10 @@
/// Results reported from a grpc_resolver. /// Results reported from a grpc_resolver.
typedef struct grpc_resolver_result grpc_resolver_result; typedef struct grpc_resolver_result grpc_resolver_result;
/// Takes ownership of \a addresses and \a lb_policy_args. /// Takes ownership of \a addresses and \a args.
grpc_resolver_result* grpc_resolver_result_create( grpc_resolver_result* grpc_resolver_result_create(
const char* server_name, grpc_lb_addresses* addresses, const char* server_name, grpc_lb_addresses* addresses,
const char* lb_policy_name, grpc_channel_args* lb_policy_args); const char* lb_policy_name, grpc_channel_args* args);
void grpc_resolver_result_ref(grpc_resolver_result* result); void grpc_resolver_result_ref(grpc_resolver_result* result);
void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx, void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
@ -63,7 +63,7 @@ grpc_lb_addresses* grpc_resolver_result_get_addresses(
grpc_resolver_result* result); grpc_resolver_result* result);
const char* grpc_resolver_result_get_lb_policy_name( const char* grpc_resolver_result_get_lb_policy_name(
grpc_resolver_result* result); grpc_resolver_result* result);
grpc_channel_args* grpc_resolver_result_get_lb_policy_args( grpc_channel_args* grpc_resolver_result_get_channel_args(
grpc_resolver_result* result); grpc_resolver_result* result);
#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H */ #endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H */

@ -459,7 +459,7 @@ static grpc_lb_policy *create_rr_locked(
args.server_name = glb_policy->server_name; args.server_name = glb_policy->server_name;
args.client_channel_factory = glb_policy->cc_factory; args.client_channel_factory = glb_policy->cc_factory;
args.addresses = process_serverlist(serverlist); args.addresses = process_serverlist(serverlist);
args.additional_args = glb_policy->args; args.args = glb_policy->args;
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
@ -587,7 +587,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* Create a client channel over them to communicate with a LB service */ * Create a client channel over them to communicate with a LB service */
glb_policy->server_name = gpr_strdup(args->server_name); glb_policy->server_name = gpr_strdup(args->server_name);
glb_policy->cc_factory = args->client_channel_factory; glb_policy->cc_factory = args->client_channel_factory;
glb_policy->args = grpc_channel_args_copy(args->additional_args); glb_policy->args = grpc_channel_args_copy(args->args);
GPR_ASSERT(glb_policy->cc_factory != NULL); GPR_ASSERT(glb_policy->cc_factory != NULL);
/* construct a target from the addresses in args, given in the form /* construct a target from the addresses in args, given in the form
@ -621,7 +621,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
/* will pick using pick_first */ /* will pick using pick_first */
glb_policy->lb_channel = grpc_client_channel_factory_create_channel( glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
exec_ctx, glb_policy->cc_factory, target_uri_str, exec_ctx, glb_policy->cc_factory, target_uri_str,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL); GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, args->args);
gpr_free(target_uri_str); gpr_free(target_uri_str);
for (size_t i = 0; i < num_grpclb_addrs; i++) { for (size_t i = 0; i < num_grpclb_addrs; i++) {

@ -466,7 +466,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
sc_args.addr = sc_args.addr =
(struct sockaddr *)(&args->addresses->addresses[i].address.addr); (struct sockaddr *)(&args->addresses->addresses[i].address.addr);
sc_args.addr_len = args->addresses->addresses[i].address.len; sc_args.addr_len = args->addresses->addresses[i].address.len;
sc_args.args = args->additional_args; sc_args.args = args->args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args); exec_ctx, args->client_channel_factory, &sc_args);

@ -629,7 +629,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
sc_args.addr = sc_args.addr =
(struct sockaddr *)(&args->addresses->addresses[i].address.addr); (struct sockaddr *)(&args->addresses->addresses[i].address.addr);
sc_args.addr_len = args->addresses->addresses[i].address.len; sc_args.addr_len = args->addresses->addresses[i].address.len;
sc_args.args = args->additional_args; sc_args.args = args->args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args); exec_ctx, args->client_channel_factory, &sc_args);

@ -40,6 +40,7 @@
#include "src/core/ext/client_config/http_connect_handshaker.h" #include "src/core/ext/client_config/http_connect_handshaker.h"
#include "src/core/ext/client_config/lb_policy_registry.h" #include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/ext/client_config/resolver_registry.h" #include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/backoff.h" #include "src/core/lib/support/backoff.h"
@ -59,6 +60,8 @@ typedef struct {
char *name_to_resolve; char *name_to_resolve;
/** default port to use */ /** default port to use */
char *default_port; char *default_port;
/** channel args. */
grpc_channel_args *channel_args;
/** mutex guarding the rest of the state */ /** mutex guarding the rest of the state */
gpr_mu mu; gpr_mu mu;
@ -176,8 +179,9 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
NULL /* balancer_name */, NULL /* user_data */); NULL /* balancer_name */, NULL /* user_data */);
} }
grpc_resolved_addresses_destroy(r->addresses); grpc_resolved_addresses_destroy(r->addresses);
result = grpc_resolver_result_create(r->target_name, addresses, result = grpc_resolver_result_create(
NULL /* lb_policy_name */, NULL); r->target_name, addresses, NULL /* lb_policy_name */,
grpc_channel_args_copy(r->channel_args));
} else { } else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
@ -241,6 +245,7 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
gpr_free(r->target_name); gpr_free(r->target_name);
gpr_free(r->name_to_resolve); gpr_free(r->name_to_resolve);
gpr_free(r->default_port); gpr_free(r->default_port);
grpc_channel_args_destroy(r->channel_args);
gpr_free(r); gpr_free(r);
} }
@ -263,6 +268,7 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
r->target_name = gpr_strdup(path); r->target_name = gpr_strdup(path);
r->name_to_resolve = proxy_name == NULL ? gpr_strdup(path) : proxy_name; r->name_to_resolve = proxy_name == NULL ? gpr_strdup(path) : proxy_name;
r->default_port = gpr_strdup(default_port); r->default_port = gpr_strdup(default_port);
r->channel_args = grpc_channel_args_copy(args->args);
gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER, gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER,
BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000); BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000);
return &r->base; return &r->base;

@ -55,6 +55,8 @@ typedef struct {
char *target_name; char *target_name;
/** the addresses that we've 'resolved' */ /** the addresses that we've 'resolved' */
grpc_lb_addresses *addresses; grpc_lb_addresses *addresses;
/** channel args */
grpc_channel_args *channel_args;
/** mutex guarding the rest of the state */ /** mutex guarding the rest of the state */
gpr_mu mu; gpr_mu mu;
/** have we published? */ /** have we published? */
@ -121,7 +123,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
*r->target_result = grpc_resolver_result_create( *r->target_result = grpc_resolver_result_create(
r->target_name, r->target_name,
grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */), grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */),
NULL /* lb_policy_name */, NULL /* lb_policy_args */); NULL /* lb_policy_name */, grpc_channel_args_copy(r->channel_args));
grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL; r->next_completion = NULL;
} }
@ -132,6 +134,7 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
gpr_mu_destroy(&r->mu); gpr_mu_destroy(&r->mu);
gpr_free(r->target_name); gpr_free(r->target_name);
grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */); grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
grpc_channel_args_destroy(r->channel_args);
gpr_free(r); gpr_free(r);
} }
@ -201,6 +204,7 @@ static grpc_resolver *sockaddr_create(grpc_resolver_args *args,
memset(r, 0, sizeof(*r)); memset(r, 0, sizeof(*r));
r->target_name = gpr_strdup(args->uri->path); r->target_name = gpr_strdup(args->uri->path);
r->addresses = addresses; r->addresses = addresses;
r->channel_args = grpc_channel_args_copy(args->args);
gpr_mu_init(&r->mu); gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
return &r->base; return &r->base;

@ -52,6 +52,10 @@
#include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel.h"
//
// connector
//
typedef struct { typedef struct {
grpc_connector base; grpc_connector base;
gpr_refcount refs; gpr_refcount refs;
@ -157,35 +161,20 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
static const grpc_connector_vtable connector_vtable = { static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_shutdown, connector_connect}; connector_ref, connector_unref, connector_shutdown, connector_connect};
typedef struct { //
grpc_client_channel_factory base; // client_channel_factory
gpr_refcount refs; //
grpc_channel_args *merge_args;
} client_channel_factory;
static void client_channel_factory_ref( static void client_channel_factory_ref(
grpc_client_channel_factory *cc_factory) { grpc_client_channel_factory *cc_factory) {}
client_channel_factory *f = (client_channel_factory *)cc_factory;
gpr_ref(&f->refs);
}
static void client_channel_factory_unref( static void client_channel_factory_unref(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) { grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {}
client_channel_factory *f = (client_channel_factory *)cc_factory;
if (gpr_unref(&f->refs)) {
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
}
static grpc_subchannel *client_channel_factory_create_subchannel( static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
grpc_subchannel_args *args) { grpc_subchannel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
connector *c = gpr_malloc(sizeof(*c)); connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args =
grpc_channel_args_merge(args->args, f->merge_args);
grpc_subchannel *s;
memset(c, 0, sizeof(*c)); memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable; c->base.vtable = &connector_vtable;
gpr_ref_init(&c->refs, 1); gpr_ref_init(&c->refs, 1);
@ -197,10 +186,8 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_http_connect_handshaker_create(proxy_name, args->server_name)); grpc_http_connect_handshaker_create(proxy_name, args->server_name));
gpr_free(proxy_name); gpr_free(proxy_name);
} }
args->args = final_args; grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args);
s = grpc_subchannel_create(exec_ctx, &c->base, args);
grpc_connector_unref(exec_ctx, &c->base); grpc_connector_unref(exec_ctx, &c->base);
grpc_channel_args_destroy(final_args);
return s; return s;
} }
@ -208,12 +195,9 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type, const char *target, grpc_client_channel_type type,
grpc_channel_args *args) { grpc_channel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory; grpc_channel *channel = grpc_channel_create(exec_ctx, target, args,
grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
GRPC_CLIENT_CHANNEL, NULL); GRPC_CLIENT_CHANNEL, NULL);
grpc_channel_args_destroy(final_args); grpc_resolver *resolver = grpc_resolver_create(target, args);
grpc_resolver *resolver = grpc_resolver_create(target);
if (!resolver) { if (!resolver) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
"client_channel_factory_create_channel"); "client_channel_factory_create_channel");
@ -221,7 +205,7 @@ static grpc_channel *client_channel_factory_create_channel(
} }
grpc_client_channel_finish_initialization( grpc_client_channel_finish_initialization(
exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base); exec_ctx, grpc_channel_get_channel_stack(channel), resolver, cc_factory);
GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel"); GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel");
return channel; return channel;
@ -232,6 +216,9 @@ static const grpc_client_channel_factory_vtable client_channel_factory_vtable =
client_channel_factory_create_subchannel, client_channel_factory_create_subchannel,
client_channel_factory_create_channel}; client_channel_factory_create_channel};
static grpc_client_channel_factory client_channel_factory = {
&client_channel_factory_vtable};
/* Create a client channel: /* Create a client channel:
Asynchronously: - resolve target Asynchronously: - resolve target
- connect to it (trying alternatives as presented) - connect to it (trying alternatives as presented)
@ -245,16 +232,12 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
(target, args, reserved)); (target, args, reserved));
GPR_ASSERT(!reserved); GPR_ASSERT(!reserved);
client_channel_factory *f = gpr_malloc(sizeof(*f)); grpc_client_channel_factory *factory =
memset(f, 0, sizeof(*f)); (grpc_client_channel_factory*)&client_channel_factory;
f->base.vtable = &client_channel_factory_vtable;
gpr_ref_init(&f->refs, 1);
f->merge_args = grpc_channel_args_copy(args);
grpc_channel *channel = client_channel_factory_create_channel( grpc_channel *channel = client_channel_factory_create_channel(
&exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL); &exec_ctx, factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
grpc_client_channel_factory_unref(&exec_ctx, &f->base); grpc_client_channel_factory_unref(&exec_ctx, factory);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
return channel != NULL ? channel : grpc_lame_client_channel_create( return channel != NULL ? channel : grpc_lame_client_channel_create(

@ -54,6 +54,10 @@
#include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel.h"
#include "src/core/lib/tsi/transport_security_interface.h" #include "src/core/lib/tsi/transport_security_interface.h"
//
// connector
//
typedef struct { typedef struct {
grpc_connector base; grpc_connector base;
gpr_refcount refs; gpr_refcount refs;
@ -216,10 +220,13 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
static const grpc_connector_vtable connector_vtable = { static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_shutdown, connector_connect}; connector_ref, connector_unref, connector_shutdown, connector_connect};
//
// client_channel_factory
//
typedef struct { typedef struct {
grpc_client_channel_factory base; grpc_client_channel_factory base;
gpr_refcount refs; gpr_refcount refs;
grpc_channel_args *merge_args;
grpc_channel_security_connector *security_connector; grpc_channel_security_connector *security_connector;
} client_channel_factory; } client_channel_factory;
@ -235,7 +242,6 @@ static void client_channel_factory_unref(
if (gpr_unref(&f->refs)) { if (gpr_unref(&f->refs)) {
GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
"client_channel_factory"); "client_channel_factory");
grpc_channel_args_destroy(f->merge_args);
gpr_free(f); gpr_free(f);
} }
} }
@ -245,9 +251,6 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_subchannel_args *args) { grpc_subchannel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory; client_channel_factory *f = (client_channel_factory *)cc_factory;
connector *c = gpr_malloc(sizeof(*c)); connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args =
grpc_channel_args_merge(args->args, f->merge_args);
grpc_subchannel *s;
memset(c, 0, sizeof(*c)); memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable; c->base.vtable = &connector_vtable;
c->security_connector = f->security_connector; c->security_connector = f->security_connector;
@ -261,10 +264,8 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
} }
gpr_mu_init(&c->mu); gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1); gpr_ref_init(&c->refs, 1);
args->args = final_args; grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args);
s = grpc_subchannel_create(exec_ctx, &c->base, args);
grpc_connector_unref(exec_ctx, &c->base); grpc_connector_unref(exec_ctx, &c->base);
grpc_channel_args_destroy(final_args);
return s; return s;
} }
@ -273,13 +274,9 @@ static grpc_channel *client_channel_factory_create_channel(
const char *target, grpc_client_channel_type type, const char *target, grpc_client_channel_type type,
grpc_channel_args *args) { grpc_channel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory; client_channel_factory *f = (client_channel_factory *)cc_factory;
grpc_channel *channel = grpc_channel_create(exec_ctx, target, args,
grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
GRPC_CLIENT_CHANNEL, NULL); GRPC_CLIENT_CHANNEL, NULL);
grpc_channel_args_destroy(final_args); grpc_resolver *resolver = grpc_resolver_create(target, args);
grpc_resolver *resolver = grpc_resolver_create(target);
if (resolver != NULL) { if (resolver != NULL) {
grpc_client_channel_finish_initialization( grpc_client_channel_finish_initialization(
exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base); exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base);
@ -289,7 +286,6 @@ static grpc_channel *client_channel_factory_create_channel(
"client_channel_factory_create_channel"); "client_channel_factory_create_channel");
channel = NULL; channel = NULL;
} }
GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
"client_channel_factory_create_channel"); "client_channel_factory_create_channel");
return channel; return channel;
@ -308,19 +304,13 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
const char *target, const char *target,
const grpc_channel_args *args, const grpc_channel_args *args,
void *reserved) { void *reserved) {
grpc_arg connector_arg;
grpc_channel_args *args_copy;
grpc_channel_args *new_args_from_connector;
grpc_channel_security_connector *security_connector;
client_channel_factory *f;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE( GRPC_API_TRACE(
"grpc_secure_channel_create(creds=%p, target=%s, args=%p, " "grpc_secure_channel_create(creds=%p, target=%s, args=%p, "
"reserved=%p)", "reserved=%p)",
4, (creds, target, args, reserved)); 4, (creds, target, args, reserved));
GPR_ASSERT(reserved == NULL); GPR_ASSERT(reserved == NULL);
// Make sure security connector does not already exist in args.
if (grpc_find_security_connector_in_args(args) != NULL) { if (grpc_find_security_connector_in_args(args) != NULL) {
gpr_log(GPR_ERROR, "Cannot set security context in channel args."); gpr_log(GPR_ERROR, "Cannot set security context in channel args.");
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
@ -328,7 +318,9 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
target, GRPC_STATUS_INTERNAL, target, GRPC_STATUS_INTERNAL,
"Security connector exists in channel args."); "Security connector exists in channel args.");
} }
// Create security connector and construct new channel args.
grpc_channel_security_connector *security_connector;
grpc_channel_args *new_args_from_connector;
if (grpc_channel_credentials_create_security_connector( if (grpc_channel_credentials_create_security_connector(
creds, target, args, &security_connector, &new_args_from_connector) != creds, target, args, &security_connector, &new_args_from_connector) !=
GRPC_SECURITY_OK) { GRPC_SECURITY_OK) {
@ -336,32 +328,28 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
return grpc_lame_client_channel_create( return grpc_lame_client_channel_create(
target, GRPC_STATUS_INTERNAL, "Failed to create security connector."); target, GRPC_STATUS_INTERNAL, "Failed to create security connector.");
} }
grpc_arg connector_arg =
connector_arg = grpc_security_connector_to_arg(&security_connector->base); grpc_security_connector_to_arg(&security_connector->base);
args_copy = grpc_channel_args_copy_and_add( grpc_channel_args *new_args = grpc_channel_args_copy_and_add(
new_args_from_connector != NULL ? new_args_from_connector : args, new_args_from_connector != NULL ? new_args_from_connector : args,
&connector_arg, 1); &connector_arg, 1);
f = gpr_malloc(sizeof(*f));
memset(f, 0, sizeof(*f));
f->base.vtable = &client_channel_factory_vtable;
gpr_ref_init(&f->refs, 1);
f->merge_args = grpc_channel_args_copy(args_copy);
grpc_channel_args_destroy(args_copy);
if (new_args_from_connector != NULL) { if (new_args_from_connector != NULL) {
grpc_channel_args_destroy(new_args_from_connector); grpc_channel_args_destroy(new_args_from_connector);
} }
// Create client channel factory.
client_channel_factory *f = gpr_malloc(sizeof(*f));
memset(f, 0, sizeof(*f));
f->base.vtable = &client_channel_factory_vtable;
gpr_ref_init(&f->refs, 1);
GRPC_SECURITY_CONNECTOR_REF(&security_connector->base, GRPC_SECURITY_CONNECTOR_REF(&security_connector->base,
"grpc_secure_channel_create"); "grpc_secure_channel_create");
f->security_connector = security_connector; f->security_connector = security_connector;
// Create channel.
grpc_channel *channel = client_channel_factory_create_channel( grpc_channel *channel = client_channel_factory_create_channel(
&exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL); &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args);
// Clean up.
grpc_channel_args_destroy(new_args);
grpc_client_channel_factory_unref(&exec_ctx, &f->base); grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
return channel; /* may be NULL */ return channel; /* may be NULL */
} }

Loading…
Cancel
Save