Convert client channel factory to C++

pull/18041/head
Mark D. Roth 6 years ago
parent a4b8667de9
commit 251d66aac6
  1. 14
      src/core/ext/filters/client_channel/README.md
  2. 29
      src/core/ext/filters/client_channel/client_channel.cc
  3. 56
      src/core/ext/filters/client_channel/client_channel_factory.cc
  4. 57
      src/core/ext/filters/client_channel/client_channel_factory.h
  5. 6
      src/core/ext/filters/client_channel/lb_policy.h
  6. 8
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  7. 8
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  8. 4
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  9. 90
      src/core/ext/transport/chttp2/client/insecure/channel_create.cc
  10. 267
      src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
  11. 4
      test/core/util/test_lb_policies.cc
  12. 32
      test/cpp/microbenchmarks/bm_call_create.cc

@ -4,7 +4,7 @@ Client Configuration Support for GRPC
This library provides high level configuration machinery to construct client This library provides high level configuration machinery to construct client
channels and load balance between them. channels and load balance between them.
Each grpc_channel is created with a grpc_resolver. It is the resolver's duty Each `grpc_channel` is created with a `Resolver`. It is the resolver's duty
to resolve a name into a set of arguments for the channel. Such arguments to resolve a name into a set of arguments for the channel. Such arguments
might include: might include:
@ -12,7 +12,7 @@ might include:
- a load balancing policy to decide which server to send a request to - a load balancing policy to decide which server to send a request to
- a set of filters to mutate outgoing requests (say, by adding metadata) - a set of filters to mutate outgoing requests (say, by adding metadata)
The resolver provides this data as a stream of grpc_channel_args objects to The resolver provides this data as a stream of `grpc_channel_args` objects to
the channel. We represent arguments as a stream so that they can be changed the channel. We represent arguments as a stream so that they can be changed
by the resolver during execution, by reacting to external events (such as by the resolver during execution, by reacting to external events (such as
new service configuration data being pushed to some store). new service configuration data being pushed to some store).
@ -21,11 +21,11 @@ new service configuration data being pushed to some store).
Load Balancing Load Balancing
-------------- --------------
Load balancing configuration is provided by a grpc_lb_policy object. Load balancing configuration is provided by a `LoadBalancingPolicy` object.
The primary job of the load balancing policies is to pick a target server The primary job of the load balancing policies is to pick a target server
given only the initial metadata for a request. It does this by providing given only the initial metadata for a request. It does this by providing
a grpc_subchannel object to the owning channel. a `ConnectedSubchannel` object to the owning channel.
Sub-Channels Sub-Channels
@ -38,9 +38,9 @@ decisions (for example, by avoiding disconnected backends).
Configured sub-channels are fully setup to participate in the grpc data plane. Configured sub-channels are fully setup to participate in the grpc data plane.
Their behavior is specified by a set of grpc channel filters defined at their Their behavior is specified by a set of grpc channel filters defined at their
construction. To customize this behavior, resolvers build construction. To customize this behavior, transports build
grpc_client_channel_factory objects, which use the decorator pattern to customize `ClientChannelFactory` objects, which customize construction arguments for
construction arguments for concrete grpc_subchannel instances. concrete subchannel instances.
Naming for GRPC Naming for GRPC

@ -107,8 +107,8 @@ typedef struct client_channel_channel_data {
grpc_channel_stack* owning_stack; grpc_channel_stack* owning_stack;
/** interested parties (owned) */ /** interested parties (owned) */
grpc_pollset_set* interested_parties; grpc_pollset_set* interested_parties;
// Client channel factory. Holds a ref. // Client channel factory.
grpc_client_channel_factory* client_channel_factory; grpc_core::ClientChannelFactory* client_channel_factory;
// Subchannel pool. // Subchannel pool.
grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool; grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool;
@ -205,16 +205,15 @@ class ClientChannelControlHelper
chand_->subchannel_pool.get()); chand_->subchannel_pool.get());
grpc_channel_args* new_args = grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(&args, &arg, 1); grpc_channel_args_copy_and_add(&args, &arg, 1);
Subchannel* subchannel = grpc_client_channel_factory_create_subchannel( Subchannel* subchannel =
chand_->client_channel_factory, new_args); chand_->client_channel_factory->CreateSubchannel(new_args);
grpc_channel_args_destroy(new_args); grpc_channel_args_destroy(new_args);
return subchannel; return subchannel;
} }
grpc_channel* CreateChannel(const char* target, grpc_client_channel_type type, grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override { const grpc_channel_args& args) override {
return grpc_client_channel_factory_create_channel( return chand_->client_channel_factory->CreateChannel(target, &args);
chand_->client_channel_factory, target, type, &args);
} }
void UpdateState( void UpdateState(
@ -420,19 +419,12 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES); arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
chand->enable_retries = grpc_channel_arg_get_bool(arg, true); chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
// Record client channel factory. // Record client channel factory.
arg = grpc_channel_args_find(args->channel_args, chand->client_channel_factory =
GRPC_ARG_CLIENT_CHANNEL_FACTORY); grpc_core::ClientChannelFactory::GetFromChannelArgs(args->channel_args);
if (arg == nullptr) { if (chand->client_channel_factory == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING( return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Missing client channel factory in args for client channel filter"); "Missing client channel factory in args for client channel filter");
} }
if (arg->type != GRPC_ARG_POINTER) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"client channel factory arg must be a pointer");
}
chand->client_channel_factory =
static_cast<grpc_client_channel_factory*>(arg->value.pointer.p);
grpc_client_channel_factory_ref(chand->client_channel_factory);
// Get server name to resolve, using proxy mapper if needed. // Get server name to resolve, using proxy mapper if needed.
arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI); arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
if (arg == nullptr) { if (arg == nullptr) {
@ -509,9 +501,6 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
// longer be any need to explicitly reset these smart pointer data members. // longer be any need to explicitly reset these smart pointer data members.
chand->picker.reset(); chand->picker.reset();
chand->subchannel_pool.reset(); chand->subchannel_pool.reset();
if (chand->client_channel_factory != nullptr) {
grpc_client_channel_factory_unref(chand->client_channel_factory);
}
chand->info_lb_policy_name.reset(); chand->info_lb_policy_name.reset();
chand->info_service_config_json.reset(); chand->info_service_config_json.reset();
chand->retry_throttle_data.reset(); chand->retry_throttle_data.reset();

@ -21,47 +21,35 @@
#include "src/core/ext/filters/client_channel/client_channel_factory.h" #include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
void grpc_client_channel_factory_ref(grpc_client_channel_factory* factory) { // Channel arg key for client channel factory.
factory->vtable->ref(factory); #define GRPC_ARG_CLIENT_CHANNEL_FACTORY "grpc.client_channel_factory"
}
void grpc_client_channel_factory_unref(grpc_client_channel_factory* factory) { namespace grpc_core {
factory->vtable->unref(factory);
}
grpc_core::Subchannel* grpc_client_channel_factory_create_subchannel( namespace {
grpc_client_channel_factory* factory, const grpc_channel_args* args) {
return factory->vtable->create_subchannel(factory, args);
}
grpc_channel* grpc_client_channel_factory_create_channel( void* factory_arg_copy(void* f) { return f; }
grpc_client_channel_factory* factory, const char* target, void factory_arg_destroy(void* f) {}
grpc_client_channel_type type, const grpc_channel_args* args) { int factory_arg_cmp(void* factory1, void* factory2) {
return factory->vtable->create_client_channel(factory, target, type, args); return GPR_ICMP(factory1, factory2);
} }
const grpc_arg_pointer_vtable factory_arg_vtable = {
factory_arg_copy, factory_arg_destroy, factory_arg_cmp};
static void* factory_arg_copy(void* factory) { } // namespace
grpc_client_channel_factory_ref(
static_cast<grpc_client_channel_factory*>(factory));
return factory;
}
static void factory_arg_destroy(void* factory) { grpc_arg ClientChannelFactory::CreateChannelArg(ClientChannelFactory* factory) {
grpc_client_channel_factory_unref( return grpc_channel_arg_pointer_create(
static_cast<grpc_client_channel_factory*>(factory)); const_cast<char*>(GRPC_ARG_CLIENT_CHANNEL_FACTORY), factory,
&factory_arg_vtable);
} }
static int factory_arg_cmp(void* factory1, void* factory2) { ClientChannelFactory* ClientChannelFactory::GetFromChannelArgs(
if (factory1 < factory2) return -1; const grpc_channel_args* args) {
if (factory1 > factory2) return 1; const grpc_arg* arg =
return 0; grpc_channel_args_find(args, GRPC_ARG_CLIENT_CHANNEL_FACTORY);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) return nullptr;
return static_cast<ClientChannelFactory*>(arg->value.pointer.p);
} }
static const grpc_arg_pointer_vtable factory_arg_vtable = { } // namespace grpc_core
factory_arg_copy, factory_arg_destroy, factory_arg_cmp};
grpc_arg grpc_client_channel_factory_create_channel_arg(
grpc_client_channel_factory* factory) {
return grpc_channel_arg_pointer_create((char*)GRPC_ARG_CLIENT_CHANNEL_FACTORY,
factory, &factory_arg_vtable);
}

@ -24,51 +24,32 @@
#include <grpc/impl/codegen/grpc_types.h> #include <grpc/impl/codegen/grpc_types.h>
#include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/gprpp/abstract.h"
// Channel arg key for client channel factory. namespace grpc_core {
#define GRPC_ARG_CLIENT_CHANNEL_FACTORY "grpc.client_channel_factory"
typedef struct grpc_client_channel_factory grpc_client_channel_factory; class ClientChannelFactory {
typedef struct grpc_client_channel_factory_vtable public:
grpc_client_channel_factory_vtable; virtual ~ClientChannelFactory() = default;
typedef enum { // Creates a subchannel with the specified args.
GRPC_CLIENT_CHANNEL_TYPE_REGULAR, /** for the user-level regular calls */ virtual Subchannel* CreateSubchannel(const grpc_channel_args* args)
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, /** for communication with a load GRPC_ABSTRACT;
balancing service */
} grpc_client_channel_type;
/** Constructor for new configured channels. // Creates a channel for the specified target with the specified args.
Creating decorators around this type is encouraged to adapt behavior. */ virtual grpc_channel* CreateChannel(
struct grpc_client_channel_factory { const char* target, const grpc_channel_args* args) GRPC_ABSTRACT;
const grpc_client_channel_factory_vtable* vtable;
};
struct grpc_client_channel_factory_vtable {
void (*ref)(grpc_client_channel_factory* factory);
void (*unref)(grpc_client_channel_factory* factory);
grpc_core::Subchannel* (*create_subchannel)(
grpc_client_channel_factory* factory, const grpc_channel_args* args);
grpc_channel* (*create_client_channel)(grpc_client_channel_factory* factory,
const char* target,
grpc_client_channel_type type,
const grpc_channel_args* args);
};
void grpc_client_channel_factory_ref(grpc_client_channel_factory* factory); // Returns a channel arg containing the specified factory.
void grpc_client_channel_factory_unref(grpc_client_channel_factory* factory); static grpc_arg CreateChannelArg(ClientChannelFactory* factory);
/** Create a new grpc_subchannel */ // Returns the factory from args, or null if not found.
grpc_core::Subchannel* grpc_client_channel_factory_create_subchannel( static ClientChannelFactory* GetFromChannelArgs(
grpc_client_channel_factory* factory, const grpc_channel_args* args); const grpc_channel_args* args);
/** Create a new grpc_channel */ GRPC_ABSTRACT_BASE_CLASS
grpc_channel* grpc_client_channel_factory_create_channel( };
grpc_client_channel_factory* factory, const char* target,
grpc_client_channel_type type, const grpc_channel_args* args);
grpc_arg grpc_client_channel_factory_create_channel_arg( } // namespace grpc_core
grpc_client_channel_factory* factory);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H */ #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H */

@ -22,7 +22,6 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/client_channel_channelz.h" #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/gprpp/abstract.h" #include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
@ -193,10 +192,9 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
virtual Subchannel* CreateSubchannel(const grpc_channel_args& args) virtual Subchannel* CreateSubchannel(const grpc_channel_args& args)
GRPC_ABSTRACT; GRPC_ABSTRACT;
/// Creates a channel with the specified target, type, and channel args. /// Creates a channel with the specified target and channel args.
virtual grpc_channel* CreateChannel( virtual grpc_channel* CreateChannel(
const char* target, grpc_client_channel_type type, const char* target, const grpc_channel_args& args) GRPC_ABSTRACT;
const grpc_channel_args& args) GRPC_ABSTRACT;
/// Sets the connectivity state and returns a new picker to be used /// Sets the connectivity state and returns a new picker to be used
/// by the client channel. /// by the client channel.

@ -273,7 +273,6 @@ class GrpcLb : public LoadBalancingPolicy {
Subchannel* CreateSubchannel(const grpc_channel_args& args) override; Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
grpc_channel* CreateChannel(const char* target, grpc_channel* CreateChannel(const char* target,
grpc_client_channel_type type,
const grpc_channel_args& args) override; const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, grpc_error* state_error, void UpdateState(grpc_connectivity_state state, grpc_error* state_error,
UniquePtr<SubchannelPicker> picker) override; UniquePtr<SubchannelPicker> picker) override;
@ -581,10 +580,9 @@ Subchannel* GrpcLb::Helper::CreateSubchannel(const grpc_channel_args& args) {
} }
grpc_channel* GrpcLb::Helper::CreateChannel(const char* target, grpc_channel* GrpcLb::Helper::CreateChannel(const char* target,
grpc_client_channel_type type,
const grpc_channel_args& args) { const grpc_channel_args& args) {
if (parent_->shutting_down_) return nullptr; if (parent_->shutting_down_) return nullptr;
return parent_->channel_control_helper()->CreateChannel(target, type, args); return parent_->channel_control_helper()->CreateChannel(target, args);
} }
void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
@ -1305,8 +1303,8 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
if (lb_channel_ == nullptr) { if (lb_channel_ == nullptr) {
char* uri_str; char* uri_str;
gpr_asprintf(&uri_str, "fake:///%s", server_name_); gpr_asprintf(&uri_str, "fake:///%s", server_name_);
lb_channel_ = channel_control_helper()->CreateChannel( lb_channel_ =
uri_str, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, *lb_channel_args); channel_control_helper()->CreateChannel(uri_str, *lb_channel_args);
GPR_ASSERT(lb_channel_ != nullptr); GPR_ASSERT(lb_channel_ != nullptr);
grpc_core::channelz::ChannelNode* channel_node = grpc_core::channelz::ChannelNode* channel_node =
grpc_channel_get_channelz_node(lb_channel_); grpc_channel_get_channelz_node(lb_channel_);

@ -223,7 +223,6 @@ class XdsLb : public LoadBalancingPolicy {
Subchannel* CreateSubchannel(const grpc_channel_args& args) override; Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
grpc_channel* CreateChannel(const char* target, grpc_channel* CreateChannel(const char* target,
grpc_client_channel_type type,
const grpc_channel_args& args) override; const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, grpc_error* state_error, void UpdateState(grpc_connectivity_state state, grpc_error* state_error,
UniquePtr<SubchannelPicker> picker) override; UniquePtr<SubchannelPicker> picker) override;
@ -354,10 +353,9 @@ Subchannel* XdsLb::Helper::CreateSubchannel(const grpc_channel_args& args) {
} }
grpc_channel* XdsLb::Helper::CreateChannel(const char* target, grpc_channel* XdsLb::Helper::CreateChannel(const char* target,
grpc_client_channel_type type,
const grpc_channel_args& args) { const grpc_channel_args& args) {
if (parent_->shutting_down_) return nullptr; if (parent_->shutting_down_) return nullptr;
return parent_->channel_control_helper()->CreateChannel(target, type, args); return parent_->channel_control_helper()->CreateChannel(target, args);
} }
void XdsLb::Helper::UpdateState(grpc_connectivity_state state, void XdsLb::Helper::UpdateState(grpc_connectivity_state state,
@ -1076,8 +1074,8 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
char* uri_str; char* uri_str;
gpr_asprintf(&uri_str, "fake:///%s", server_name_); gpr_asprintf(&uri_str, "fake:///%s", server_name_);
gpr_mu_lock(&lb_channel_mu_); gpr_mu_lock(&lb_channel_mu_);
lb_channel_ = channel_control_helper()->CreateChannel( lb_channel_ =
uri_str, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, *lb_channel_args); channel_control_helper()->CreateChannel(uri_str, *lb_channel_args);
gpr_mu_unlock(&lb_channel_mu_); gpr_mu_unlock(&lb_channel_mu_);
GPR_ASSERT(lb_channel_ != nullptr); GPR_ASSERT(lb_channel_ != nullptr);
gpr_free(uri_str); gpr_free(uri_str);

@ -80,10 +80,10 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
return parent_->channel_control_helper()->CreateSubchannel(args); return parent_->channel_control_helper()->CreateSubchannel(args);
} }
grpc_channel* CreateChannel(const char* target, grpc_client_channel_type type, grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override { const grpc_channel_args& args) override {
if (parent_->resolver_ == nullptr) return nullptr; // Shutting down. if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
return parent_->channel_control_helper()->CreateChannel(target, type, args); return parent_->channel_control_helper()->CreateChannel(target, args);
} }
void UpdateState(grpc_connectivity_state state, grpc_error* state_error, void UpdateState(grpc_connectivity_state state, grpc_error* state_error,

@ -33,50 +33,53 @@
#include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel.h"
static void client_channel_factory_ref( namespace grpc_core {
grpc_client_channel_factory* cc_factory) {}
static void client_channel_factory_unref( class Chttp2InsecureClientChannelFactory : public ClientChannelFactory {
grpc_client_channel_factory* cc_factory) {} public:
Subchannel* CreateSubchannel(const grpc_channel_args* args) override {
static grpc_core::Subchannel* client_channel_factory_create_subchannel( grpc_channel_args* new_args =
grpc_client_channel_factory* cc_factory, const grpc_channel_args* args) { grpc_default_authority_add_if_not_present(args);
grpc_channel_args* new_args = grpc_default_authority_add_if_not_present(args); grpc_connector* connector = grpc_chttp2_connector_create();
grpc_connector* connector = grpc_chttp2_connector_create(); Subchannel* s = Subchannel::Create(connector, new_args);
grpc_core::Subchannel* s = grpc_core::Subchannel::Create(connector, new_args); grpc_connector_unref(connector);
grpc_connector_unref(connector); grpc_channel_args_destroy(new_args);
grpc_channel_args_destroy(new_args); return s;
return s; }
}
static grpc_channel* client_channel_factory_create_channel( grpc_channel* CreateChannel(const char* target,
grpc_client_channel_factory* cc_factory, const char* target, const grpc_channel_args* args) override {
grpc_client_channel_type type, const grpc_channel_args* args) { if (target == nullptr) {
if (target == nullptr) { gpr_log(GPR_ERROR, "cannot create channel with NULL target name");
gpr_log(GPR_ERROR, "cannot create channel with NULL target name"); return nullptr;
return nullptr; }
// Add channel arg containing the server URI.
UniquePtr<char> canonical_target =
ResolverRegistry::AddDefaultPrefixIfNeeded(target);
grpc_arg arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_SERVER_URI), canonical_target.get());
const char* to_remove[] = {GRPC_ARG_SERVER_URI};
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
grpc_channel* channel =
grpc_channel_create(target, new_args, GRPC_CLIENT_CHANNEL, nullptr);
grpc_channel_args_destroy(new_args);
return channel;
} }
// Add channel arg containing the server URI. };
grpc_core::UniquePtr<char> canonical_target =
grpc_core::ResolverRegistry::AddDefaultPrefixIfNeeded(target);
grpc_arg arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_SERVER_URI), canonical_target.get());
const char* to_remove[] = {GRPC_ARG_SERVER_URI};
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
grpc_channel* channel =
grpc_channel_create(target, new_args, GRPC_CLIENT_CHANNEL, nullptr);
grpc_channel_args_destroy(new_args);
return channel;
}
static const grpc_client_channel_factory_vtable client_channel_factory_vtable = } // namespace grpc_core
{client_channel_factory_ref, client_channel_factory_unref,
client_channel_factory_create_subchannel,
client_channel_factory_create_channel};
static grpc_client_channel_factory client_channel_factory = { namespace {
&client_channel_factory_vtable};
grpc_core::Chttp2InsecureClientChannelFactory* g_factory;
gpr_once g_factory_once;
void FactoryInit() {
g_factory = grpc_core::New<grpc_core::Chttp2InsecureClientChannelFactory>();
}
} // namespace
/* Create a client channel: /* Create a client channel:
Asynchronously: - resolve target Asynchronously: - resolve target
@ -91,16 +94,13 @@ grpc_channel* grpc_insecure_channel_create(const char* target,
(target, args, reserved)); (target, args, reserved));
GPR_ASSERT(reserved == nullptr); GPR_ASSERT(reserved == nullptr);
// Add channel arg containing the client channel factory. // Add channel arg containing the client channel factory.
grpc_arg arg = gpr_once_init(&g_factory_once, FactoryInit);
grpc_client_channel_factory_create_channel_arg(&client_channel_factory); grpc_arg arg = grpc_core::ClientChannelFactory::CreateChannelArg(g_factory);
grpc_channel_args* new_args = grpc_channel_args_copy_and_add(args, &arg, 1); grpc_channel_args* new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
// Create channel. // Create channel.
grpc_channel* channel = client_channel_factory_create_channel( grpc_channel* channel = g_factory->CreateChannel(target, new_args);
&client_channel_factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR,
new_args);
// Clean up. // Clean up.
grpc_channel_args_destroy(new_args); grpc_channel_args_destroy(new_args);
return channel != nullptr ? channel return channel != nullptr ? channel
: grpc_lame_client_channel_create( : grpc_lame_client_channel_create(
target, GRPC_STATUS_INTERNAL, target, GRPC_STATUS_INTERNAL,

@ -40,148 +40,148 @@
#include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel.h"
#include "src/core/lib/uri/uri_parser.h" #include "src/core/lib/uri/uri_parser.h"
static void client_channel_factory_ref( namespace grpc_core {
grpc_client_channel_factory* cc_factory) {}
static void client_channel_factory_unref( class Chttp2SecureClientChannelFactory : public ClientChannelFactory {
grpc_client_channel_factory* cc_factory) {} public:
Subchannel* CreateSubchannel(const grpc_channel_args* args) override {
static grpc_channel_args* get_secure_naming_channel_args( grpc_channel_args* new_args = GetSecureNamingChannelArgs(args);
const grpc_channel_args* args) { if (new_args == nullptr) {
grpc_channel_credentials* channel_credentials = gpr_log(GPR_ERROR,
grpc_channel_credentials_find_in_args(args); "Failed to create channel args during subchannel creation.");
if (channel_credentials == nullptr) { return nullptr;
gpr_log(GPR_ERROR,
"Can't create subchannel: channel credentials missing for secure "
"channel.");
return nullptr;
}
// Make sure security connector does not already exist in args.
if (grpc_security_connector_find_in_args(args) != nullptr) {
gpr_log(GPR_ERROR,
"Can't create subchannel: security connector already present in "
"channel args.");
return nullptr;
}
// To which address are we connecting? By default, use the server URI.
const grpc_arg* server_uri_arg =
grpc_channel_args_find(args, GRPC_ARG_SERVER_URI);
const char* server_uri_str = grpc_channel_arg_get_string(server_uri_arg);
GPR_ASSERT(server_uri_str != nullptr);
grpc_uri* server_uri =
grpc_uri_parse(server_uri_str, true /* supress errors */);
GPR_ASSERT(server_uri != nullptr);
const grpc_core::TargetAuthorityTable* target_authority_table =
grpc_core::FindTargetAuthorityTableInArgs(args);
grpc_core::UniquePtr<char> authority;
if (target_authority_table != nullptr) {
// Find the authority for the target.
const char* target_uri_str =
grpc_core::Subchannel::GetUriFromSubchannelAddressArg(args);
grpc_uri* target_uri =
grpc_uri_parse(target_uri_str, false /* suppress errors */);
GPR_ASSERT(target_uri != nullptr);
if (target_uri->path[0] != '\0') { // "path" may be empty
const grpc_slice key = grpc_slice_from_static_string(
target_uri->path[0] == '/' ? target_uri->path + 1 : target_uri->path);
const grpc_core::UniquePtr<char>* value =
target_authority_table->Get(key);
if (value != nullptr) authority.reset(gpr_strdup(value->get()));
grpc_slice_unref_internal(key);
} }
grpc_uri_destroy(target_uri); grpc_connector* connector = grpc_chttp2_connector_create();
} Subchannel* s = Subchannel::Create(connector, new_args);
// If the authority hasn't already been set (either because no target grpc_connector_unref(connector);
// authority table was present or because the target was not present grpc_channel_args_destroy(new_args);
// in the table), fall back to using the original server URI. return s;
if (authority == nullptr) {
authority =
grpc_core::ResolverRegistry::GetDefaultAuthority(server_uri_str);
} }
grpc_arg args_to_add[2];
size_t num_args_to_add = 0; grpc_channel* CreateChannel(const char* target,
if (grpc_channel_args_find(args, GRPC_ARG_DEFAULT_AUTHORITY) == nullptr) { const grpc_channel_args* args) override {
// If the channel args don't already contain GRPC_ARG_DEFAULT_AUTHORITY, add if (target == nullptr) {
// the arg, setting it to the value just obtained. gpr_log(GPR_ERROR, "cannot create channel with NULL target name");
args_to_add[num_args_to_add++] = grpc_channel_arg_string_create( return nullptr;
const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY), authority.get()); }
// Add channel arg containing the server URI.
UniquePtr<char> canonical_target =
ResolverRegistry::AddDefaultPrefixIfNeeded(target);
grpc_arg arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_SERVER_URI), canonical_target.get());
const char* to_remove[] = {GRPC_ARG_SERVER_URI};
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
grpc_channel* channel =
grpc_channel_create(target, new_args, GRPC_CLIENT_CHANNEL, nullptr);
grpc_channel_args_destroy(new_args);
return channel;
} }
grpc_channel_args* args_with_authority =
grpc_channel_args_copy_and_add(args, args_to_add, num_args_to_add); private:
grpc_uri_destroy(server_uri); static grpc_channel_args* GetSecureNamingChannelArgs(
// Create the security connector using the credentials and target name. const grpc_channel_args* args) {
grpc_channel_args* new_args_from_connector = nullptr; grpc_channel_credentials* channel_credentials =
grpc_core::RefCountedPtr<grpc_channel_security_connector> grpc_channel_credentials_find_in_args(args);
subchannel_security_connector = if (channel_credentials == nullptr) {
channel_credentials->create_security_connector( gpr_log(GPR_ERROR,
/*call_creds=*/nullptr, authority.get(), args_with_authority, "Can't create subchannel: channel credentials missing for secure "
&new_args_from_connector); "channel.");
if (subchannel_security_connector == nullptr) { return nullptr;
gpr_log(GPR_ERROR, }
"Failed to create secure subchannel for secure name '%s'", // Make sure security connector does not already exist in args.
authority.get()); if (grpc_security_connector_find_in_args(args) != nullptr) {
gpr_log(GPR_ERROR,
"Can't create subchannel: security connector already present in "
"channel args.");
return nullptr;
}
// To which address are we connecting? By default, use the server URI.
const grpc_arg* server_uri_arg =
grpc_channel_args_find(args, GRPC_ARG_SERVER_URI);
const char* server_uri_str = grpc_channel_arg_get_string(server_uri_arg);
GPR_ASSERT(server_uri_str != nullptr);
grpc_uri* server_uri =
grpc_uri_parse(server_uri_str, true /* suppress errors */);
GPR_ASSERT(server_uri != nullptr);
const TargetAuthorityTable* target_authority_table =
FindTargetAuthorityTableInArgs(args);
UniquePtr<char> authority;
if (target_authority_table != nullptr) {
// Find the authority for the target.
const char* target_uri_str =
Subchannel::GetUriFromSubchannelAddressArg(args);
grpc_uri* target_uri =
grpc_uri_parse(target_uri_str, false /* suppress errors */);
GPR_ASSERT(target_uri != nullptr);
if (target_uri->path[0] != '\0') { // "path" may be empty
const grpc_slice key = grpc_slice_from_static_string(
target_uri->path[0] == '/' ? target_uri->path + 1
: target_uri->path);
const UniquePtr<char>* value = target_authority_table->Get(key);
if (value != nullptr) authority.reset(gpr_strdup(value->get()));
grpc_slice_unref_internal(key);
}
grpc_uri_destroy(target_uri);
}
// If the authority hasn't already been set (either because no target
// authority table was present or because the target was not present
// in the table), fall back to using the original server URI.
if (authority == nullptr) {
authority = ResolverRegistry::GetDefaultAuthority(server_uri_str);
}
grpc_arg args_to_add[2];
size_t num_args_to_add = 0;
if (grpc_channel_args_find(args, GRPC_ARG_DEFAULT_AUTHORITY) == nullptr) {
// If the channel args don't already contain GRPC_ARG_DEFAULT_AUTHORITY,
// add the arg, setting it to the value just obtained.
args_to_add[num_args_to_add++] = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY), authority.get());
}
grpc_channel_args* args_with_authority =
grpc_channel_args_copy_and_add(args, args_to_add, num_args_to_add);
grpc_uri_destroy(server_uri);
// Create the security connector using the credentials and target name.
grpc_channel_args* new_args_from_connector = nullptr;
RefCountedPtr<grpc_channel_security_connector>
subchannel_security_connector =
channel_credentials->create_security_connector(
/*call_creds=*/nullptr, authority.get(), args_with_authority,
&new_args_from_connector);
if (subchannel_security_connector == nullptr) {
gpr_log(GPR_ERROR,
"Failed to create secure subchannel for secure name '%s'",
authority.get());
grpc_channel_args_destroy(args_with_authority);
return nullptr;
}
grpc_arg new_security_connector_arg =
grpc_security_connector_to_arg(subchannel_security_connector.get());
grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
new_args_from_connector != nullptr ? new_args_from_connector
: args_with_authority,
&new_security_connector_arg, 1);
subchannel_security_connector.reset(DEBUG_LOCATION, "lb_channel_create");
if (new_args_from_connector != nullptr) {
grpc_channel_args_destroy(new_args_from_connector);
}
grpc_channel_args_destroy(args_with_authority); grpc_channel_args_destroy(args_with_authority);
return nullptr; return new_args;
} }
grpc_arg new_security_connector_arg = };
grpc_security_connector_to_arg(subchannel_security_connector.get());
grpc_channel_args* new_args = grpc_channel_args_copy_and_add( } // namespace grpc_core
new_args_from_connector != nullptr ? new_args_from_connector
: args_with_authority,
&new_security_connector_arg, 1);
subchannel_security_connector.reset(DEBUG_LOCATION, "lb_channel_create"); namespace {
if (new_args_from_connector != nullptr) {
grpc_channel_args_destroy(new_args_from_connector);
}
grpc_channel_args_destroy(args_with_authority);
return new_args;
}
static grpc_core::Subchannel* client_channel_factory_create_subchannel( grpc_core::Chttp2SecureClientChannelFactory* g_factory;
grpc_client_channel_factory* cc_factory, const grpc_channel_args* args) { gpr_once g_factory_once;
grpc_channel_args* new_args = get_secure_naming_channel_args(args);
if (new_args == nullptr) {
gpr_log(GPR_ERROR,
"Failed to create channel args during subchannel creation.");
return nullptr;
}
grpc_connector* connector = grpc_chttp2_connector_create();
grpc_core::Subchannel* s = grpc_core::Subchannel::Create(connector, new_args);
grpc_connector_unref(connector);
grpc_channel_args_destroy(new_args);
return s;
}
static grpc_channel* client_channel_factory_create_channel( void FactoryInit() {
grpc_client_channel_factory* cc_factory, const char* target, g_factory = grpc_core::New<grpc_core::Chttp2SecureClientChannelFactory>();
grpc_client_channel_type type, const grpc_channel_args* args) {
if (target == nullptr) {
gpr_log(GPR_ERROR, "cannot create channel with NULL target name");
return nullptr;
}
// Add channel arg containing the server URI.
grpc_core::UniquePtr<char> canonical_target =
grpc_core::ResolverRegistry::AddDefaultPrefixIfNeeded(target);
grpc_arg arg = grpc_channel_arg_string_create((char*)GRPC_ARG_SERVER_URI,
canonical_target.get());
const char* to_remove[] = {GRPC_ARG_SERVER_URI};
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
grpc_channel* channel =
grpc_channel_create(target, new_args, GRPC_CLIENT_CHANNEL, nullptr);
grpc_channel_args_destroy(new_args);
return channel;
} }
static const grpc_client_channel_factory_vtable client_channel_factory_vtable = } // namespace
{client_channel_factory_ref, client_channel_factory_unref,
client_channel_factory_create_subchannel,
client_channel_factory_create_channel};
static grpc_client_channel_factory client_channel_factory = {
&client_channel_factory_vtable};
// Create a secure client channel: // Create a secure client channel:
// Asynchronously: - resolve target // Asynchronously: - resolve target
@ -201,16 +201,15 @@ grpc_channel* grpc_secure_channel_create(grpc_channel_credentials* creds,
if (creds != nullptr) { if (creds != nullptr) {
// Add channel args containing the client channel factory and channel // Add channel args containing the client channel factory and channel
// credentials. // credentials.
gpr_once_init(&g_factory_once, FactoryInit);
grpc_arg args_to_add[] = { grpc_arg args_to_add[] = {
grpc_client_channel_factory_create_channel_arg(&client_channel_factory), grpc_core::ClientChannelFactory::CreateChannelArg(g_factory),
grpc_channel_credentials_to_arg(creds)}; grpc_channel_credentials_to_arg(creds)};
grpc_channel_args* new_args = grpc_channel_args_copy_and_add( grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
args, args_to_add, GPR_ARRAY_SIZE(args_to_add)); args, args_to_add, GPR_ARRAY_SIZE(args_to_add));
new_args = creds->update_arguments(new_args); new_args = creds->update_arguments(new_args);
// Create channel. // Create channel.
channel = client_channel_factory_create_channel( channel = g_factory->CreateChannel(target, new_args);
&client_channel_factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR,
new_args);
// Clean up. // Clean up.
grpc_channel_args_destroy(new_args); grpc_channel_args_destroy(new_args);
} }

@ -147,10 +147,8 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
} }
grpc_channel* CreateChannel(const char* target, grpc_channel* CreateChannel(const char* target,
grpc_client_channel_type type,
const grpc_channel_args& args) override { const grpc_channel_args& args) override {
return parent_->channel_control_helper()->CreateChannel(target, type, return parent_->channel_control_helper()->CreateChannel(target, args);
args);
} }
void UpdateState(grpc_connectivity_state state, grpc_error* state_error, void UpdateState(grpc_connectivity_state state, grpc_error* state_error,

@ -318,30 +318,18 @@ static void FilterDestroy(void* arg, grpc_error* error) { gpr_free(arg); }
static void DoNothing(void* arg, grpc_error* error) {} static void DoNothing(void* arg, grpc_error* error) {}
class FakeClientChannelFactory : public grpc_client_channel_factory { class FakeClientChannelFactory : public grpc_core::ClientChannelFactory {
public: public:
FakeClientChannelFactory() { vtable = &vtable_; } grpc_core::Subchannel* CreateSubchannel(
const grpc_channel_args* args) override {
private:
static void NoRef(grpc_client_channel_factory* factory) {}
static void NoUnref(grpc_client_channel_factory* factory) {}
static grpc_core::Subchannel* CreateSubchannel(
grpc_client_channel_factory* factory, const grpc_channel_args* args) {
return nullptr; return nullptr;
} }
static grpc_channel* CreateClientChannel(grpc_client_channel_factory* factory, grpc_channel* CreateChannel(const char* target,
const char* target, const grpc_channel_args* args) override {
grpc_client_channel_type type,
const grpc_channel_args* args) {
return nullptr; return nullptr;
} }
static const grpc_client_channel_factory_vtable vtable_;
}; };
const grpc_client_channel_factory_vtable FakeClientChannelFactory::vtable_ = {
NoRef, NoUnref, CreateSubchannel, CreateClientChannel};
static grpc_arg StringArg(const char* key, const char* value) { static grpc_arg StringArg(const char* key, const char* value) {
grpc_arg a; grpc_arg a;
a.type = GRPC_ARG_STRING; a.type = GRPC_ARG_STRING;
@ -506,13 +494,13 @@ static void BM_IsolatedFilter(benchmark::State& state) {
TrackCounters track_counters; TrackCounters track_counters;
Fixture fixture; Fixture fixture;
std::ostringstream label; std::ostringstream label;
std::vector<grpc_arg> args;
FakeClientChannelFactory fake_client_channel_factory; FakeClientChannelFactory fake_client_channel_factory;
args.push_back(grpc_client_channel_factory_create_channel_arg(
&fake_client_channel_factory));
args.push_back(StringArg(GRPC_ARG_SERVER_URI, "localhost"));
std::vector<grpc_arg> args = {
grpc_core::ClientChannelFactory::CreateChannelArg(
&fake_client_channel_factory),
StringArg(GRPC_ARG_SERVER_URI, "localhost"),
};
grpc_channel_args channel_args = {args.size(), &args[0]}; grpc_channel_args channel_args = {args.size(), &args[0]};
std::vector<const grpc_channel_filter*> filters; std::vector<const grpc_channel_filter*> filters;

Loading…
Cancel
Save