From 0e487a7405cbe35bf6fc0105fadf27146bc35672 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 14 Nov 2019 09:45:17 -0800 Subject: [PATCH] Convert grpc_connector to C++. --- BUILD | 1 - BUILD.gn | 2 - CMakeLists.txt | 7 - Makefile | 7 - build.yaml | 1 - config.m4 | 1 - config.w32 | 1 - gRPC-Core.podspec | 1 - grpc.gemspec | 1 - grpc.gyp | 6 - package.xml | 1 - .../ext/filters/client_channel/connector.cc | 41 --- .../ext/filters/client_channel/connector.h | 85 +++-- .../ext/filters/client_channel/subchannel.cc | 27 +- .../ext/filters/client_channel/subchannel.h | 8 +- .../chttp2/client/chttp2_connector.cc | 308 ++++++++---------- .../chttp2/client/chttp2_connector.h | 32 +- .../chttp2/client/insecure/channel_create.cc | 5 +- .../client/secure/secure_channel_create.cc | 5 +- src/python/grpcio/grpc_core_dependencies.py | 1 - tools/doxygen/Doxyfile.c++.internal | 1 - tools/doxygen/Doxyfile.core.internal | 1 - 22 files changed, 226 insertions(+), 317 deletions(-) delete mode 100644 src/core/ext/filters/client_channel/connector.cc diff --git a/BUILD b/BUILD index 7cc070e4ba0..281f56cbcb4 100644 --- a/BUILD +++ b/BUILD @@ -1020,7 +1020,6 @@ grpc_cc_library( "src/core/ext/filters/client_channel/client_channel_channelz.cc", "src/core/ext/filters/client_channel/client_channel_factory.cc", "src/core/ext/filters/client_channel/client_channel_plugin.cc", - "src/core/ext/filters/client_channel/connector.cc", "src/core/ext/filters/client_channel/global_subchannel_pool.cc", "src/core/ext/filters/client_channel/health/health_check_client.cc", "src/core/ext/filters/client_channel/http_connect_handshaker.cc", diff --git a/BUILD.gn b/BUILD.gn index a69a8e0940a..c17f339f241 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -218,7 +218,6 @@ config("grpc_config") { "src/core/ext/filters/client_channel/client_channel_factory.cc", "src/core/ext/filters/client_channel/client_channel_factory.h", "src/core/ext/filters/client_channel/client_channel_plugin.cc", - "src/core/ext/filters/client_channel/connector.cc", "src/core/ext/filters/client_channel/connector.h", "src/core/ext/filters/client_channel/global_subchannel_pool.cc", "src/core/ext/filters/client_channel/global_subchannel_pool.h", @@ -1202,7 +1201,6 @@ config("grpc_config") { "src/core/ext/filters/client_channel/client_channel_factory.cc", "src/core/ext/filters/client_channel/client_channel_factory.h", "src/core/ext/filters/client_channel/client_channel_plugin.cc", - "src/core/ext/filters/client_channel/connector.cc", "src/core/ext/filters/client_channel/connector.h", "src/core/ext/filters/client_channel/global_subchannel_pool.cc", "src/core/ext/filters/client_channel/global_subchannel_pool.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 6c370c3ebd8..b000dde6cde 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1360,7 +1360,6 @@ add_library(grpc src/core/ext/filters/client_channel/client_channel_channelz.cc src/core/ext/filters/client_channel/client_channel_factory.cc src/core/ext/filters/client_channel/client_channel_plugin.cc - src/core/ext/filters/client_channel/connector.cc src/core/ext/filters/client_channel/global_subchannel_pool.cc src/core/ext/filters/client_channel/health/health_check_client.cc src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -1766,7 +1765,6 @@ add_library(grpc_cronet src/core/ext/filters/client_channel/client_channel_channelz.cc src/core/ext/filters/client_channel/client_channel_factory.cc src/core/ext/filters/client_channel/client_channel_plugin.cc - src/core/ext/filters/client_channel/connector.cc src/core/ext/filters/client_channel/global_subchannel_pool.cc src/core/ext/filters/client_channel/health/health_check_client.cc src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -2164,7 +2162,6 @@ add_library(grpc_test_util src/core/ext/filters/client_channel/client_channel_channelz.cc src/core/ext/filters/client_channel/client_channel_factory.cc src/core/ext/filters/client_channel/client_channel_plugin.cc - src/core/ext/filters/client_channel/connector.cc src/core/ext/filters/client_channel/global_subchannel_pool.cc src/core/ext/filters/client_channel/health/health_check_client.cc src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -2508,7 +2505,6 @@ add_library(grpc_test_util_unsecure src/core/ext/filters/client_channel/client_channel_channelz.cc src/core/ext/filters/client_channel/client_channel_factory.cc src/core/ext/filters/client_channel/client_channel_plugin.cc - src/core/ext/filters/client_channel/connector.cc src/core/ext/filters/client_channel/global_subchannel_pool.cc src/core/ext/filters/client_channel/health/health_check_client.cc src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -2863,7 +2859,6 @@ add_library(grpc_unsecure src/core/ext/filters/client_channel/client_channel_channelz.cc src/core/ext/filters/client_channel/client_channel_factory.cc src/core/ext/filters/client_channel/client_channel_plugin.cc - src/core/ext/filters/client_channel/connector.cc src/core/ext/filters/client_channel/global_subchannel_pool.cc src/core/ext/filters/client_channel/health/health_check_client.cc src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -3301,7 +3296,6 @@ add_library(grpc++ src/core/ext/filters/client_channel/client_channel_channelz.cc src/core/ext/filters/client_channel/client_channel_factory.cc src/core/ext/filters/client_channel/client_channel_plugin.cc - src/core/ext/filters/client_channel/connector.cc src/core/ext/filters/client_channel/global_subchannel_pool.cc src/core/ext/filters/client_channel/health/health_check_client.cc src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -4563,7 +4557,6 @@ add_library(grpc++_unsecure src/core/ext/filters/client_channel/client_channel_channelz.cc src/core/ext/filters/client_channel/client_channel_factory.cc src/core/ext/filters/client_channel/client_channel_plugin.cc - src/core/ext/filters/client_channel/connector.cc src/core/ext/filters/client_channel/global_subchannel_pool.cc src/core/ext/filters/client_channel/health/health_check_client.cc src/core/ext/filters/client_channel/http_connect_handshaker.cc diff --git a/Makefile b/Makefile index ed34f3e950a..4d398c4477f 100644 --- a/Makefile +++ b/Makefile @@ -3848,7 +3848,6 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/client_channel_channelz.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ - src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/global_subchannel_pool.cc \ src/core/ext/filters/client_channel/health/health_check_client.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ @@ -4246,7 +4245,6 @@ LIBGRPC_CRONET_SRC = \ src/core/ext/filters/client_channel/client_channel_channelz.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ - src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/global_subchannel_pool.cc \ src/core/ext/filters/client_channel/health/health_check_client.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ @@ -4635,7 +4633,6 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/ext/filters/client_channel/client_channel_channelz.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ - src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/global_subchannel_pool.cc \ src/core/ext/filters/client_channel/health/health_check_client.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ @@ -4965,7 +4962,6 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \ src/core/ext/filters/client_channel/client_channel_channelz.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ - src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/global_subchannel_pool.cc \ src/core/ext/filters/client_channel/health/health_check_client.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ @@ -5293,7 +5289,6 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/client_channel/client_channel_channelz.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ - src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/global_subchannel_pool.cc \ src/core/ext/filters/client_channel/health/health_check_client.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ @@ -5696,7 +5691,6 @@ LIBGRPC++_SRC = \ src/core/ext/filters/client_channel/client_channel_channelz.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ - src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/global_subchannel_pool.cc \ src/core/ext/filters/client_channel/health/health_check_client.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ @@ -6924,7 +6918,6 @@ LIBGRPC++_UNSECURE_SRC = \ src/core/ext/filters/client_channel/client_channel_channelz.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ - src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/global_subchannel_pool.cc \ src/core/ext/filters/client_channel/health/health_check_client.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ diff --git a/build.yaml b/build.yaml index 364f3bfcb50..97cd943fdee 100644 --- a/build.yaml +++ b/build.yaml @@ -996,7 +996,6 @@ filegroups: - src/core/ext/filters/client_channel/client_channel_channelz.cc - src/core/ext/filters/client_channel/client_channel_factory.cc - src/core/ext/filters/client_channel/client_channel_plugin.cc - - src/core/ext/filters/client_channel/connector.cc - src/core/ext/filters/client_channel/global_subchannel_pool.cc - src/core/ext/filters/client_channel/health/health_check_client.cc - src/core/ext/filters/client_channel/http_connect_handshaker.cc diff --git a/config.m4 b/config.m4 index 322029f0c72..933e940b948 100644 --- a/config.m4 +++ b/config.m4 @@ -45,7 +45,6 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/client_channel_channelz.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ - src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/global_subchannel_pool.cc \ src/core/ext/filters/client_channel/health/health_check_client.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ diff --git a/config.w32 b/config.w32 index 0062eafa8b6..934f7c63240 100644 --- a/config.w32 +++ b/config.w32 @@ -325,7 +325,6 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\client_channel_channelz.cc " + "src\\core\\ext\\filters\\client_channel\\client_channel_factory.cc " + "src\\core\\ext\\filters\\client_channel\\client_channel_plugin.cc " + - "src\\core\\ext\\filters\\client_channel\\connector.cc " + "src\\core\\ext\\filters\\client_channel\\global_subchannel_pool.cc " + "src\\core\\ext\\filters\\client_channel\\health\\health_check_client.cc " + "src\\core\\ext\\filters\\client_channel\\http_connect_handshaker.cc " + diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 04e30e692cd..8989626e319 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -201,7 +201,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_factory.h', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', - 'src/core/ext/filters/client_channel/connector.cc', 'src/core/ext/filters/client_channel/connector.h', 'src/core/ext/filters/client_channel/global_subchannel_pool.cc', 'src/core/ext/filters/client_channel/global_subchannel_pool.h', diff --git a/grpc.gemspec b/grpc.gemspec index d3354b1de7e..ed711536378 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -113,7 +113,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.cc ) s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.h ) s.files += %w( src/core/ext/filters/client_channel/client_channel_plugin.cc ) - s.files += %w( src/core/ext/filters/client_channel/connector.cc ) s.files += %w( src/core/ext/filters/client_channel/connector.h ) s.files += %w( src/core/ext/filters/client_channel/global_subchannel_pool.cc ) s.files += %w( src/core/ext/filters/client_channel/global_subchannel_pool.h ) diff --git a/grpc.gyp b/grpc.gyp index 3d62307f057..58de98e7131 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -494,7 +494,6 @@ 'src/core/ext/filters/client_channel/client_channel_channelz.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', - 'src/core/ext/filters/client_channel/connector.cc', 'src/core/ext/filters/client_channel/global_subchannel_pool.cc', 'src/core/ext/filters/client_channel/health/health_check_client.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', @@ -807,7 +806,6 @@ 'src/core/ext/filters/client_channel/client_channel_channelz.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', - 'src/core/ext/filters/client_channel/connector.cc', 'src/core/ext/filters/client_channel/global_subchannel_pool.cc', 'src/core/ext/filters/client_channel/health/health_check_client.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', @@ -1071,7 +1069,6 @@ 'src/core/ext/filters/client_channel/client_channel_channelz.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', - 'src/core/ext/filters/client_channel/connector.cc', 'src/core/ext/filters/client_channel/global_subchannel_pool.cc', 'src/core/ext/filters/client_channel/health/health_check_client.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', @@ -1346,7 +1343,6 @@ 'src/core/ext/filters/client_channel/client_channel_channelz.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', - 'src/core/ext/filters/client_channel/connector.cc', 'src/core/ext/filters/client_channel/global_subchannel_pool.cc', 'src/core/ext/filters/client_channel/health/health_check_client.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', @@ -1557,7 +1553,6 @@ 'src/core/ext/filters/client_channel/client_channel_channelz.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', - 'src/core/ext/filters/client_channel/connector.cc', 'src/core/ext/filters/client_channel/global_subchannel_pool.cc', 'src/core/ext/filters/client_channel/health/health_check_client.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', @@ -1911,7 +1906,6 @@ 'src/core/ext/filters/client_channel/client_channel_channelz.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', - 'src/core/ext/filters/client_channel/connector.cc', 'src/core/ext/filters/client_channel/global_subchannel_pool.cc', 'src/core/ext/filters/client_channel/health/health_check_client.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', diff --git a/package.xml b/package.xml index fc63e5772b2..b5c5f312825 100644 --- a/package.xml +++ b/package.xml @@ -96,7 +96,6 @@ - diff --git a/src/core/ext/filters/client_channel/connector.cc b/src/core/ext/filters/client_channel/connector.cc deleted file mode 100644 index 5e04b3b4539..00000000000 --- a/src/core/ext/filters/client_channel/connector.cc +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include - -#include "src/core/ext/filters/client_channel/connector.h" - -grpc_connector* grpc_connector_ref(grpc_connector* connector) { - connector->vtable->ref(connector); - return connector; -} - -void grpc_connector_unref(grpc_connector* connector) { - connector->vtable->unref(connector); -} - -void grpc_connector_connect(grpc_connector* connector, - const grpc_connect_in_args* in_args, - grpc_connect_out_args* out_args, - grpc_closure* notify) { - connector->vtable->connect(connector, in_args, out_args, notify); -} - -void grpc_connector_shutdown(grpc_connector* connector, grpc_error* why) { - connector->vtable->shutdown(connector, why); -} diff --git a/src/core/ext/filters/client_channel/connector.h b/src/core/ext/filters/client_channel/connector.h index 2bd5ff26e3e..256471456e3 100644 --- a/src/core/ext/filters/client_channel/connector.h +++ b/src/core/ext/filters/client_channel/connector.h @@ -23,62 +23,57 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channelz.h" +#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/transport/transport.h" -typedef struct grpc_connector grpc_connector; -typedef struct grpc_connector_vtable grpc_connector_vtable; +namespace grpc_core { -struct grpc_connector { - const grpc_connector_vtable* vtable; -}; +// Interface for connection-establishment functionality. +// Each transport that supports client channels (e.g., not inproc) must +// supply an implementation of this. +class SubchannelConnector : public InternallyRefCounted { + public: + struct Args { + // Set of pollsets interested in this connection. + grpc_pollset_set* interested_parties; + // Deadline for connection. + grpc_millis deadline; + // Channel args to be passed to handshakers and transport. + const grpc_channel_args* channel_args; + }; -typedef struct { - /** set of pollsets interested in this connection */ - grpc_pollset_set* interested_parties; - /** deadline for connection */ - grpc_millis deadline; - /** channel arguments (to be passed to transport) */ - const grpc_channel_args* channel_args; -} grpc_connect_in_args; + struct Result { + // The connected transport. + grpc_transport* transport = nullptr; + // Channel args to be passed to filters. + const grpc_channel_args* channel_args = nullptr; + // Channelz socket node of the connected transport, if any. + RefCountedPtr socket_node; -typedef struct { - /** the connected transport */ - grpc_transport* transport; + void Reset() { + transport = nullptr; + channel_args = nullptr; + socket_node.reset(); + } + }; - /** channel arguments (to be passed to the filters) */ - grpc_channel_args* channel_args; + // Attempts to connect. + // When complete, populates *result and invokes notify. + // Only one connection attempt may be in progress at any one time. + virtual void Connect(const Args& args, Result* result, + grpc_closure* notify) = 0; - /** channelz socket node of the connected transport. nullptr if not available - */ - grpc_core::RefCountedPtr socket; + // Cancels any in-flight connection attempt and shuts down the + // connector. + virtual void Shutdown(grpc_error* error) = 0; - void reset() { - transport = nullptr; - channel_args = nullptr; - socket = nullptr; + void Orphan() override { + Shutdown(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Subchannel disconnected")); + Unref(); } -} grpc_connect_out_args; - -struct grpc_connector_vtable { - void (*ref)(grpc_connector* connector); - void (*unref)(grpc_connector* connector); - /** Implementation of grpc_connector_shutdown */ - void (*shutdown)(grpc_connector* connector, grpc_error* why); - /** Implementation of grpc_connector_connect */ - void (*connect)(grpc_connector* connector, - const grpc_connect_in_args* in_args, - grpc_connect_out_args* out_args, grpc_closure* notify); }; -grpc_connector* grpc_connector_ref(grpc_connector* connector); -void grpc_connector_unref(grpc_connector* connector); -/** Connect using the connector: max one outstanding call at a time */ -void grpc_connector_connect(grpc_connector* connector, - const grpc_connect_in_args* in_args, - grpc_connect_out_args* out_args, - grpc_closure* notify); -/** Cancel any pending connection */ -void grpc_connector_shutdown(grpc_connector* connector, grpc_error* why); +} // namespace grpc_core #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTOR_H */ diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 4213b7ae086..26769e861e7 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -613,14 +613,14 @@ BackOff::Options ParseArgsForBackoffValues( } // namespace -Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector, +Subchannel::Subchannel(SubchannelKey* key, + OrphanablePtr connector, const grpc_channel_args* args) : key_(key), - connector_(connector), + connector_(std::move(connector)), backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_ms_)) { GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(); gpr_atm_no_barrier_store(&ref_pair_, 1 << INTERNAL_REF_BITS); - grpc_connector_ref(connector_); pollset_set_ = grpc_pollset_set_create(); grpc_resolved_address* addr = static_cast(gpr_malloc(sizeof(*addr))); @@ -668,12 +668,12 @@ Subchannel::~Subchannel() { channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN); } grpc_channel_args_destroy(args_); - grpc_connector_unref(connector_); + connector_.reset(); grpc_pollset_set_destroy(pollset_set_); delete key_; } -Subchannel* Subchannel::Create(grpc_connector* connector, +Subchannel* Subchannel::Create(OrphanablePtr connector, const grpc_channel_args* args) { SubchannelKey* key = new SubchannelKey(args); SubchannelPoolInterface* subchannel_pool = @@ -684,7 +684,7 @@ Subchannel* Subchannel::Create(grpc_connector* connector, delete key; return c; } - c = new Subchannel(key, connector, args); + c = new Subchannel(key, std::move(connector), args); // Try to register the subchannel before setting the subchannel pool. // Otherwise, in case of a registration race, unreffing c in // RegisterSubchannel() will cause c to be tried to be unregistered, while @@ -975,7 +975,7 @@ void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) { } void Subchannel::ContinueConnectingLocked() { - grpc_connect_in_args args; + SubchannelConnector::Args args; args.interested_parties = pollset_set_; const grpc_millis min_deadline = min_connect_timeout_ms_ + ExecCtx::Get()->Now(); @@ -983,13 +983,13 @@ void Subchannel::ContinueConnectingLocked() { args.deadline = std::max(next_attempt_deadline_, min_deadline); args.channel_args = args_; SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING); - grpc_connector_connect(connector_, &args, &connecting_result_, - &on_connecting_finished_); + connector_->Connect(args, &connecting_result_, &on_connecting_finished_); } void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) { auto* c = static_cast(arg); - grpc_channel_args* delete_channel_args = c->connecting_result_.channel_args; + const grpc_channel_args* delete_channel_args = + c->connecting_result_.channel_args; GRPC_SUBCHANNEL_WEAK_REF(c, "on_connecting_finished"); { MutexLock lock(&c->mu_); @@ -1042,8 +1042,8 @@ bool Subchannel::PublishTransportLocked() { return false; } RefCountedPtr socket = - std::move(connecting_result_.socket); - connecting_result_.reset(); + std::move(connecting_result_.socket_node); + connecting_result_.Reset(); if (disconnected_) { grpc_channel_stack_destroy(stk); gpr_free(stk); @@ -1075,8 +1075,7 @@ void Subchannel::Disconnect() { MutexLock lock(&mu_); GPR_ASSERT(!disconnected_); disconnected_ = true; - grpc_connector_shutdown(connector_, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Subchannel disconnected")); + connector_.reset(); connected_subchannel_.reset(); health_watcher_map_.ShutdownLocked(); } diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index ba4706e3177..cf4ddafb45e 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -197,12 +197,12 @@ class Subchannel { }; // The ctor and dtor are not intended to use directly. - Subchannel(SubchannelKey* key, grpc_connector* connector, + Subchannel(SubchannelKey* key, OrphanablePtr connector, const grpc_channel_args* args); ~Subchannel(); // Creates a subchannel given \a connector and \a args. - static Subchannel* Create(grpc_connector* connector, + static Subchannel* Create(OrphanablePtr connector, const grpc_channel_args* args); // Strong and weak refcounting. @@ -365,9 +365,9 @@ class Subchannel { gpr_atm ref_pair_; // Connection states. - grpc_connector* connector_ = nullptr; + OrphanablePtr connector_; // Set during connection. - grpc_connect_out_args connecting_result_; + SubchannelConnector::Result connecting_result_; grpc_closure on_connecting_finished_; // Active connection, or null. RefCountedPtr connected_subchannel_; diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index 18a2df8a2d0..431748be46f 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -38,202 +38,162 @@ #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/slice/slice_internal.h" -typedef struct { - grpc_connector base; +namespace grpc_core { - gpr_mu mu; - gpr_refcount refs; - - bool shutdown; - bool connecting; - - grpc_closure* notify; - grpc_connect_in_args args; - grpc_connect_out_args* result; - - grpc_endpoint* endpoint; // Non-NULL until handshaking starts. - - grpc_closure connected; - - grpc_core::RefCountedPtr handshake_mgr; -} chttp2_connector; +Chttp2Connector::Chttp2Connector() { + GRPC_CLOSURE_INIT(&connected_, Connected, this, grpc_schedule_on_exec_ctx); +} -static void chttp2_connector_ref(grpc_connector* con) { - chttp2_connector* c = reinterpret_cast(con); - gpr_ref(&c->refs); +Chttp2Connector::~Chttp2Connector() { + if (endpoint_ != nullptr) grpc_endpoint_destroy(endpoint_); } -static void chttp2_connector_unref(grpc_connector* con) { - chttp2_connector* c = reinterpret_cast(con); - if (gpr_unref(&c->refs)) { - gpr_mu_destroy(&c->mu); - // If handshaking is not yet in progress, destroy the endpoint. - // Otherwise, the handshaker will do this for us. - if (c->endpoint != nullptr) grpc_endpoint_destroy(c->endpoint); - gpr_free(c); +void Chttp2Connector::Connect(const Args& args, Result* result, + grpc_closure* notify) { + grpc_resolved_address addr; + Subchannel::GetAddressFromSubchannelAddressArg(args.channel_args, &addr); + grpc_endpoint** ep; + { + MutexLock lock(&mu_); + GPR_ASSERT(notify_ == nullptr); + args_ = args; + result_ = result; + notify_ = notify; + GPR_ASSERT(!connecting_); + connecting_ = true; + GPR_ASSERT(endpoint_ == nullptr); + ep = &endpoint_; } + // In some implementations, the closure can be flushed before + // grpc_tcp_client_connect() returns, and since the closure requires access + // to mu_, this can result in a deadlock (see + // https://github.com/grpc/grpc/issues/16427 for details). + // grpc_tcp_client_connect() will fill endpoint_ with proper contents, and we + // make sure that we still exist at that point by taking a ref. + Ref().release(); // Ref held by callback. + grpc_tcp_client_connect(&connected_, ep, args.interested_parties, + args.channel_args, &addr, args.deadline); } -static void chttp2_connector_shutdown(grpc_connector* con, grpc_error* why) { - chttp2_connector* c = reinterpret_cast(con); - gpr_mu_lock(&c->mu); - c->shutdown = true; - if (c->handshake_mgr != nullptr) { - c->handshake_mgr->Shutdown(GRPC_ERROR_REF(why)); +void Chttp2Connector::Shutdown(grpc_error* error) { + MutexLock lock(&mu_); + shutdown_ = true; + if (handshake_mgr_ != nullptr) { + handshake_mgr_->Shutdown(GRPC_ERROR_REF(error)); } // If handshaking is not yet in progress, shutdown the endpoint. // Otherwise, the handshaker will do this for us. - if (!c->connecting && c->endpoint != nullptr) { - grpc_endpoint_shutdown(c->endpoint, GRPC_ERROR_REF(why)); + if (!connecting_ && endpoint_ != nullptr) { + grpc_endpoint_shutdown(endpoint_, GRPC_ERROR_REF(error)); } - gpr_mu_unlock(&c->mu); - GRPC_ERROR_UNREF(why); + GRPC_ERROR_UNREF(error); } -static void on_handshake_done(void* arg, grpc_error* error) { - auto* args = static_cast(arg); - chttp2_connector* c = static_cast(args->user_data); - gpr_mu_lock(&c->mu); - if (error != GRPC_ERROR_NONE || c->shutdown) { - if (error == GRPC_ERROR_NONE) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown"); - // We were shut down after handshaking completed successfully, so - // destroy the endpoint here. - // TODO(ctiller): It is currently necessary to shutdown endpoints - // before destroying them, even if we know that there are no - // pending read/write callbacks. This should be fixed, at which - // point this can be removed. - grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_REF(error)); - grpc_endpoint_destroy(args->endpoint); - grpc_channel_args_destroy(args->args); - grpc_slice_buffer_destroy_internal(args->read_buffer); - gpr_free(args->read_buffer); +void Chttp2Connector::Connected(void* arg, grpc_error* error) { + Chttp2Connector* self = static_cast(arg); + bool unref = false; + { + MutexLock lock(&self->mu_); + GPR_ASSERT(self->connecting_); + self->connecting_ = false; + if (error != GRPC_ERROR_NONE || self->shutdown_) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown"); + } else { + error = GRPC_ERROR_REF(error); + } + if (self->endpoint_ != nullptr) { + grpc_endpoint_shutdown(self->endpoint_, GRPC_ERROR_REF(error)); + } + self->result_->Reset(); + grpc_closure* notify = self->notify_; + self->notify_ = nullptr; + ExecCtx::Run(DEBUG_LOCATION, notify, error); + unref = true; } else { - error = GRPC_ERROR_REF(error); + GPR_ASSERT(self->endpoint_ != nullptr); + self->StartHandshakeLocked(); } - c->result->reset(); - } else { - grpc_endpoint_delete_from_pollset_set(args->endpoint, - c->args.interested_parties); - c->result->transport = - grpc_create_chttp2_transport(args->args, args->endpoint, true); - c->result->socket = - grpc_chttp2_transport_get_socket_node(c->result->transport); - GPR_ASSERT(c->result->transport); - // TODO(roth): We ideally want to wait until we receive HTTP/2 - // settings from the server before we consider the connection - // established. If that doesn't happen before the connection - // timeout expires, then we should consider the connection attempt a - // failure and feed that information back into the backoff code. - // We could pass a notify_on_receive_settings callback to - // grpc_chttp2_transport_start_reading() to let us know when - // settings are received, but we would need to figure out how to use - // that information here. - // - // Unfortunately, we don't currently have a way to split apart the two - // effects of scheduling c->notify: we start sending RPCs immediately - // (which we want to do) and we consider the connection attempt successful - // (which we don't want to do until we get the notify_on_receive_settings - // callback from the transport). If we could split those things - // apart, then we could start sending RPCs but then wait for our - // timeout before deciding if the connection attempt is successful. - // If the attempt is not successful, then we would tear down the - // transport and feed the failure back into the backoff code. - // - // In addition, even if we did that, we would probably not want to do - // so until after transparent retries is implemented. Otherwise, any - // RPC that we attempt to send on the connection before the timeout - // would fail instead of being retried on a subsequent attempt. - grpc_chttp2_transport_start_reading(c->result->transport, args->read_buffer, - nullptr); - c->result->channel_args = args->args; } - grpc_closure* notify = c->notify; - c->notify = nullptr; - grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify, error); - c->handshake_mgr.reset(); - gpr_mu_unlock(&c->mu); - chttp2_connector_unref(reinterpret_cast(c)); + if (unref) self->Unref(); } -static void start_handshake_locked(chttp2_connector* c) { - c->handshake_mgr = grpc_core::MakeRefCounted(); - grpc_core::HandshakerRegistry::AddHandshakers( - grpc_core::HANDSHAKER_CLIENT, c->args.channel_args, - c->args.interested_parties, c->handshake_mgr.get()); - grpc_endpoint_add_to_pollset_set(c->endpoint, c->args.interested_parties); - c->handshake_mgr->DoHandshake(c->endpoint, c->args.channel_args, - c->args.deadline, nullptr /* acceptor */, - on_handshake_done, c); - c->endpoint = nullptr; // Endpoint handed off to handshake manager. +void Chttp2Connector::StartHandshakeLocked() { + handshake_mgr_ = MakeRefCounted(); + HandshakerRegistry::AddHandshakers(HANDSHAKER_CLIENT, args_.channel_args, + args_.interested_parties, + handshake_mgr_.get()); + grpc_endpoint_add_to_pollset_set(endpoint_, args_.interested_parties); + handshake_mgr_->DoHandshake(endpoint_, args_.channel_args, args_.deadline, + nullptr /* acceptor */, OnHandshakeDone, this); + endpoint_ = nullptr; // Endpoint handed off to handshake manager. } -static void connected(void* arg, grpc_error* error) { - chttp2_connector* c = static_cast(arg); - gpr_mu_lock(&c->mu); - GPR_ASSERT(c->connecting); - c->connecting = false; - if (error != GRPC_ERROR_NONE || c->shutdown) { - if (error == GRPC_ERROR_NONE) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown"); +void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error* error) { + auto* args = static_cast(arg); + Chttp2Connector* self = static_cast(args->user_data); + { + MutexLock lock(&self->mu_); + if (error != GRPC_ERROR_NONE || self->shutdown_) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown"); + // We were shut down after handshaking completed successfully, so + // destroy the endpoint here. + // TODO(ctiller): It is currently necessary to shutdown endpoints + // before destroying them, even if we know that there are no + // pending read/write callbacks. This should be fixed, at which + // point this can be removed. + grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_REF(error)); + grpc_endpoint_destroy(args->endpoint); + grpc_channel_args_destroy(args->args); + grpc_slice_buffer_destroy_internal(args->read_buffer); + gpr_free(args->read_buffer); + } else { + error = GRPC_ERROR_REF(error); + } + self->result_->Reset(); } else { - error = GRPC_ERROR_REF(error); + grpc_endpoint_delete_from_pollset_set(args->endpoint, + self->args_.interested_parties); + self->result_->transport = + grpc_create_chttp2_transport(args->args, args->endpoint, true); + self->result_->socket_node = + grpc_chttp2_transport_get_socket_node(self->result_->transport); + GPR_ASSERT(self->result_->transport != nullptr); + // TODO(roth): We ideally want to wait until we receive HTTP/2 + // settings from the server before we consider the connection + // established. If that doesn't happen before the connection + // timeout expires, then we should consider the connection attempt a + // failure and feed that information back into the backoff code. + // We could pass a notify_on_receive_settings callback to + // grpc_chttp2_transport_start_reading() to let us know when + // settings are received, but we would need to figure out how to use + // that information here. + // + // Unfortunately, we don't currently have a way to split apart the two + // effects of scheduling c->notify: we start sending RPCs immediately + // (which we want to do) and we consider the connection attempt successful + // (which we don't want to do until we get the notify_on_receive_settings + // callback from the transport). If we could split those things + // apart, then we could start sending RPCs but then wait for our + // timeout before deciding if the connection attempt is successful. + // If the attempt is not successful, then we would tear down the + // transport and feed the failure back into the backoff code. + // + // In addition, even if we did that, we would probably not want to do + // so until after transparent retries is implemented. Otherwise, any + // RPC that we attempt to send on the connection before the timeout + // would fail instead of being retried on a subsequent attempt. + grpc_chttp2_transport_start_reading(self->result_->transport, + args->read_buffer, nullptr); + self->result_->channel_args = args->args; } - c->result->reset(); - grpc_closure* notify = c->notify; - c->notify = nullptr; - grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify, error); - if (c->endpoint != nullptr) { - grpc_endpoint_shutdown(c->endpoint, GRPC_ERROR_REF(error)); - } - gpr_mu_unlock(&c->mu); - chttp2_connector_unref(static_cast(arg)); - } else { - GPR_ASSERT(c->endpoint != nullptr); - start_handshake_locked(c); - gpr_mu_unlock(&c->mu); + grpc_closure* notify = self->notify_; + self->notify_ = nullptr; + ExecCtx::Run(DEBUG_LOCATION, notify, error); + self->handshake_mgr_.reset(); } + self->Unref(); } -static void chttp2_connector_connect(grpc_connector* con, - const grpc_connect_in_args* args, - grpc_connect_out_args* result, - grpc_closure* notify) { - chttp2_connector* c = reinterpret_cast(con); - grpc_resolved_address addr; - grpc_core::Subchannel::GetAddressFromSubchannelAddressArg(args->channel_args, - &addr); - gpr_mu_lock(&c->mu); - GPR_ASSERT(c->notify == nullptr); - c->notify = notify; - c->args = *args; - c->result = result; - GPR_ASSERT(c->endpoint == nullptr); - chttp2_connector_ref(con); // Ref taken for callback. - GRPC_CLOSURE_INIT(&c->connected, connected, c, grpc_schedule_on_exec_ctx); - GPR_ASSERT(!c->connecting); - c->connecting = true; - grpc_closure* closure = &c->connected; - grpc_endpoint** ep = &c->endpoint; - gpr_mu_unlock(&c->mu); - // In some implementations, the closure can be flushed before - // grpc_tcp_client_connect and since the closure requires access to c->mu, - // this can result in a deadlock. Refer - // https://github.com/grpc/grpc/issues/16427 - // grpc_tcp_client_connect would fill c->endpoint with proper contents and we - // make sure that we would still exist at that point by taking a ref. - grpc_tcp_client_connect(closure, ep, args->interested_parties, - args->channel_args, &addr, args->deadline); -} - -static const grpc_connector_vtable chttp2_connector_vtable = { - chttp2_connector_ref, chttp2_connector_unref, chttp2_connector_shutdown, - chttp2_connector_connect}; - -grpc_connector* grpc_chttp2_connector_create() { - chttp2_connector* c = static_cast(gpr_zalloc(sizeof(*c))); - c->base.vtable = &chttp2_connector_vtable; - gpr_mu_init(&c->mu); - gpr_ref_init(&c->refs, 1); - return &c->base; -} +} // namespace grpc_core diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.h b/src/core/ext/transport/chttp2/client/chttp2_connector.h index 04da4413095..1ecd172bba1 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.h +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.h @@ -22,7 +22,37 @@ #include #include "src/core/ext/filters/client_channel/connector.h" +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/channel/handshaker_registry.h" -grpc_connector* grpc_chttp2_connector_create(); +namespace grpc_core { + +class Chttp2Connector : public SubchannelConnector { + public: + Chttp2Connector(); + ~Chttp2Connector(); + + void Connect(const Args& args, Result* result, grpc_closure* notify) override; + void Shutdown(grpc_error* error) override; + + private: + static void Connected(void* arg, grpc_error* error); + void StartHandshakeLocked(); + static void OnHandshakeDone(void* arg, grpc_error* error); + + Mutex mu_; + Args args_; + Result* result_ = nullptr; + grpc_closure* notify_ = nullptr; + bool shutdown_ = false; + bool connecting_ = false; + // Holds the endpoint when first created before being handed off to + // the handshake manager. + grpc_endpoint* endpoint_ = nullptr; + grpc_closure connected_; + RefCountedPtr handshake_mgr_; +}; + +} // namespace grpc_core #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H */ diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc index bd7bad72f0a..189ed7d1ce5 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc @@ -40,9 +40,8 @@ class Chttp2InsecureClientChannelFactory : public ClientChannelFactory { Subchannel* CreateSubchannel(const grpc_channel_args* args) override { grpc_channel_args* new_args = grpc_default_authority_add_if_not_present(args); - grpc_connector* connector = grpc_chttp2_connector_create(); - Subchannel* s = Subchannel::Create(connector, new_args); - grpc_connector_unref(connector); + Subchannel* s = + Subchannel::Create(MakeOrphanable(), new_args); grpc_channel_args_destroy(new_args); return s; } diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc index 91f7c13a335..33aa0cd0808 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc @@ -51,9 +51,8 @@ class Chttp2SecureClientChannelFactory : public ClientChannelFactory { "Failed to create channel args during subchannel creation."); return nullptr; } - grpc_connector* connector = grpc_chttp2_connector_create(); - Subchannel* s = Subchannel::Create(connector, new_args); - grpc_connector_unref(connector); + Subchannel* s = + Subchannel::Create(MakeOrphanable(), new_args); grpc_channel_args_destroy(new_args); return s; } diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 556912a7708..103bcf83cb6 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -24,7 +24,6 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/client_channel_channelz.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', - 'src/core/ext/filters/client_channel/connector.cc', 'src/core/ext/filters/client_channel/global_subchannel_pool.cc', 'src/core/ext/filters/client_channel/health/health_check_client.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 1d69aa232d8..9a75e179261 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1074,7 +1074,6 @@ src/core/ext/filters/client_channel/client_channel_channelz.h \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_factory.h \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ -src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/connector.h \ src/core/ext/filters/client_channel/global_subchannel_pool.cc \ src/core/ext/filters/client_channel/global_subchannel_pool.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 1b0ff5c5698..bec0ab35f8e 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -888,7 +888,6 @@ src/core/ext/filters/client_channel/client_channel_channelz.h \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_factory.h \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ -src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/connector.h \ src/core/ext/filters/client_channel/global_subchannel_pool.cc \ src/core/ext/filters/client_channel/global_subchannel_pool.h \