Merge pull request #20902 from markdroth/c++_connector

Convert grpc_connector to C++.
pull/21207/head
Mark D. Roth 5 years ago committed by GitHub
commit b073f5416d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 2
      BUILD.gn
  3. 7
      CMakeLists.txt
  4. 7
      Makefile
  5. 1
      build.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 1
      gRPC-Core.podspec
  9. 1
      grpc.gemspec
  10. 6
      grpc.gyp
  11. 1
      package.xml
  12. 41
      src/core/ext/filters/client_channel/connector.cc
  13. 85
      src/core/ext/filters/client_channel/connector.h
  14. 27
      src/core/ext/filters/client_channel/subchannel.cc
  15. 8
      src/core/ext/filters/client_channel/subchannel.h
  16. 308
      src/core/ext/transport/chttp2/client/chttp2_connector.cc
  17. 32
      src/core/ext/transport/chttp2/client/chttp2_connector.h
  18. 5
      src/core/ext/transport/chttp2/client/insecure/channel_create.cc
  19. 5
      src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
  20. 1
      src/python/grpcio/grpc_core_dependencies.py
  21. 1
      tools/doxygen/Doxyfile.c++.internal
  22. 1
      tools/doxygen/Doxyfile.core.internal

@ -1021,7 +1021,6 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/client_channel_channelz.cc",
"src/core/ext/filters/client_channel/client_channel_factory.cc",
"src/core/ext/filters/client_channel/client_channel_plugin.cc",
"src/core/ext/filters/client_channel/connector.cc",
"src/core/ext/filters/client_channel/global_subchannel_pool.cc",
"src/core/ext/filters/client_channel/health/health_check_client.cc",
"src/core/ext/filters/client_channel/http_connect_handshaker.cc",

@ -218,7 +218,6 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/client_channel_factory.cc",
"src/core/ext/filters/client_channel/client_channel_factory.h",
"src/core/ext/filters/client_channel/client_channel_plugin.cc",
"src/core/ext/filters/client_channel/connector.cc",
"src/core/ext/filters/client_channel/connector.h",
"src/core/ext/filters/client_channel/global_subchannel_pool.cc",
"src/core/ext/filters/client_channel/global_subchannel_pool.h",
@ -1203,7 +1202,6 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/client_channel_factory.cc",
"src/core/ext/filters/client_channel/client_channel_factory.h",
"src/core/ext/filters/client_channel/client_channel_plugin.cc",
"src/core/ext/filters/client_channel/connector.cc",
"src/core/ext/filters/client_channel/connector.h",
"src/core/ext/filters/client_channel/global_subchannel_pool.cc",
"src/core/ext/filters/client_channel/global_subchannel_pool.h",

@ -1368,7 +1368,6 @@ add_library(grpc
src/core/ext/filters/client_channel/client_channel_channelz.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/connector.cc
src/core/ext/filters/client_channel/global_subchannel_pool.cc
src/core/ext/filters/client_channel/health/health_check_client.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
@ -1774,7 +1773,6 @@ add_library(grpc_cronet
src/core/ext/filters/client_channel/client_channel_channelz.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/connector.cc
src/core/ext/filters/client_channel/global_subchannel_pool.cc
src/core/ext/filters/client_channel/health/health_check_client.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
@ -2172,7 +2170,6 @@ add_library(grpc_test_util
src/core/ext/filters/client_channel/client_channel_channelz.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/connector.cc
src/core/ext/filters/client_channel/global_subchannel_pool.cc
src/core/ext/filters/client_channel/health/health_check_client.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
@ -2516,7 +2513,6 @@ add_library(grpc_test_util_unsecure
src/core/ext/filters/client_channel/client_channel_channelz.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/connector.cc
src/core/ext/filters/client_channel/global_subchannel_pool.cc
src/core/ext/filters/client_channel/health/health_check_client.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
@ -2871,7 +2867,6 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/client_channel_channelz.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/connector.cc
src/core/ext/filters/client_channel/global_subchannel_pool.cc
src/core/ext/filters/client_channel/health/health_check_client.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
@ -3310,7 +3305,6 @@ add_library(grpc++
src/core/ext/filters/client_channel/client_channel_channelz.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/connector.cc
src/core/ext/filters/client_channel/global_subchannel_pool.cc
src/core/ext/filters/client_channel/health/health_check_client.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
@ -4576,7 +4570,6 @@ add_library(grpc++_unsecure
src/core/ext/filters/client_channel/client_channel_channelz.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/connector.cc
src/core/ext/filters/client_channel/global_subchannel_pool.cc
src/core/ext/filters/client_channel/health/health_check_client.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc

@ -3853,7 +3853,6 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/client_channel_channelz.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
@ -4251,7 +4250,6 @@ LIBGRPC_CRONET_SRC = \
src/core/ext/filters/client_channel/client_channel_channelz.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
@ -4640,7 +4638,6 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/ext/filters/client_channel/client_channel_channelz.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
@ -4970,7 +4967,6 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
src/core/ext/filters/client_channel/client_channel_channelz.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
@ -5298,7 +5294,6 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/client_channel_channelz.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
@ -5702,7 +5697,6 @@ LIBGRPC++_SRC = \
src/core/ext/filters/client_channel/client_channel_channelz.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
@ -6934,7 +6928,6 @@ LIBGRPC++_UNSECURE_SRC = \
src/core/ext/filters/client_channel/client_channel_channelz.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \

@ -999,7 +999,6 @@ filegroups:
- src/core/ext/filters/client_channel/client_channel_channelz.cc
- src/core/ext/filters/client_channel/client_channel_factory.cc
- src/core/ext/filters/client_channel/client_channel_plugin.cc
- src/core/ext/filters/client_channel/connector.cc
- src/core/ext/filters/client_channel/global_subchannel_pool.cc
- src/core/ext/filters/client_channel/health/health_check_client.cc
- src/core/ext/filters/client_channel/http_connect_handshaker.cc

@ -45,7 +45,6 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/client_channel_channelz.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \

@ -325,7 +325,6 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\client_channel_channelz.cc " +
"src\\core\\ext\\filters\\client_channel\\client_channel_factory.cc " +
"src\\core\\ext\\filters\\client_channel\\client_channel_plugin.cc " +
"src\\core\\ext\\filters\\client_channel\\connector.cc " +
"src\\core\\ext\\filters\\client_channel\\global_subchannel_pool.cc " +
"src\\core\\ext\\filters\\client_channel\\health\\health_check_client.cc " +
"src\\core\\ext\\filters\\client_channel\\http_connect_handshaker.cc " +

@ -201,7 +201,6 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/connector.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',

@ -113,7 +113,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.cc )
s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.h )
s.files += %w( src/core/ext/filters/client_channel/client_channel_plugin.cc )
s.files += %w( src/core/ext/filters/client_channel/connector.cc )
s.files += %w( src/core/ext/filters/client_channel/connector.h )
s.files += %w( src/core/ext/filters/client_channel/global_subchannel_pool.cc )
s.files += %w( src/core/ext/filters/client_channel/global_subchannel_pool.h )

@ -494,7 +494,6 @@
'src/core/ext/filters/client_channel/client_channel_channelz.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
@ -807,7 +806,6 @@
'src/core/ext/filters/client_channel/client_channel_channelz.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
@ -1071,7 +1069,6 @@
'src/core/ext/filters/client_channel/client_channel_channelz.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
@ -1346,7 +1343,6 @@
'src/core/ext/filters/client_channel/client_channel_channelz.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
@ -1558,7 +1554,6 @@
'src/core/ext/filters/client_channel/client_channel_channelz.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
@ -1913,7 +1908,6 @@
'src/core/ext/filters/client_channel/client_channel_channelz.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',

@ -96,7 +96,6 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel_factory.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel_plugin.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/connector.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/connector.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/global_subchannel_pool.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/global_subchannel_pool.h" role="src" />

@ -1,41 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/connector.h"
grpc_connector* grpc_connector_ref(grpc_connector* connector) {
connector->vtable->ref(connector);
return connector;
}
void grpc_connector_unref(grpc_connector* connector) {
connector->vtable->unref(connector);
}
void grpc_connector_connect(grpc_connector* connector,
const grpc_connect_in_args* in_args,
grpc_connect_out_args* out_args,
grpc_closure* notify) {
connector->vtable->connect(connector, in_args, out_args, notify);
}
void grpc_connector_shutdown(grpc_connector* connector, grpc_error* why) {
connector->vtable->shutdown(connector, why);
}

@ -23,62 +23,57 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/transport/transport.h"
typedef struct grpc_connector grpc_connector;
typedef struct grpc_connector_vtable grpc_connector_vtable;
namespace grpc_core {
struct grpc_connector {
const grpc_connector_vtable* vtable;
};
// Interface for connection-establishment functionality.
// Each transport that supports client channels (e.g., not inproc) must
// supply an implementation of this.
class SubchannelConnector : public InternallyRefCounted<SubchannelConnector> {
public:
struct Args {
// Set of pollsets interested in this connection.
grpc_pollset_set* interested_parties;
// Deadline for connection.
grpc_millis deadline;
// Channel args to be passed to handshakers and transport.
const grpc_channel_args* channel_args;
};
typedef struct {
/** set of pollsets interested in this connection */
grpc_pollset_set* interested_parties;
/** deadline for connection */
grpc_millis deadline;
/** channel arguments (to be passed to transport) */
const grpc_channel_args* channel_args;
} grpc_connect_in_args;
struct Result {
// The connected transport.
grpc_transport* transport = nullptr;
// Channel args to be passed to filters.
const grpc_channel_args* channel_args = nullptr;
// Channelz socket node of the connected transport, if any.
RefCountedPtr<channelz::SocketNode> socket_node;
typedef struct {
/** the connected transport */
grpc_transport* transport;
void Reset() {
transport = nullptr;
channel_args = nullptr;
socket_node.reset();
}
};
/** channel arguments (to be passed to the filters) */
grpc_channel_args* channel_args;
// Attempts to connect.
// When complete, populates *result and invokes notify.
// Only one connection attempt may be in progress at any one time.
virtual void Connect(const Args& args, Result* result,
grpc_closure* notify) = 0;
/** channelz socket node of the connected transport. nullptr if not available
*/
grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket;
// Cancels any in-flight connection attempt and shuts down the
// connector.
virtual void Shutdown(grpc_error* error) = 0;
void reset() {
transport = nullptr;
channel_args = nullptr;
socket = nullptr;
void Orphan() override {
Shutdown(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Subchannel disconnected"));
Unref();
}
} grpc_connect_out_args;
struct grpc_connector_vtable {
void (*ref)(grpc_connector* connector);
void (*unref)(grpc_connector* connector);
/** Implementation of grpc_connector_shutdown */
void (*shutdown)(grpc_connector* connector, grpc_error* why);
/** Implementation of grpc_connector_connect */
void (*connect)(grpc_connector* connector,
const grpc_connect_in_args* in_args,
grpc_connect_out_args* out_args, grpc_closure* notify);
};
grpc_connector* grpc_connector_ref(grpc_connector* connector);
void grpc_connector_unref(grpc_connector* connector);
/** Connect using the connector: max one outstanding call at a time */
void grpc_connector_connect(grpc_connector* connector,
const grpc_connect_in_args* in_args,
grpc_connect_out_args* out_args,
grpc_closure* notify);
/** Cancel any pending connection */
void grpc_connector_shutdown(grpc_connector* connector, grpc_error* why);
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTOR_H */

@ -613,14 +613,14 @@ BackOff::Options ParseArgsForBackoffValues(
} // namespace
Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector,
Subchannel::Subchannel(SubchannelKey* key,
OrphanablePtr<SubchannelConnector> connector,
const grpc_channel_args* args)
: key_(key),
connector_(connector),
connector_(std::move(connector)),
backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_ms_)) {
GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED();
gpr_atm_no_barrier_store(&ref_pair_, 1 << INTERNAL_REF_BITS);
grpc_connector_ref(connector_);
pollset_set_ = grpc_pollset_set_create();
grpc_resolved_address* addr =
static_cast<grpc_resolved_address*>(gpr_malloc(sizeof(*addr)));
@ -668,12 +668,12 @@ Subchannel::~Subchannel() {
channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN);
}
grpc_channel_args_destroy(args_);
grpc_connector_unref(connector_);
connector_.reset();
grpc_pollset_set_destroy(pollset_set_);
delete key_;
}
Subchannel* Subchannel::Create(grpc_connector* connector,
Subchannel* Subchannel::Create(OrphanablePtr<SubchannelConnector> connector,
const grpc_channel_args* args) {
SubchannelKey* key = new SubchannelKey(args);
SubchannelPoolInterface* subchannel_pool =
@ -684,7 +684,7 @@ Subchannel* Subchannel::Create(grpc_connector* connector,
delete key;
return c;
}
c = new Subchannel(key, connector, args);
c = new Subchannel(key, std::move(connector), args);
// Try to register the subchannel before setting the subchannel pool.
// Otherwise, in case of a registration race, unreffing c in
// RegisterSubchannel() will cause c to be tried to be unregistered, while
@ -975,7 +975,7 @@ void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) {
}
void Subchannel::ContinueConnectingLocked() {
grpc_connect_in_args args;
SubchannelConnector::Args args;
args.interested_parties = pollset_set_;
const grpc_millis min_deadline =
min_connect_timeout_ms_ + ExecCtx::Get()->Now();
@ -983,13 +983,13 @@ void Subchannel::ContinueConnectingLocked() {
args.deadline = std::max(next_attempt_deadline_, min_deadline);
args.channel_args = args_;
SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING);
grpc_connector_connect(connector_, &args, &connecting_result_,
&on_connecting_finished_);
connector_->Connect(args, &connecting_result_, &on_connecting_finished_);
}
void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) {
auto* c = static_cast<Subchannel*>(arg);
grpc_channel_args* delete_channel_args = c->connecting_result_.channel_args;
const grpc_channel_args* delete_channel_args =
c->connecting_result_.channel_args;
GRPC_SUBCHANNEL_WEAK_REF(c, "on_connecting_finished");
{
MutexLock lock(&c->mu_);
@ -1042,8 +1042,8 @@ bool Subchannel::PublishTransportLocked() {
return false;
}
RefCountedPtr<channelz::SocketNode> socket =
std::move(connecting_result_.socket);
connecting_result_.reset();
std::move(connecting_result_.socket_node);
connecting_result_.Reset();
if (disconnected_) {
grpc_channel_stack_destroy(stk);
gpr_free(stk);
@ -1075,8 +1075,7 @@ void Subchannel::Disconnect() {
MutexLock lock(&mu_);
GPR_ASSERT(!disconnected_);
disconnected_ = true;
grpc_connector_shutdown(connector_, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Subchannel disconnected"));
connector_.reset();
connected_subchannel_.reset();
health_watcher_map_.ShutdownLocked();
}

@ -197,12 +197,12 @@ class Subchannel {
};
// The ctor and dtor are not intended to use directly.
Subchannel(SubchannelKey* key, grpc_connector* connector,
Subchannel(SubchannelKey* key, OrphanablePtr<SubchannelConnector> connector,
const grpc_channel_args* args);
~Subchannel();
// Creates a subchannel given \a connector and \a args.
static Subchannel* Create(grpc_connector* connector,
static Subchannel* Create(OrphanablePtr<SubchannelConnector> connector,
const grpc_channel_args* args);
// Strong and weak refcounting.
@ -365,9 +365,9 @@ class Subchannel {
gpr_atm ref_pair_;
// Connection states.
grpc_connector* connector_ = nullptr;
OrphanablePtr<SubchannelConnector> connector_;
// Set during connection.
grpc_connect_out_args connecting_result_;
SubchannelConnector::Result connecting_result_;
grpc_closure on_connecting_finished_;
// Active connection, or null.
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;

@ -38,202 +38,162 @@
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/slice/slice_internal.h"
typedef struct {
grpc_connector base;
namespace grpc_core {
gpr_mu mu;
gpr_refcount refs;
bool shutdown;
bool connecting;
grpc_closure* notify;
grpc_connect_in_args args;
grpc_connect_out_args* result;
grpc_endpoint* endpoint; // Non-NULL until handshaking starts.
grpc_closure connected;
grpc_core::RefCountedPtr<grpc_core::HandshakeManager> handshake_mgr;
} chttp2_connector;
Chttp2Connector::Chttp2Connector() {
GRPC_CLOSURE_INIT(&connected_, Connected, this, grpc_schedule_on_exec_ctx);
}
static void chttp2_connector_ref(grpc_connector* con) {
chttp2_connector* c = reinterpret_cast<chttp2_connector*>(con);
gpr_ref(&c->refs);
Chttp2Connector::~Chttp2Connector() {
if (endpoint_ != nullptr) grpc_endpoint_destroy(endpoint_);
}
static void chttp2_connector_unref(grpc_connector* con) {
chttp2_connector* c = reinterpret_cast<chttp2_connector*>(con);
if (gpr_unref(&c->refs)) {
gpr_mu_destroy(&c->mu);
// If handshaking is not yet in progress, destroy the endpoint.
// Otherwise, the handshaker will do this for us.
if (c->endpoint != nullptr) grpc_endpoint_destroy(c->endpoint);
gpr_free(c);
void Chttp2Connector::Connect(const Args& args, Result* result,
grpc_closure* notify) {
grpc_resolved_address addr;
Subchannel::GetAddressFromSubchannelAddressArg(args.channel_args, &addr);
grpc_endpoint** ep;
{
MutexLock lock(&mu_);
GPR_ASSERT(notify_ == nullptr);
args_ = args;
result_ = result;
notify_ = notify;
GPR_ASSERT(!connecting_);
connecting_ = true;
GPR_ASSERT(endpoint_ == nullptr);
ep = &endpoint_;
}
// In some implementations, the closure can be flushed before
// grpc_tcp_client_connect() returns, and since the closure requires access
// to mu_, this can result in a deadlock (see
// https://github.com/grpc/grpc/issues/16427 for details).
// grpc_tcp_client_connect() will fill endpoint_ with proper contents, and we
// make sure that we still exist at that point by taking a ref.
Ref().release(); // Ref held by callback.
grpc_tcp_client_connect(&connected_, ep, args.interested_parties,
args.channel_args, &addr, args.deadline);
}
static void chttp2_connector_shutdown(grpc_connector* con, grpc_error* why) {
chttp2_connector* c = reinterpret_cast<chttp2_connector*>(con);
gpr_mu_lock(&c->mu);
c->shutdown = true;
if (c->handshake_mgr != nullptr) {
c->handshake_mgr->Shutdown(GRPC_ERROR_REF(why));
void Chttp2Connector::Shutdown(grpc_error* error) {
MutexLock lock(&mu_);
shutdown_ = true;
if (handshake_mgr_ != nullptr) {
handshake_mgr_->Shutdown(GRPC_ERROR_REF(error));
}
// If handshaking is not yet in progress, shutdown the endpoint.
// Otherwise, the handshaker will do this for us.
if (!c->connecting && c->endpoint != nullptr) {
grpc_endpoint_shutdown(c->endpoint, GRPC_ERROR_REF(why));
if (!connecting_ && endpoint_ != nullptr) {
grpc_endpoint_shutdown(endpoint_, GRPC_ERROR_REF(error));
}
gpr_mu_unlock(&c->mu);
GRPC_ERROR_UNREF(why);
GRPC_ERROR_UNREF(error);
}
static void on_handshake_done(void* arg, grpc_error* error) {
auto* args = static_cast<grpc_core::HandshakerArgs*>(arg);
chttp2_connector* c = static_cast<chttp2_connector*>(args->user_data);
gpr_mu_lock(&c->mu);
if (error != GRPC_ERROR_NONE || c->shutdown) {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown");
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_REF(error));
grpc_endpoint_destroy(args->endpoint);
grpc_channel_args_destroy(args->args);
grpc_slice_buffer_destroy_internal(args->read_buffer);
gpr_free(args->read_buffer);
void Chttp2Connector::Connected(void* arg, grpc_error* error) {
Chttp2Connector* self = static_cast<Chttp2Connector*>(arg);
bool unref = false;
{
MutexLock lock(&self->mu_);
GPR_ASSERT(self->connecting_);
self->connecting_ = false;
if (error != GRPC_ERROR_NONE || self->shutdown_) {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown");
} else {
error = GRPC_ERROR_REF(error);
}
if (self->endpoint_ != nullptr) {
grpc_endpoint_shutdown(self->endpoint_, GRPC_ERROR_REF(error));
}
self->result_->Reset();
grpc_closure* notify = self->notify_;
self->notify_ = nullptr;
ExecCtx::Run(DEBUG_LOCATION, notify, error);
unref = true;
} else {
error = GRPC_ERROR_REF(error);
GPR_ASSERT(self->endpoint_ != nullptr);
self->StartHandshakeLocked();
}
c->result->reset();
} else {
grpc_endpoint_delete_from_pollset_set(args->endpoint,
c->args.interested_parties);
c->result->transport =
grpc_create_chttp2_transport(args->args, args->endpoint, true);
c->result->socket =
grpc_chttp2_transport_get_socket_node(c->result->transport);
GPR_ASSERT(c->result->transport);
// TODO(roth): We ideally want to wait until we receive HTTP/2
// settings from the server before we consider the connection
// established. If that doesn't happen before the connection
// timeout expires, then we should consider the connection attempt a
// failure and feed that information back into the backoff code.
// We could pass a notify_on_receive_settings callback to
// grpc_chttp2_transport_start_reading() to let us know when
// settings are received, but we would need to figure out how to use
// that information here.
//
// Unfortunately, we don't currently have a way to split apart the two
// effects of scheduling c->notify: we start sending RPCs immediately
// (which we want to do) and we consider the connection attempt successful
// (which we don't want to do until we get the notify_on_receive_settings
// callback from the transport). If we could split those things
// apart, then we could start sending RPCs but then wait for our
// timeout before deciding if the connection attempt is successful.
// If the attempt is not successful, then we would tear down the
// transport and feed the failure back into the backoff code.
//
// In addition, even if we did that, we would probably not want to do
// so until after transparent retries is implemented. Otherwise, any
// RPC that we attempt to send on the connection before the timeout
// would fail instead of being retried on a subsequent attempt.
grpc_chttp2_transport_start_reading(c->result->transport, args->read_buffer,
nullptr);
c->result->channel_args = args->args;
}
grpc_closure* notify = c->notify;
c->notify = nullptr;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify, error);
c->handshake_mgr.reset();
gpr_mu_unlock(&c->mu);
chttp2_connector_unref(reinterpret_cast<grpc_connector*>(c));
if (unref) self->Unref();
}
static void start_handshake_locked(chttp2_connector* c) {
c->handshake_mgr = grpc_core::MakeRefCounted<grpc_core::HandshakeManager>();
grpc_core::HandshakerRegistry::AddHandshakers(
grpc_core::HANDSHAKER_CLIENT, c->args.channel_args,
c->args.interested_parties, c->handshake_mgr.get());
grpc_endpoint_add_to_pollset_set(c->endpoint, c->args.interested_parties);
c->handshake_mgr->DoHandshake(c->endpoint, c->args.channel_args,
c->args.deadline, nullptr /* acceptor */,
on_handshake_done, c);
c->endpoint = nullptr; // Endpoint handed off to handshake manager.
void Chttp2Connector::StartHandshakeLocked() {
handshake_mgr_ = MakeRefCounted<HandshakeManager>();
HandshakerRegistry::AddHandshakers(HANDSHAKER_CLIENT, args_.channel_args,
args_.interested_parties,
handshake_mgr_.get());
grpc_endpoint_add_to_pollset_set(endpoint_, args_.interested_parties);
handshake_mgr_->DoHandshake(endpoint_, args_.channel_args, args_.deadline,
nullptr /* acceptor */, OnHandshakeDone, this);
endpoint_ = nullptr; // Endpoint handed off to handshake manager.
}
static void connected(void* arg, grpc_error* error) {
chttp2_connector* c = static_cast<chttp2_connector*>(arg);
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->connecting);
c->connecting = false;
if (error != GRPC_ERROR_NONE || c->shutdown) {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown");
void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error* error) {
auto* args = static_cast<HandshakerArgs*>(arg);
Chttp2Connector* self = static_cast<Chttp2Connector*>(args->user_data);
{
MutexLock lock(&self->mu_);
if (error != GRPC_ERROR_NONE || self->shutdown_) {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown");
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_REF(error));
grpc_endpoint_destroy(args->endpoint);
grpc_channel_args_destroy(args->args);
grpc_slice_buffer_destroy_internal(args->read_buffer);
gpr_free(args->read_buffer);
} else {
error = GRPC_ERROR_REF(error);
}
self->result_->Reset();
} else {
error = GRPC_ERROR_REF(error);
grpc_endpoint_delete_from_pollset_set(args->endpoint,
self->args_.interested_parties);
self->result_->transport =
grpc_create_chttp2_transport(args->args, args->endpoint, true);
self->result_->socket_node =
grpc_chttp2_transport_get_socket_node(self->result_->transport);
GPR_ASSERT(self->result_->transport != nullptr);
// TODO(roth): We ideally want to wait until we receive HTTP/2
// settings from the server before we consider the connection
// established. If that doesn't happen before the connection
// timeout expires, then we should consider the connection attempt a
// failure and feed that information back into the backoff code.
// We could pass a notify_on_receive_settings callback to
// grpc_chttp2_transport_start_reading() to let us know when
// settings are received, but we would need to figure out how to use
// that information here.
//
// Unfortunately, we don't currently have a way to split apart the two
// effects of scheduling c->notify: we start sending RPCs immediately
// (which we want to do) and we consider the connection attempt successful
// (which we don't want to do until we get the notify_on_receive_settings
// callback from the transport). If we could split those things
// apart, then we could start sending RPCs but then wait for our
// timeout before deciding if the connection attempt is successful.
// If the attempt is not successful, then we would tear down the
// transport and feed the failure back into the backoff code.
//
// In addition, even if we did that, we would probably not want to do
// so until after transparent retries is implemented. Otherwise, any
// RPC that we attempt to send on the connection before the timeout
// would fail instead of being retried on a subsequent attempt.
grpc_chttp2_transport_start_reading(self->result_->transport,
args->read_buffer, nullptr);
self->result_->channel_args = args->args;
}
c->result->reset();
grpc_closure* notify = c->notify;
c->notify = nullptr;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify, error);
if (c->endpoint != nullptr) {
grpc_endpoint_shutdown(c->endpoint, GRPC_ERROR_REF(error));
}
gpr_mu_unlock(&c->mu);
chttp2_connector_unref(static_cast<grpc_connector*>(arg));
} else {
GPR_ASSERT(c->endpoint != nullptr);
start_handshake_locked(c);
gpr_mu_unlock(&c->mu);
grpc_closure* notify = self->notify_;
self->notify_ = nullptr;
ExecCtx::Run(DEBUG_LOCATION, notify, error);
self->handshake_mgr_.reset();
}
self->Unref();
}
static void chttp2_connector_connect(grpc_connector* con,
const grpc_connect_in_args* args,
grpc_connect_out_args* result,
grpc_closure* notify) {
chttp2_connector* c = reinterpret_cast<chttp2_connector*>(con);
grpc_resolved_address addr;
grpc_core::Subchannel::GetAddressFromSubchannelAddressArg(args->channel_args,
&addr);
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->notify == nullptr);
c->notify = notify;
c->args = *args;
c->result = result;
GPR_ASSERT(c->endpoint == nullptr);
chttp2_connector_ref(con); // Ref taken for callback.
GRPC_CLOSURE_INIT(&c->connected, connected, c, grpc_schedule_on_exec_ctx);
GPR_ASSERT(!c->connecting);
c->connecting = true;
grpc_closure* closure = &c->connected;
grpc_endpoint** ep = &c->endpoint;
gpr_mu_unlock(&c->mu);
// In some implementations, the closure can be flushed before
// grpc_tcp_client_connect and since the closure requires access to c->mu,
// this can result in a deadlock. Refer
// https://github.com/grpc/grpc/issues/16427
// grpc_tcp_client_connect would fill c->endpoint with proper contents and we
// make sure that we would still exist at that point by taking a ref.
grpc_tcp_client_connect(closure, ep, args->interested_parties,
args->channel_args, &addr, args->deadline);
}
static const grpc_connector_vtable chttp2_connector_vtable = {
chttp2_connector_ref, chttp2_connector_unref, chttp2_connector_shutdown,
chttp2_connector_connect};
grpc_connector* grpc_chttp2_connector_create() {
chttp2_connector* c = static_cast<chttp2_connector*>(gpr_zalloc(sizeof(*c)));
c->base.vtable = &chttp2_connector_vtable;
gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1);
return &c->base;
}
} // namespace grpc_core

@ -22,7 +22,37 @@
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/channel/handshaker_registry.h"
grpc_connector* grpc_chttp2_connector_create();
namespace grpc_core {
class Chttp2Connector : public SubchannelConnector {
public:
Chttp2Connector();
~Chttp2Connector();
void Connect(const Args& args, Result* result, grpc_closure* notify) override;
void Shutdown(grpc_error* error) override;
private:
static void Connected(void* arg, grpc_error* error);
void StartHandshakeLocked();
static void OnHandshakeDone(void* arg, grpc_error* error);
Mutex mu_;
Args args_;
Result* result_ = nullptr;
grpc_closure* notify_ = nullptr;
bool shutdown_ = false;
bool connecting_ = false;
// Holds the endpoint when first created before being handed off to
// the handshake manager.
grpc_endpoint* endpoint_ = nullptr;
grpc_closure connected_;
RefCountedPtr<HandshakeManager> handshake_mgr_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H */

@ -40,9 +40,8 @@ class Chttp2InsecureClientChannelFactory : public ClientChannelFactory {
Subchannel* CreateSubchannel(const grpc_channel_args* args) override {
grpc_channel_args* new_args =
grpc_default_authority_add_if_not_present(args);
grpc_connector* connector = grpc_chttp2_connector_create();
Subchannel* s = Subchannel::Create(connector, new_args);
grpc_connector_unref(connector);
Subchannel* s =
Subchannel::Create(MakeOrphanable<Chttp2Connector>(), new_args);
grpc_channel_args_destroy(new_args);
return s;
}

@ -51,9 +51,8 @@ class Chttp2SecureClientChannelFactory : public ClientChannelFactory {
"Failed to create channel args during subchannel creation.");
return nullptr;
}
grpc_connector* connector = grpc_chttp2_connector_create();
Subchannel* s = Subchannel::Create(connector, new_args);
grpc_connector_unref(connector);
Subchannel* s =
Subchannel::Create(MakeOrphanable<Chttp2Connector>(), new_args);
grpc_channel_args_destroy(new_args);
return s;
}

@ -24,7 +24,6 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/client_channel_channelz.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',

@ -1075,7 +1075,6 @@ src/core/ext/filters/client_channel/client_channel_channelz.h \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_factory.h \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/connector.h \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.h \

@ -888,7 +888,6 @@ src/core/ext/filters/client_channel/client_channel_channelz.h \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_factory.h \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/connector.h \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.h \

Loading…
Cancel
Save