diff --git a/BUILD b/BUILD index fb660b6892d..14f478e6b00 100644 --- a/BUILD +++ b/BUILD @@ -3734,7 +3734,6 @@ grpc_cc_library( "src/core/ext/xds/xds_bootstrap.cc", "src/core/ext/xds/xds_certificate_provider.cc", "src/core/ext/xds/xds_client.cc", - "src/core/ext/xds/xds_client_grpc.cc", "src/core/ext/xds/xds_client_stats.cc", "src/core/ext/xds/xds_cluster.cc", "src/core/ext/xds/xds_cluster_specifier_plugin.cc", @@ -3748,7 +3747,6 @@ grpc_cc_library( "src/core/ext/xds/xds_resource_type.cc", "src/core/ext/xds/xds_route_config.cc", "src/core/ext/xds/xds_routing.cc", - "src/core/ext/xds/xds_transport_grpc.cc", "src/core/lib/security/credentials/xds/xds_credentials.cc", ], hdrs = [ @@ -3762,7 +3760,6 @@ grpc_cc_library( "src/core/ext/xds/xds_certificate_provider.h", "src/core/ext/xds/xds_channel_args.h", "src/core/ext/xds/xds_client.h", - "src/core/ext/xds/xds_client_grpc.h", "src/core/ext/xds/xds_client_stats.h", "src/core/ext/xds/xds_cluster.h", "src/core/ext/xds/xds_cluster_specifier_plugin.h", @@ -3777,12 +3774,9 @@ grpc_cc_library( "src/core/ext/xds/xds_resource_type_impl.h", "src/core/ext/xds/xds_route_config.h", "src/core/ext/xds/xds_routing.h", - "src/core/ext/xds/xds_transport.h", - "src/core/ext/xds/xds_transport_grpc.h", "src/core/lib/security/credentials/xds/xds_credentials.h", ], external_deps = [ - "absl/base:core_headers", "absl/container:inlined_vector", "absl/functional:bind_front", "absl/memory", diff --git a/CMakeLists.txt b/CMakeLists.txt index f2ad9421bfb..44ca18c5545 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2027,7 +2027,6 @@ add_library(grpc src/core/ext/xds/xds_certificate_provider.cc src/core/ext/xds/xds_channel_stack_modifier.cc src/core/ext/xds/xds_client.cc - src/core/ext/xds/xds_client_grpc.cc src/core/ext/xds/xds_client_stats.cc src/core/ext/xds/xds_cluster.cc src/core/ext/xds/xds_cluster_specifier_plugin.cc @@ -2042,7 +2041,6 @@ add_library(grpc src/core/ext/xds/xds_route_config.cc src/core/ext/xds/xds_routing.cc src/core/ext/xds/xds_server_config_fetcher.cc - src/core/ext/xds/xds_transport_grpc.cc src/core/lib/address_utils/parse_address.cc src/core/lib/address_utils/sockaddr_utils.cc src/core/lib/backoff/backoff.cc diff --git a/Makefile b/Makefile index 3926b36609a..91a86bde0d6 100644 --- a/Makefile +++ b/Makefile @@ -1409,7 +1409,6 @@ LIBGRPC_SRC = \ src/core/ext/xds/xds_certificate_provider.cc \ src/core/ext/xds/xds_channel_stack_modifier.cc \ src/core/ext/xds/xds_client.cc \ - src/core/ext/xds/xds_client_grpc.cc \ src/core/ext/xds/xds_client_stats.cc \ src/core/ext/xds/xds_cluster.cc \ src/core/ext/xds/xds_cluster_specifier_plugin.cc \ @@ -1424,7 +1423,6 @@ LIBGRPC_SRC = \ src/core/ext/xds/xds_route_config.cc \ src/core/ext/xds/xds_routing.cc \ src/core/ext/xds/xds_server_config_fetcher.cc \ - src/core/ext/xds/xds_transport_grpc.cc \ src/core/lib/address_utils/parse_address.cc \ src/core/lib/address_utils/sockaddr_utils.cc \ src/core/lib/backoff/backoff.cc \ @@ -3126,7 +3124,6 @@ src/core/ext/xds/xds_bootstrap.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_certificate_provider.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_channel_stack_modifier.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_client.cc: $(OPENSSL_DEP) -src/core/ext/xds/xds_client_grpc.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_client_stats.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_cluster.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_cluster_specifier_plugin.cc: $(OPENSSL_DEP) @@ -3141,7 +3138,6 @@ src/core/ext/xds/xds_resource_type.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_route_config.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_routing.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_server_config_fetcher.cc: $(OPENSSL_DEP) -src/core/ext/xds/xds_transport_grpc.cc: $(OPENSSL_DEP) src/core/lib/http/httpcli_security_connector.cc: $(OPENSSL_DEP) src/core/lib/matchers/matchers.cc: $(OPENSSL_DEP) src/core/lib/security/authorization/grpc_authorization_engine.cc: $(OPENSSL_DEP) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 88aa8e90fc1..b8c4b889785 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -702,7 +702,6 @@ libs: - src/core/ext/xds/xds_channel_args.h - src/core/ext/xds/xds_channel_stack_modifier.h - src/core/ext/xds/xds_client.h - - src/core/ext/xds/xds_client_grpc.h - src/core/ext/xds/xds_client_stats.h - src/core/ext/xds/xds_cluster.h - src/core/ext/xds/xds_cluster_specifier_plugin.h @@ -717,8 +716,6 @@ libs: - src/core/ext/xds/xds_resource_type_impl.h - src/core/ext/xds/xds_route_config.h - src/core/ext/xds/xds_routing.h - - src/core/ext/xds/xds_transport.h - - src/core/ext/xds/xds_transport_grpc.h - src/core/lib/address_utils/parse_address.h - src/core/lib/address_utils/sockaddr_utils.h - src/core/lib/avl/avl.h @@ -1388,7 +1385,6 @@ libs: - src/core/ext/xds/xds_certificate_provider.cc - src/core/ext/xds/xds_channel_stack_modifier.cc - src/core/ext/xds/xds_client.cc - - src/core/ext/xds/xds_client_grpc.cc - src/core/ext/xds/xds_client_stats.cc - src/core/ext/xds/xds_cluster.cc - src/core/ext/xds/xds_cluster_specifier_plugin.cc @@ -1403,7 +1399,6 @@ libs: - src/core/ext/xds/xds_route_config.cc - src/core/ext/xds/xds_routing.cc - src/core/ext/xds/xds_server_config_fetcher.cc - - src/core/ext/xds/xds_transport_grpc.cc - src/core/lib/address_utils/parse_address.cc - src/core/lib/address_utils/sockaddr_utils.cc - src/core/lib/backoff/backoff.cc diff --git a/config.m4 b/config.m4 index 9a44118996e..ac31518d16e 100644 --- a/config.m4 +++ b/config.m4 @@ -427,7 +427,6 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/xds/xds_certificate_provider.cc \ src/core/ext/xds/xds_channel_stack_modifier.cc \ src/core/ext/xds/xds_client.cc \ - src/core/ext/xds/xds_client_grpc.cc \ src/core/ext/xds/xds_client_stats.cc \ src/core/ext/xds/xds_cluster.cc \ src/core/ext/xds/xds_cluster_specifier_plugin.cc \ @@ -442,7 +441,6 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/xds/xds_route_config.cc \ src/core/ext/xds/xds_routing.cc \ src/core/ext/xds/xds_server_config_fetcher.cc \ - src/core/ext/xds/xds_transport_grpc.cc \ src/core/lib/address_utils/parse_address.cc \ src/core/lib/address_utils/sockaddr_utils.cc \ src/core/lib/backoff/backoff.cc \ diff --git a/config.w32 b/config.w32 index 174fcf9166b..8ac8429a028 100644 --- a/config.w32 +++ b/config.w32 @@ -393,7 +393,6 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\xds\\xds_certificate_provider.cc " + "src\\core\\ext\\xds\\xds_channel_stack_modifier.cc " + "src\\core\\ext\\xds\\xds_client.cc " + - "src\\core\\ext\\xds\\xds_client_grpc.cc " + "src\\core\\ext\\xds\\xds_client_stats.cc " + "src\\core\\ext\\xds\\xds_cluster.cc " + "src\\core\\ext\\xds\\xds_cluster_specifier_plugin.cc " + @@ -408,7 +407,6 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\xds\\xds_route_config.cc " + "src\\core\\ext\\xds\\xds_routing.cc " + "src\\core\\ext\\xds\\xds_server_config_fetcher.cc " + - "src\\core\\ext\\xds\\xds_transport_grpc.cc " + "src\\core\\lib\\address_utils\\parse_address.cc " + "src\\core\\lib\\address_utils\\sockaddr_utils.cc " + "src\\core\\lib\\backoff\\backoff.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index f44860393e7..435d6db4c53 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -634,7 +634,6 @@ Pod::Spec.new do |s| 'src/core/ext/xds/xds_channel_args.h', 'src/core/ext/xds/xds_channel_stack_modifier.h', 'src/core/ext/xds/xds_client.h', - 'src/core/ext/xds/xds_client_grpc.h', 'src/core/ext/xds/xds_client_stats.h', 'src/core/ext/xds/xds_cluster.h', 'src/core/ext/xds/xds_cluster_specifier_plugin.h', @@ -649,8 +648,6 @@ Pod::Spec.new do |s| 'src/core/ext/xds/xds_resource_type_impl.h', 'src/core/ext/xds/xds_route_config.h', 'src/core/ext/xds/xds_routing.h', - 'src/core/ext/xds/xds_transport.h', - 'src/core/ext/xds/xds_transport_grpc.h', 'src/core/lib/address_utils/parse_address.h', 'src/core/lib/address_utils/sockaddr_utils.h', 'src/core/lib/avl/avl.h', @@ -1474,7 +1471,6 @@ Pod::Spec.new do |s| 'src/core/ext/xds/xds_channel_args.h', 'src/core/ext/xds/xds_channel_stack_modifier.h', 'src/core/ext/xds/xds_client.h', - 'src/core/ext/xds/xds_client_grpc.h', 'src/core/ext/xds/xds_client_stats.h', 'src/core/ext/xds/xds_cluster.h', 'src/core/ext/xds/xds_cluster_specifier_plugin.h', @@ -1489,8 +1485,6 @@ Pod::Spec.new do |s| 'src/core/ext/xds/xds_resource_type_impl.h', 'src/core/ext/xds/xds_route_config.h', 'src/core/ext/xds/xds_routing.h', - 'src/core/ext/xds/xds_transport.h', - 'src/core/ext/xds/xds_transport_grpc.h', 'src/core/lib/address_utils/parse_address.h', 'src/core/lib/address_utils/sockaddr_utils.h', 'src/core/lib/avl/avl.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 3e5702cd5bf..b7ca0540e1f 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -959,8 +959,6 @@ Pod::Spec.new do |s| 'src/core/ext/xds/xds_channel_stack_modifier.h', 'src/core/ext/xds/xds_client.cc', 'src/core/ext/xds/xds_client.h', - 'src/core/ext/xds/xds_client_grpc.cc', - 'src/core/ext/xds/xds_client_grpc.h', 'src/core/ext/xds/xds_client_stats.cc', 'src/core/ext/xds/xds_client_stats.h', 'src/core/ext/xds/xds_cluster.cc', @@ -989,9 +987,6 @@ Pod::Spec.new do |s| 'src/core/ext/xds/xds_routing.cc', 'src/core/ext/xds/xds_routing.h', 'src/core/ext/xds/xds_server_config_fetcher.cc', - 'src/core/ext/xds/xds_transport.h', - 'src/core/ext/xds/xds_transport_grpc.cc', - 'src/core/ext/xds/xds_transport_grpc.h', 'src/core/lib/address_utils/parse_address.cc', 'src/core/lib/address_utils/parse_address.h', 'src/core/lib/address_utils/sockaddr_utils.cc', @@ -2086,7 +2081,6 @@ Pod::Spec.new do |s| 'src/core/ext/xds/xds_channel_args.h', 'src/core/ext/xds/xds_channel_stack_modifier.h', 'src/core/ext/xds/xds_client.h', - 'src/core/ext/xds/xds_client_grpc.h', 'src/core/ext/xds/xds_client_stats.h', 'src/core/ext/xds/xds_cluster.h', 'src/core/ext/xds/xds_cluster_specifier_plugin.h', @@ -2101,8 +2095,6 @@ Pod::Spec.new do |s| 'src/core/ext/xds/xds_resource_type_impl.h', 'src/core/ext/xds/xds_route_config.h', 'src/core/ext/xds/xds_routing.h', - 'src/core/ext/xds/xds_transport.h', - 'src/core/ext/xds/xds_transport_grpc.h', 'src/core/lib/address_utils/parse_address.h', 'src/core/lib/address_utils/sockaddr_utils.h', 'src/core/lib/avl/avl.h', diff --git a/grpc.gemspec b/grpc.gemspec index 0cd4fcdcf9b..b2a6ed1f4aa 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -873,8 +873,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/xds/xds_channel_stack_modifier.h ) s.files += %w( src/core/ext/xds/xds_client.cc ) s.files += %w( src/core/ext/xds/xds_client.h ) - s.files += %w( src/core/ext/xds/xds_client_grpc.cc ) - s.files += %w( src/core/ext/xds/xds_client_grpc.h ) s.files += %w( src/core/ext/xds/xds_client_stats.cc ) s.files += %w( src/core/ext/xds/xds_client_stats.h ) s.files += %w( src/core/ext/xds/xds_cluster.cc ) @@ -903,9 +901,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/xds/xds_routing.cc ) s.files += %w( src/core/ext/xds/xds_routing.h ) s.files += %w( src/core/ext/xds/xds_server_config_fetcher.cc ) - s.files += %w( src/core/ext/xds/xds_transport.h ) - s.files += %w( src/core/ext/xds/xds_transport_grpc.cc ) - s.files += %w( src/core/ext/xds/xds_transport_grpc.h ) s.files += %w( src/core/lib/address_utils/parse_address.cc ) s.files += %w( src/core/lib/address_utils/parse_address.h ) s.files += %w( src/core/lib/address_utils/sockaddr_utils.cc ) diff --git a/grpc.gyp b/grpc.gyp index 75ac5727a72..85544624d69 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -759,7 +759,6 @@ 'src/core/ext/xds/xds_certificate_provider.cc', 'src/core/ext/xds/xds_channel_stack_modifier.cc', 'src/core/ext/xds/xds_client.cc', - 'src/core/ext/xds/xds_client_grpc.cc', 'src/core/ext/xds/xds_client_stats.cc', 'src/core/ext/xds/xds_cluster.cc', 'src/core/ext/xds/xds_cluster_specifier_plugin.cc', @@ -774,7 +773,6 @@ 'src/core/ext/xds/xds_route_config.cc', 'src/core/ext/xds/xds_routing.cc', 'src/core/ext/xds/xds_server_config_fetcher.cc', - 'src/core/ext/xds/xds_transport_grpc.cc', 'src/core/lib/address_utils/parse_address.cc', 'src/core/lib/address_utils/sockaddr_utils.cc', 'src/core/lib/backoff/backoff.cc', diff --git a/package.xml b/package.xml index 85bdb8d3552..cf69c6ebb7b 100644 --- a/package.xml +++ b/package.xml @@ -855,8 +855,6 @@ - - @@ -885,9 +883,6 @@ - - - diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index 31fd55da442..c3d4ba76b90 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -46,7 +46,6 @@ #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_certificate_provider.h" #include "src/core/ext/xds/xds_client.h" -#include "src/core/ext/xds/xds_client_grpc.h" #include "src/core/ext/xds/xds_cluster.h" #include "src/core/ext/xds/xds_common_types.h" #include "src/core/ext/xds/xds_resource_type_impl.h" @@ -720,7 +719,7 @@ class CdsLbFactory : public LoadBalancingPolicyFactory { OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { RefCountedPtr xds_client = - GrpcXdsClient::GetFromChannelArgs(*args.args); + XdsClient::GetFromChannelArgs(*args.args); if (xds_client == nullptr) { gpr_log(GPR_ERROR, "XdsClient not present in channel args -- cannot instantiate " diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc index 4ab9b7ee13c..09fbcbe7379 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc @@ -49,7 +49,6 @@ #include "src/core/ext/filters/client_channel/subchannel_interface.h" #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_client.h" -#include "src/core/ext/xds/xds_client_grpc.h" #include "src/core/ext/xds/xds_client_stats.h" #include "src/core/ext/xds/xds_endpoint.h" #include "src/core/lib/channel/channel_args.h" @@ -692,7 +691,7 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory { OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { RefCountedPtr xds_client = - GrpcXdsClient::GetFromChannelArgs(*args.args); + XdsClient::GetFromChannelArgs(*args.args); if (xds_client == nullptr) { gpr_log(GPR_ERROR, "XdsClient not present in channel args -- cannot instantiate " diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc index 1a27232adaa..07edc3ea2b3 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -52,7 +52,6 @@ #include "src/core/ext/filters/client_channel/subchannel_interface.h" #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_client.h" -#include "src/core/ext/xds/xds_client_grpc.h" #include "src/core/ext/xds/xds_client_stats.h" #include "src/core/ext/xds/xds_endpoint.h" #include "src/core/ext/xds/xds_resource_type_impl.h" @@ -1018,7 +1017,7 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory { OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { RefCountedPtr xds_client = - GrpcXdsClient::GetFromChannelArgs(*args.args); + XdsClient::GetFromChannelArgs(*args.args); if (xds_client == nullptr) { gpr_log(GPR_ERROR, "XdsClient not present in channel args -- cannot instantiate " diff --git a/src/core/ext/filters/client_channel/resolver/google_c2p/google_c2p_resolver.cc b/src/core/ext/filters/client_channel/resolver/google_c2p/google_c2p_resolver.cc index 5f1dda6f12f..420592ca0e0 100644 --- a/src/core/ext/filters/client_channel/resolver/google_c2p/google_c2p_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/google_c2p/google_c2p_resolver.cc @@ -23,7 +23,6 @@ #include #include #include -#include #include #include "absl/memory/memory.h" @@ -40,7 +39,7 @@ #include #include -#include "src/core/ext/xds/xds_client_grpc.h" +#include "src/core/ext/xds/xds_client.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/gpr/env.h" diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index bedafef5056..b944212d2fd 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -29,6 +29,7 @@ #include #include "absl/memory/memory.h" +#include "absl/meta/type_traits.h" #include "absl/random/random.h" #include "absl/status/status.h" #include "absl/status/statusor.h" @@ -57,7 +58,6 @@ #include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h" #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_client.h" -#include "src/core/ext/xds/xds_client_grpc.h" #include "src/core/ext/xds/xds_http_filters.h" #include "src/core/ext/xds/xds_listener.h" #include "src/core/ext/xds/xds_resource_type_impl.h" @@ -790,7 +790,7 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig( void XdsResolver::StartLocked() { grpc_error_handle error = GRPC_ERROR_NONE; - xds_client_ = GrpcXdsClient::GetOrCreate(args_, &error); + xds_client_ = XdsClient::GetOrCreate(args_, &error); if (!GRPC_ERROR_IS_NONE(error)) { gpr_log(GPR_ERROR, "Failed to create xds client -- channel will remain in " @@ -852,9 +852,8 @@ void XdsResolver::StartLocked() { gpr_log(GPR_INFO, "[xds_resolver %p] Started with lds_resource_name %s.", this, lds_resource_name_.c_str()); } - grpc_pollset_set_add_pollset_set( - static_cast(xds_client_.get())->interested_parties(), - interested_parties_); + grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(), + interested_parties_); auto watcher = MakeRefCounted(Ref()); listener_watcher_ = watcher.get(); XdsListenerResourceType::StartWatch(xds_client_.get(), lds_resource_name_, @@ -876,9 +875,8 @@ void XdsResolver::ShutdownLocked() { xds_client_.get(), route_config_name_, route_config_watcher_, /*delay_unsubscription=*/false); } - grpc_pollset_set_del_pollset_set( - static_cast(xds_client_.get())->interested_parties(), - interested_parties_); + grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(), + interested_parties_); xds_client_.reset(); } } @@ -977,8 +975,7 @@ void XdsResolver::OnError(absl::string_view context, absl::Status status) { Result result; result.addresses = status; result.service_config = std::move(status); - grpc_arg new_arg = - static_cast(xds_client_.get())->MakeChannelArg(); + grpc_arg new_arg = xds_client_->MakeChannelArg(); result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1); result_handler_->ReportResult(std::move(result)); } @@ -1072,7 +1069,7 @@ void XdsResolver::GenerateResult() { : result.service_config.status().ToString().c_str()); } grpc_arg new_args[] = { - static_cast(xds_client_.get())->MakeChannelArg(), + xds_client_->MakeChannelArg(), config_selector->MakeChannelArg(), }; result.args = diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index 58765609a65..6714167eb41 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -258,18 +258,18 @@ void MaybeLogDiscoveryRequest( } } -std::string SerializeDiscoveryRequest( +grpc_slice SerializeDiscoveryRequest( const XdsEncodingContext& context, envoy_service_discovery_v3_DiscoveryRequest* request) { size_t output_length; char* output = envoy_service_discovery_v3_DiscoveryRequest_serialize( request, context.arena, &output_length); - return std::string(output, output_length); + return grpc_slice_from_copied_buffer(output, output_length); } } // namespace -std::string XdsApi::CreateAdsRequest( +grpc_slice XdsApi::CreateAdsRequest( const XdsBootstrap::XdsServer& server, absl::string_view type_url, absl::string_view version, absl::string_view nonce, const std::vector& resource_names, grpc_error_handle error, @@ -356,7 +356,7 @@ void MaybeLogDiscoveryResponse( } // namespace absl::Status XdsApi::ParseAdsResponse(const XdsBootstrap::XdsServer& server, - absl::string_view encoded_response, + const grpc_slice& encoded_response, AdsResponseParserInterface* parser) { upb::Arena arena; const XdsEncodingContext context = {client_, @@ -369,7 +369,8 @@ absl::Status XdsApi::ParseAdsResponse(const XdsBootstrap::XdsServer& server, // Decode the response. const envoy_service_discovery_v3_DiscoveryResponse* response = envoy_service_discovery_v3_DiscoveryResponse_parse( - encoded_response.data(), encoded_response.size(), arena.ptr()); + reinterpret_cast(GRPC_SLICE_START_PTR(encoded_response)), + GRPC_SLICE_LENGTH(encoded_response), arena.ptr()); // If decoding fails, report a fatal error and return. if (response == nullptr) { return absl::InvalidArgumentError("Can't decode DiscoveryResponse."); @@ -437,18 +438,18 @@ void MaybeLogLrsRequest( } } -std::string SerializeLrsRequest( +grpc_slice SerializeLrsRequest( const XdsEncodingContext& context, const envoy_service_load_stats_v3_LoadStatsRequest* request) { size_t output_length; char* output = envoy_service_load_stats_v3_LoadStatsRequest_serialize( request, context.arena, &output_length); - return std::string(output, output_length); + return grpc_slice_from_copied_buffer(output, output_length); } } // namespace -std::string XdsApi::CreateLrsInitialRequest( +grpc_slice XdsApi::CreateLrsInitialRequest( const XdsBootstrap::XdsServer& server) { upb::Arena arena; const XdsEncodingContext context = {client_, @@ -525,7 +526,7 @@ void LocalityStatsPopulate( } // namespace -std::string XdsApi::CreateLrsRequest( +grpc_slice XdsApi::CreateLrsRequest( ClusterLoadReportMap cluster_load_report_map) { upb::Arena arena; // The xDS server info is not actually needed here, so we seed it with an @@ -596,7 +597,7 @@ std::string XdsApi::CreateLrsRequest( return SerializeLrsRequest(context, request); } -grpc_error_handle XdsApi::ParseLrsResponse(absl::string_view encoded_response, +grpc_error_handle XdsApi::ParseLrsResponse(const grpc_slice& encoded_response, bool* send_all_clusters, std::set* cluster_names, Duration* load_reporting_interval) { @@ -604,7 +605,8 @@ grpc_error_handle XdsApi::ParseLrsResponse(absl::string_view encoded_response, // Decode the response. const envoy_service_load_stats_v3_LoadStatsResponse* decoded_response = envoy_service_load_stats_v3_LoadStatsResponse_parse( - encoded_response.data(), encoded_response.size(), arena.ptr()); + reinterpret_cast(GRPC_SLICE_START_PTR(encoded_response)), + GRPC_SLICE_LENGTH(encoded_response), arena.ptr()); // Parse the response. if (decoded_response == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode response."); diff --git a/src/core/ext/xds/xds_api.h b/src/core/ext/xds/xds_api.h index 3acb23f1eea..be46aa7458b 100644 --- a/src/core/ext/xds/xds_api.h +++ b/src/core/ext/xds/xds_api.h @@ -32,6 +32,8 @@ #include "envoy/admin/v3/config_dump.upb.h" #include "upb/def.hpp" +#include + #include "src/core/ext/xds/certificate_provider_store.h" #include "src/core/ext/xds/upb_utils.h" #include "src/core/ext/xds/xds_bootstrap.h" @@ -148,29 +150,29 @@ class XdsApi { // Creates an ADS request. // Takes ownership of \a error. - std::string CreateAdsRequest(const XdsBootstrap::XdsServer& server, - absl::string_view type_url, - absl::string_view version, - absl::string_view nonce, - const std::vector& resource_names, - grpc_error_handle error, bool populate_node); + grpc_slice CreateAdsRequest(const XdsBootstrap::XdsServer& server, + absl::string_view type_url, + absl::string_view version, + absl::string_view nonce, + const std::vector& resource_names, + grpc_error_handle error, bool populate_node); // Returns non-OK when failing to deserialize response message. // Otherwise, all events are reported to the parser. absl::Status ParseAdsResponse(const XdsBootstrap::XdsServer& server, - absl::string_view encoded_response, + const grpc_slice& encoded_response, AdsResponseParserInterface* parser); // Creates an initial LRS request. - std::string CreateLrsInitialRequest(const XdsBootstrap::XdsServer& server); + grpc_slice CreateLrsInitialRequest(const XdsBootstrap::XdsServer& server); // Creates an LRS request sending a client-side load report. - std::string CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map); + grpc_slice CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map); // Parses the LRS response and returns \a // load_reporting_interval for client-side load reporting. If there is any // error, the output config is invalid. - grpc_error_handle ParseLrsResponse(absl::string_view encoded_response, + grpc_error_handle ParseLrsResponse(const grpc_slice& encoded_response, bool* send_all_clusters, std::set* cluster_names, Duration* load_reporting_interval); diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 310c0e08e7c..38e06feef04 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -19,11 +19,12 @@ #include "src/core/ext/xds/xds_client.h" #include +#include #include #include -#include "absl/memory/memory.h" +#include "absl/container/inlined_vector.h" #include "absl/strings/match.h" #include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" @@ -32,21 +33,51 @@ #include "absl/strings/string_view.h" #include "absl/strings/strip.h" +#include +#include +#include +#include +#include +#include #include +#include #include +#include +#include +#include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/xds/upb_utils.h" #include "src/core/ext/xds/xds_api.h" #include "src/core/ext/xds/xds_bootstrap.h" +#include "src/core/ext/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_client_stats.h" +#include "src/core/ext/xds/xds_cluster_specifier_plugin.h" +#include "src/core/ext/xds/xds_http_filters.h" #include "src/core/lib/backoff/backoff.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_fwd.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/gpr/env.h" +#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/load_file.h" +#include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/security/credentials/channel_creds_registry.h" +#include "src/core/lib/security/credentials/credentials.h" +#include "src/core/lib/slice/slice.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_refcount.h" +#include "src/core/lib/surface/call.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/lame_client.h" +#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/uri/uri_parser.h" #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1 @@ -60,6 +91,16 @@ namespace grpc_core { TraceFlag grpc_xds_client_trace(false, "xds_client"); TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount"); +namespace { + +Mutex* g_mu = nullptr; + +const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr; +XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr; +char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr; + +} // namespace + // // Internal class declarations // @@ -108,6 +149,7 @@ class XdsClient::ChannelState::AdsCallState public: // The ctor and dtor should not be used directly. explicit AdsCallState(RefCountedPtr> parent); + ~AdsCallState() override; void Orphan() override; @@ -262,24 +304,6 @@ class XdsClient::ChannelState::AdsCallState grpc_closure timer_callback_; }; - class StreamEventHandler - : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler { - public: - explicit StreamEventHandler(RefCountedPtr ads_calld) - : ads_calld_(std::move(ads_calld)) {} - - void OnRequestSent(bool ok) override { ads_calld_->OnRequestSent(ok); } - void OnRecvMessage(absl::string_view payload) override { - ads_calld_->OnRecvMessage(payload); - } - void OnStatusReceived(absl::Status status) override { - ads_calld_->OnStatusReceived(std::move(status)); - } - - private: - RefCountedPtr ads_calld_; - }; - struct ResourceTypeState { ~ResourceTypeState() { GRPC_ERROR_UNREF(error); } @@ -296,9 +320,15 @@ class XdsClient::ChannelState::AdsCallState void SendMessageLocked(const XdsResourceType* type) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - void OnRequestSent(bool ok); - void OnRecvMessage(absl::string_view payload); - void OnStatusReceived(absl::Status status); + static void OnRequestSent(void* arg, grpc_error_handle error); + void OnRequestSentLocked(grpc_error_handle error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + static void OnResponseReceived(void* arg, grpc_error_handle error); + bool OnResponseReceivedLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + static void OnStatusReceived(void* arg, grpc_error_handle error); + void OnStatusReceivedLocked(grpc_error_handle error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); bool IsCurrentCallOnChannel() const; @@ -310,11 +340,28 @@ class XdsClient::ChannelState::AdsCallState // The owning RetryableCall<>. RefCountedPtr> parent_; - OrphanablePtr call_; - bool sent_initial_message_ = false; bool seen_response_ = false; - bool send_message_pending_ ABSL_GUARDED_BY(&XdsClient::mu_) = false; + + // Always non-NULL. + grpc_call* call_; + + // recv_initial_metadata + grpc_metadata_array initial_metadata_recv_; + + // send_message + grpc_byte_buffer* send_message_payload_ = nullptr; + grpc_closure on_request_sent_; + + // recv_message + grpc_byte_buffer* recv_message_payload_ = nullptr; + grpc_closure on_response_received_; + + // recv_trailing_metadata + grpc_metadata_array trailing_metadata_recv_; + grpc_status_code status_code_; + grpc_slice status_details_; + grpc_closure on_status_received_; // Resource types for which requests need to be sent. std::set buffered_requests_; @@ -329,11 +376,11 @@ class XdsClient::ChannelState::LrsCallState public: // The ctor and dtor should not be used directly. explicit LrsCallState(RefCountedPtr> parent); + ~LrsCallState() override; void Orphan() override; - void MaybeStartReportingLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + void MaybeStartReportingLocked(); RetryableCall* parent() { return parent_.get(); } ChannelState* chand() const { return parent_->chand(); } @@ -341,24 +388,6 @@ class XdsClient::ChannelState::LrsCallState bool seen_response() const { return seen_response_; } private: - class StreamEventHandler - : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler { - public: - explicit StreamEventHandler(RefCountedPtr lrs_calld) - : lrs_calld_(std::move(lrs_calld)) {} - - void OnRequestSent(bool ok) override { lrs_calld_->OnRequestSent(ok); } - void OnRecvMessage(absl::string_view payload) override { - lrs_calld_->OnRecvMessage(payload); - } - void OnStatusReceived(absl::Status status) override { - lrs_calld_->OnStatusReceived(std::move(status)); - } - - private: - RefCountedPtr lrs_calld_; - }; - // Reports client-side load stats according to a fixed interval. class Reporter : public InternallyRefCounted { public: @@ -366,13 +395,13 @@ class XdsClient::ChannelState::LrsCallState : parent_(std::move(parent)), report_interval_(report_interval) { GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this, + grpc_schedule_on_exec_ctx); ScheduleNextReportLocked(); } void Orphan() override; - void OnReportDoneLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - private: void ScheduleNextReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); @@ -380,6 +409,9 @@ class XdsClient::ChannelState::LrsCallState bool OnNextReportTimerLocked(grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); bool SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + static void OnReportDone(void* arg, grpc_error_handle error); + bool OnReportDoneLocked(grpc_error_handle error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); bool IsCurrentReporterOnCall() const { return this == parent_->reporter_.get(); @@ -395,21 +427,44 @@ class XdsClient::ChannelState::LrsCallState bool next_report_timer_callback_pending_ = false; grpc_timer next_report_timer_; grpc_closure on_next_report_timer_; + grpc_closure on_report_done_; }; - void OnRequestSent(bool ok); - void OnRecvMessage(absl::string_view payload); - void OnStatusReceived(absl::Status status); + static void OnInitialRequestSent(void* arg, grpc_error_handle error); + void OnInitialRequestSentLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + static void OnResponseReceived(void* arg, grpc_error_handle error); + bool OnResponseReceivedLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + static void OnStatusReceived(void* arg, grpc_error_handle error); + void OnStatusReceivedLocked(grpc_error_handle error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); bool IsCurrentCallOnChannel() const; // The owning RetryableCall<>. RefCountedPtr> parent_; + bool seen_response_ = false; - OrphanablePtr call_; + // Always non-NULL. + grpc_call* call_; - bool seen_response_ = false; - bool send_message_pending_ ABSL_GUARDED_BY(&XdsClient::mu_) = false; + // recv_initial_metadata + grpc_metadata_array initial_metadata_recv_; + + // send_message + grpc_byte_buffer* send_message_payload_ = nullptr; + grpc_closure on_initial_request_sent_; + + // recv_message + grpc_byte_buffer* recv_message_payload_ = nullptr; + grpc_closure on_response_received_; + + // recv_trailing_metadata + grpc_metadata_array trailing_metadata_recv_; + grpc_status_code status_code_; + grpc_slice status_details_; + grpc_closure on_status_received_; // Load reporting state. bool send_all_clusters_ = false; @@ -418,10 +473,58 @@ class XdsClient::ChannelState::LrsCallState OrphanablePtr reporter_; }; +// +// XdsClient::ChannelState::StateWatcher +// + +class XdsClient::ChannelState::StateWatcher + : public AsyncConnectivityStateWatcherInterface { + public: + explicit StateWatcher(WeakRefCountedPtr parent) + : parent_(std::move(parent)) {} + + private: + void OnConnectivityStateChange(grpc_connectivity_state new_state, + const absl::Status& status) override { + { + MutexLock lock(&parent_->xds_client_->mu_); + if (!parent_->shutting_down_ && + new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + // In TRANSIENT_FAILURE. Notify all watchers of error. + gpr_log(GPR_INFO, + "[xds_client %p] xds channel for server %s in " + "state TRANSIENT_FAILURE: %s", + parent_->xds_client(), parent_->server_.server_uri.c_str(), + status.ToString().c_str()); + parent_->xds_client_->NotifyOnErrorLocked( + absl::UnavailableError(absl::StrCat( + "xds channel in TRANSIENT_FAILURE, connectivity error: ", + status.ToString()))); + } + } + parent_->xds_client()->work_serializer_.DrainQueue(); + } + + WeakRefCountedPtr parent_; +}; + // // XdsClient::ChannelState // +namespace { + +grpc_channel* CreateXdsChannel(grpc_channel_args* args, + const XdsBootstrap::XdsServer& server) { + RefCountedPtr channel_creds = + CoreConfiguration::Get().channel_creds_registry().CreateChannelCreds( + server.channel_creds_type, server.channel_creds_config); + return grpc_channel_create(server.server_uri.c_str(), channel_creds.get(), + args); +} + +} // namespace + XdsClient::ChannelState::ChannelState(WeakRefCountedPtr xds_client, const XdsBootstrap::XdsServer& server) : DualRefCounted( @@ -434,16 +537,9 @@ XdsClient::ChannelState::ChannelState(WeakRefCountedPtr xds_client, gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", xds_client_.get(), server.server_uri.c_str()); } - absl::Status status; - transport_ = xds_client_->transport_factory_->Create( - server, - [self = WeakRef(DEBUG_LOCATION, "OnConnectivityFailure")]( - absl::Status status) { - self->OnConnectivityStateChange(std::move(status)); - }, - &status); - GPR_ASSERT(transport_ != nullptr); - if (!status.ok()) OnConnectivityStateChange(std::move(status)); + channel_ = CreateXdsChannel(xds_client_->args_, server); + GPR_ASSERT(channel_ != nullptr); + StartConnectivityWatchLocked(); } XdsClient::ChannelState::~ChannelState() { @@ -451,6 +547,7 @@ XdsClient::ChannelState::~ChannelState() { gpr_log(GPR_INFO, "[xds_client %p] destroying xds channel %p for server %s", xds_client(), this, server_.server_uri.c_str()); } + grpc_channel_destroy(channel_); xds_client_.reset(DEBUG_LOCATION, "ChannelState"); } @@ -460,7 +557,7 @@ XdsClient::ChannelState::~ChannelState() { // lock in this subclass. void XdsClient::ChannelState::Orphan() ABSL_NO_THREAD_SAFETY_ANALYSIS { shutting_down_ = true; - transport_.reset(); + CancelConnectivityWatchLocked(); // At this time, all strong refs are removed, remove from channel map to // prevent subsequent subscription from trying to use this ChannelState as it // is shutting down. @@ -469,8 +566,6 @@ void XdsClient::ChannelState::Orphan() ABSL_NO_THREAD_SAFETY_ANALYSIS { lrs_calld_.reset(); } -void XdsClient::ChannelState::ResetBackoff() { transport_->ResetBackoff(); } - XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld() const { return ads_calld_->calld(); @@ -496,6 +591,41 @@ void XdsClient::ChannelState::StopLrsCallLocked() { lrs_calld_.reset(); } +namespace { + +bool IsLameChannel(grpc_channel* channel) { + grpc_channel_element* elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + return elem->filter == &LameClientFilter::kFilter; +} + +} // namespace + +void XdsClient::ChannelState::StartConnectivityWatchLocked() { + if (IsLameChannel(channel_)) { + xds_client()->NotifyOnErrorLocked( + absl::UnavailableError("xds client has a lame channel")); + return; + } + ClientChannel* client_channel = + ClientChannel::GetFromChannel(Channel::FromC(channel_)); + GPR_ASSERT(client_channel != nullptr); + watcher_ = new StateWatcher(WeakRef(DEBUG_LOCATION, "ChannelState+watch")); + client_channel->AddConnectivityWatcher( + GRPC_CHANNEL_IDLE, + OrphanablePtr(watcher_)); +} + +void XdsClient::ChannelState::CancelConnectivityWatchLocked() { + if (IsLameChannel(channel_)) { + return; + } + ClientChannel* client_channel = + ClientChannel::GetFromChannel(Channel::FromC(channel_)); + GPR_ASSERT(client_channel != nullptr); + client_channel->RemoveConnectivityWatcher(watcher_); +} + void XdsClient::ChannelState::SubscribeLocked(const XdsResourceType* type, const XdsResourceName& name) { if (ads_calld_ == nullptr) { @@ -528,24 +658,6 @@ void XdsClient::ChannelState::UnsubscribeLocked(const XdsResourceType* type, } } -void XdsClient::ChannelState::OnConnectivityStateChange(absl::Status status) { - { - MutexLock lock(&xds_client_->mu_); - if (!shutting_down_) { - // Notify all watchers of error. - gpr_log(GPR_INFO, - "[xds_client %p] xds channel for server %s in " - "state TRANSIENT_FAILURE: %s", - xds_client(), server_.server_uri.c_str(), - status.ToString().c_str()); - xds_client_->NotifyOnErrorLocked(absl::UnavailableError( - absl::StrCat("xds channel in TRANSIENT_FAILURE, connectivity error: ", - status.ToString()))); - } - } - xds_client_->work_serializer_.DrainQueue(); -} - // // XdsClient::ChannelState::RetryableCall<> // @@ -587,7 +699,7 @@ void XdsClient::ChannelState::RetryableCall::OnCallFinishedLocked() { template void XdsClient::ChannelState::RetryableCall::StartNewCallLocked() { if (shutting_down_) return; - GPR_ASSERT(chand_->transport_ != nullptr); + GPR_ASSERT(chand_->channel_ != nullptr); GPR_ASSERT(calld_ == nullptr); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log( @@ -829,30 +941,51 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( ? "AdsCallState" : nullptr), parent_(std::move(parent)) { + // Init the ADS call. Note that the call will progress every time there's + // activity in xds_client()->interested_parties_, which is comprised of + // the polling entities from client_channel. GPR_ASSERT(xds_client() != nullptr); - // Init the ADS call. + // Create a call with the specified method name. const char* method = chand()->server_.ShouldUseV3() ? "/envoy.service.discovery.v3.AggregatedDiscoveryService/" "StreamAggregatedResources" : "/envoy.service.discovery.v2.AggregatedDiscoveryService/" "StreamAggregatedResources"; - call_ = chand()->transport_->CreateStreamingCall( - method, absl::make_unique( - // Passing the initial ref here. This ref will go away when - // the StreamEventHandler is destroyed. - RefCountedPtr(this))); + call_ = grpc_channel_create_pollset_set_call( + chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, + xds_client()->interested_parties_, + StaticSlice::FromStaticString(method).c_slice(), nullptr, + Timestamp::InfFuture(), nullptr); GPR_ASSERT(call_ != nullptr); + // Init data associated with the call. + grpc_metadata_array_init(&initial_metadata_recv_); + grpc_metadata_array_init(&trailing_metadata_recv_); // Start the call. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] xds server %s: starting ADS call " "(calld: %p, call: %p)", - xds_client(), chand()->server_.server_uri.c_str(), this, - call_.get()); - } - // If this is a reconnect, add any necessary subscriptions from what's - // already in the cache. + xds_client(), chand()->server_.server_uri.c_str(), this, call_); + } + // Create the ops. + grpc_call_error call_error; + grpc_op ops[3]; + memset(ops, 0, sizeof(ops)); + // Op: send initial metadata. + grpc_op* op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY | + GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; + op->reserved = nullptr; + op++; + call_error = grpc_call_start_batch_and_execute( + call_, ops, static_cast(op - ops), nullptr); + GPR_ASSERT(GRPC_CALL_OK == call_error); + // Op: send request message. + GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, + grpc_schedule_on_exec_ctx); for (const auto& a : xds_client()->authority_state_map_) { const std::string& authority = a.first; // Skip authorities that are not using this xDS channel. @@ -865,30 +998,81 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( } } } - // Send initial message if we added any subscriptions above. for (const auto& p : state_map_) { SendMessageLocked(p.first); } + // Op: recv initial metadata. + op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = + &initial_metadata_recv_; + op->flags = 0; + op->reserved = nullptr; + op++; + // Op: recv response. + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &recv_message_payload_; + op->flags = 0; + op->reserved = nullptr; + op++; + Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release(); + GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, + grpc_schedule_on_exec_ctx); + call_error = grpc_call_start_batch_and_execute( + call_, ops, static_cast(op - ops), &on_response_received_); + GPR_ASSERT(GRPC_CALL_OK == call_error); + // Op: recv server status. + op = ops; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_; + op->data.recv_status_on_client.status = &status_code_; + op->data.recv_status_on_client.status_details = &status_details_; + op->flags = 0; + op->reserved = nullptr; + op++; + // This callback signals the end of the call, so it relies on the initial + // ref instead of a new ref. When it's invoked, it's the initial ref that is + // unreffed. + GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, + grpc_schedule_on_exec_ctx); + call_error = grpc_call_start_batch_and_execute( + call_, ops, static_cast(op - ops), &on_status_received_); + GPR_ASSERT(GRPC_CALL_OK == call_error); +} + +XdsClient::ChannelState::AdsCallState::~AdsCallState() { + grpc_metadata_array_destroy(&initial_metadata_recv_); + grpc_metadata_array_destroy(&trailing_metadata_recv_); + grpc_byte_buffer_destroy(send_message_payload_); + grpc_byte_buffer_destroy(recv_message_payload_); + grpc_slice_unref_internal(status_details_); + GPR_ASSERT(call_ != nullptr); + grpc_call_unref(call_); } void XdsClient::ChannelState::AdsCallState::Orphan() { + GPR_ASSERT(call_ != nullptr); + // If we are here because xds_client wants to cancel the call, + // on_status_received_ will complete the cancellation and clean up. Otherwise, + // we are here because xds_client has to orphan a failed call, then the + // following cancellation will be a no-op. + grpc_call_cancel_internal(call_); state_map_.clear(); - // Note that the initial ref is held by the StreamEventHandler, which - // will be destroyed when call_ is destroyed, which may not happen - // here, since there may be other refs held to call_ by internal callbacks. - call_.reset(); + // Note that the initial ref is hold by on_status_received_. So the + // corresponding unref happens in on_status_received_ instead of here. } void XdsClient::ChannelState::AdsCallState::SendMessageLocked( const XdsResourceType* type) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { // Buffer message sending if an existing message is in flight. - if (send_message_pending_) { + if (send_message_payload_ != nullptr) { buffered_requests_.insert(type); return; } auto& state = state_map_[type]; - std::string serialized_message = xds_client()->api_.CreateAdsRequest( + grpc_slice request_payload_slice; + request_payload_slice = xds_client()->api_.CreateAdsRequest( chand()->server_, chand()->server_.ShouldUseV3() ? type->type_url() : type->v2_type_url(), chand()->resource_type_version_map_[type], state.nonce, @@ -906,8 +1090,28 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( } GRPC_ERROR_UNREF(state.error); state.error = GRPC_ERROR_NONE; - call_->SendMessage(std::move(serialized_message)); - send_message_pending_ = true; + // Create message payload. + send_message_payload_ = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_slice_unref_internal(request_payload_slice); + // Send the message. + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_SEND_MESSAGE; + op.data.send_message.send_message = send_message_payload_; + Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release(); + GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, + grpc_schedule_on_exec_ctx); + grpc_call_error call_error = + grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_); + if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { + gpr_log(GPR_ERROR, + "[xds_client %p] xds server %s: error starting ADS send_message " + "batch on calld=%p: call_error=%d", + xds_client(), chand()->server_.server_uri.c_str(), this, + call_error); + GPR_ASSERT(GRPC_CALL_OK == call_error); + } } void XdsClient::ChannelState::AdsCallState::SubscribeLocked( @@ -938,10 +1142,22 @@ bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const { return false; } -void XdsClient::ChannelState::AdsCallState::OnRequestSent(bool ok) { - MutexLock lock(&xds_client()->mu_); - send_message_pending_ = false; - if (ok && IsCurrentCallOnChannel()) { +void XdsClient::ChannelState::AdsCallState::OnRequestSent( + void* arg, grpc_error_handle error) { + AdsCallState* ads_calld = static_cast(arg); + { + MutexLock lock(&ads_calld->xds_client()->mu_); + ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error)); + } + ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked"); +} + +void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked( + grpc_error_handle error) { + if (IsCurrentCallOnChannel() && GRPC_ERROR_IS_NONE(error)) { + // Clean up the sent message. + grpc_byte_buffer_destroy(send_message_payload_); + send_message_payload_ = nullptr; // Continue to send another pending message if any. // TODO(roth): The current code to handle buffered messages has the // advantage of sending only the most recent list of resource names for @@ -957,132 +1173,179 @@ void XdsClient::ChannelState::AdsCallState::OnRequestSent(bool ok) { buffered_requests_.erase(it); } } + GRPC_ERROR_UNREF(error); } -void XdsClient::ChannelState::AdsCallState::OnRecvMessage( - absl::string_view payload) { +void XdsClient::ChannelState::AdsCallState::OnResponseReceived( + void* arg, grpc_error_handle /* error */) { + AdsCallState* ads_calld = static_cast(arg); + bool done; { - MutexLock lock(&xds_client()->mu_); - if (!IsCurrentCallOnChannel()) return; - // Parse and validate the response. - AdsResponseParser parser(this); - absl::Status status = - xds_client()->api_.ParseAdsResponse(chand()->server_, payload, &parser); - if (!status.ok()) { - // Ignore unparsable response. - gpr_log(GPR_ERROR, - "[xds_client %p] xds server %s: error parsing ADS response (%s) " - "-- ignoring", - xds_client(), chand()->server_.server_uri.c_str(), - status.ToString().c_str()); - } else { - seen_response_ = true; - AdsResponseParser::Result result = parser.TakeResult(); - // Update nonce. - auto& state = state_map_[result.type]; - state.nonce = result.nonce; - // If we got an error, set state.error so that we'll NACK the update. - if (!result.errors.empty()) { - std::string error = absl::StrJoin(result.errors, "; "); - gpr_log( - GPR_ERROR, - "[xds_client %p] xds server %s: ADS response invalid for resource " - "type %s version %s, will NACK: nonce=%s error=%s", + MutexLock lock(&ads_calld->xds_client()->mu_); + done = ads_calld->OnResponseReceivedLocked(); + } + ads_calld->xds_client()->work_serializer_.DrainQueue(); + if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked"); +} + +bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { + // Empty payload means the call was cancelled. + if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) { + return true; + } + // Read the response. + grpc_byte_buffer_reader bbr; + grpc_byte_buffer_reader_init(&bbr, recv_message_payload_); + grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); + grpc_byte_buffer_reader_destroy(&bbr); + grpc_byte_buffer_destroy(recv_message_payload_); + recv_message_payload_ = nullptr; + // Parse and validate the response. + AdsResponseParser parser(this); + absl::Status status = xds_client()->api_.ParseAdsResponse( + chand()->server_, response_slice, &parser); + grpc_slice_unref_internal(response_slice); + if (!status.ok()) { + // Ignore unparsable response. + gpr_log(GPR_ERROR, + "[xds_client %p] xds server %s: error parsing ADS response (%s) " + "-- ignoring", xds_client(), chand()->server_.server_uri.c_str(), - result.type_url.c_str(), result.version.c_str(), - state.nonce.c_str(), error.c_str()); - GRPC_ERROR_UNREF(state.error); - state.error = grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_CPP_STRING(error), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); - } - // Delete resources not seen in update if needed. - if (result.type->AllResourcesRequiredInSotW()) { - for (auto& a : xds_client()->authority_state_map_) { - const std::string& authority = a.first; - AuthorityState& authority_state = a.second; - // Skip authorities that are not using this xDS channel. - if (authority_state.channel_state != chand()) continue; - auto seen_authority_it = result.resources_seen.find(authority); - // Find this resource type. - auto type_it = authority_state.resource_map.find(result.type); - if (type_it == authority_state.resource_map.end()) continue; - // Iterate over resource ids. - for (auto& r : type_it->second) { - const XdsResourceKey& resource_key = r.first; - ResourceState& resource_state = r.second; - if (seen_authority_it == result.resources_seen.end() || - seen_authority_it->second.find(resource_key) == - seen_authority_it->second.end()) { - // If the resource was newly requested but has not yet been - // received, we don't want to generate an error for the watchers, - // because this ADS response may be in reaction to an earlier - // request that did not yet request the new resource, so its - // absence from the response does not necessarily indicate that - // the resource does not exist. For that case, we rely on the - // request timeout instead. - if (resource_state.resource == nullptr) continue; - if (chand()->server_.IgnoreResourceDeletion()) { - if (!resource_state.ignored_deletion) { - gpr_log(GPR_ERROR, - "[xds_client %p] xds server %s: ignoring deletion " - "for resource type %s name %s", - xds_client(), chand()->server_.server_uri.c_str(), - result.type_url.c_str(), - XdsClient::ConstructFullXdsResourceName( - authority, result.type_url.c_str(), resource_key) - .c_str()); - resource_state.ignored_deletion = true; - } - } else { - resource_state.resource.reset(); - xds_client()->NotifyWatchersOnResourceDoesNotExist( - resource_state.watchers); + status.ToString().c_str()); + } else { + seen_response_ = true; + AdsResponseParser::Result result = parser.TakeResult(); + // Update nonce. + auto& state = state_map_[result.type]; + state.nonce = result.nonce; + // If we got an error, set state.error so that we'll NACK the update. + if (!result.errors.empty()) { + std::string error = absl::StrJoin(result.errors, "; "); + gpr_log( + GPR_ERROR, + "[xds_client %p] xds server %s: ADS response invalid for resource " + "type %s version %s, will NACK: nonce=%s error=%s", + xds_client(), chand()->server_.server_uri.c_str(), + result.type_url.c_str(), result.version.c_str(), state.nonce.c_str(), + error.c_str()); + GRPC_ERROR_UNREF(state.error); + state.error = grpc_error_set_int(GRPC_ERROR_CREATE_FROM_CPP_STRING(error), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE); + } + // Delete resources not seen in update if needed. + if (result.type->AllResourcesRequiredInSotW()) { + for (auto& a : xds_client()->authority_state_map_) { + const std::string& authority = a.first; + AuthorityState& authority_state = a.second; + // Skip authorities that are not using this xDS channel. + if (authority_state.channel_state != chand()) continue; + auto seen_authority_it = result.resources_seen.find(authority); + // Find this resource type. + auto type_it = authority_state.resource_map.find(result.type); + if (type_it == authority_state.resource_map.end()) continue; + // Iterate over resource ids. + for (auto& r : type_it->second) { + const XdsResourceKey& resource_key = r.first; + ResourceState& resource_state = r.second; + if (seen_authority_it == result.resources_seen.end() || + seen_authority_it->second.find(resource_key) == + seen_authority_it->second.end()) { + // If the resource was newly requested but has not yet been + // received, we don't want to generate an error for the watchers, + // because this ADS response may be in reaction to an earlier + // request that did not yet request the new resource, so its absence + // from the response does not necessarily indicate that the resource + // does not exist. For that case, we rely on the request timeout + // instead. + if (resource_state.resource == nullptr) continue; + if (chand()->server_.IgnoreResourceDeletion()) { + if (!resource_state.ignored_deletion) { + gpr_log(GPR_ERROR, + "[xds_client %p] xds server %s: ignoring deletion " + "for resource type %s name %s", + xds_client(), chand()->server_.server_uri.c_str(), + result.type_url.c_str(), + XdsClient::ConstructFullXdsResourceName( + authority, result.type_url.c_str(), resource_key) + .c_str()); + resource_state.ignored_deletion = true; } + } else { + resource_state.resource.reset(); + xds_client()->NotifyWatchersOnResourceDoesNotExist( + resource_state.watchers); } } } } - // If we had valid resources, update the version. - if (result.have_valid_resources) { - chand()->resource_type_version_map_[result.type] = - std::move(result.version); - // Start load reporting if needed. - auto& lrs_call = chand()->lrs_calld_; - if (lrs_call != nullptr) { - LrsCallState* lrs_calld = lrs_call->calld(); - if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); - } + } + // If we had valid resources, update the version. + if (result.have_valid_resources) { + chand()->resource_type_version_map_[result.type] = + std::move(result.version); + // Start load reporting if needed. + auto& lrs_call = chand()->lrs_calld_; + if (lrs_call != nullptr) { + LrsCallState* lrs_calld = lrs_call->calld(); + if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); } - // Send ACK or NACK. - SendMessageLocked(result.type); } - } - xds_client()->work_serializer_.DrainQueue(); + // Send ACK or NACK. + SendMessageLocked(result.type); + } + if (xds_client()->shutting_down_) return true; + // Keep listening for updates. + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_RECV_MESSAGE; + op.data.recv_message.recv_message = &recv_message_payload_; + op.flags = 0; + op.reserved = nullptr; + GPR_ASSERT(call_ != nullptr); + // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor. + const grpc_call_error call_error = + grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_); + GPR_ASSERT(GRPC_CALL_OK == call_error); + return false; } void XdsClient::ChannelState::AdsCallState::OnStatusReceived( - absl::Status status) { + void* arg, grpc_error_handle error) { + AdsCallState* ads_calld = static_cast(arg); { - MutexLock lock(&xds_client()->mu_); - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] xds server %s: ADS call status received " - "(chand=%p, ads_calld=%p, call=%p): %s", - xds_client(), chand()->server_.server_uri.c_str(), chand(), this, - call_.get(), status.ToString().c_str()); - } - // Ignore status from a stale call. - if (IsCurrentCallOnChannel()) { - // Try to restart the call. - parent_->OnCallFinishedLocked(); - // Send error to all watchers. - xds_client()->NotifyOnErrorLocked(absl::UnavailableError(absl::StrFormat( - "xDS call failed: xDS server: %s, ADS call status: %s", - chand()->server_.server_uri, status.ToString().c_str()))); - } + MutexLock lock(&ads_calld->xds_client()->mu_); + ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error)); } - xds_client()->work_serializer_.DrainQueue(); + ads_calld->xds_client()->work_serializer_.DrainQueue(); + ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked"); +} + +void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked( + grpc_error_handle error) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + char* status_details = grpc_slice_to_c_string(status_details_); + gpr_log(GPR_INFO, + "[xds_client %p] xds server %s: ADS call status received " + "(chand=%p, ads_calld=%p, call=%p): " + "status=%d, details='%s', error='%s'", + xds_client(), chand()->server_.server_uri.c_str(), chand(), this, + call_, status_code_, status_details, + grpc_error_std_string(error).c_str()); + gpr_free(status_details); + } + // Ignore status from a stale call. + if (IsCurrentCallOnChannel()) { + // Try to restart the call. + parent_->OnCallFinishedLocked(); + // Send error to all watchers. + xds_client()->NotifyOnErrorLocked(absl::UnavailableError(absl::StrFormat( + "xDS call failed: xDS server: %s, ADS call status code=%d, " + "details='%s', error='%s'", + chand()->server_.server_uri, status_code_, + StringViewFromSlice(status_details_), grpc_error_std_string(error)))); + } + GRPC_ERROR_UNREF(error); } bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const { @@ -1188,32 +1451,65 @@ bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { ScheduleNextReportLocked(); return false; } - // Send a request that contains the snapshot. - std::string serialized_payload = + // Create a request that contains the snapshot. + grpc_slice request_payload_slice = xds_client()->api_.CreateLrsRequest(std::move(snapshot)); - parent_->call_->SendMessage(std::move(serialized_payload)); - parent_->send_message_pending_ = true; + parent_->send_message_payload_ = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_slice_unref_internal(request_payload_slice); + // Send the report. + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_SEND_MESSAGE; + op.data.send_message.send_message = parent_->send_message_payload_; + grpc_call_error call_error = grpc_call_start_batch_and_execute( + parent_->call_, &op, 1, &on_report_done_); + if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { + gpr_log(GPR_ERROR, + "[xds_client %p] xds server %s: error starting LRS send_message " + "batch on calld=%p: call_error=%d", + xds_client(), parent_->chand()->server_.server_uri.c_str(), this, + call_error); + GPR_ASSERT(GRPC_CALL_OK == call_error); + } return false; } -void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked() { - // If a reporter starts a send_message op, then the reporting interval - // changes and we destroy that reporter and create a new one, and then - // the send_message op started by the old reporter finishes, this - // method will be called even though it was for a completion started - // by the old reporter. In that case, the timer will be pending, so - // we just ignore the completion and wait for the timer to fire. - if (next_report_timer_callback_pending_) return; +void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone( + void* arg, grpc_error_handle error) { + Reporter* self = static_cast(arg); + bool done; + { + MutexLock lock(&self->xds_client()->mu_); + done = self->OnReportDoneLocked(GRPC_ERROR_REF(error)); + } + if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done"); +} + +bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked( + grpc_error_handle error) { + grpc_byte_buffer_destroy(parent_->send_message_payload_); + parent_->send_message_payload_ = nullptr; // If there are no more registered stats to report, cancel the call. auto it = xds_client()->xds_load_report_server_map_.find(parent_->chand()->server_); if (it == xds_client()->xds_load_report_server_map_.end() || it->second.load_report_map.empty()) { it->second.channel_state->StopLrsCallLocked(); - return; + GRPC_ERROR_UNREF(error); + return true; + } + if (!GRPC_ERROR_IS_NONE(error) || !IsCurrentReporterOnCall()) { + GRPC_ERROR_UNREF(error); + // If this reporter is no longer the current one on the call, the reason + // might be that it was orphaned for a new one due to config update. + if (!IsCurrentReporterOnCall()) { + parent_->MaybeStartReportingLocked(); + } + return true; } - // Otherwise, schedule the next load report. ScheduleNextReportLocked(); + return false; } // @@ -1235,32 +1531,112 @@ XdsClient::ChannelState::LrsCallState::LrsCallState( chand()->server_.ShouldUseV3() ? "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats" : "/envoy.service.load_stats.v2.LoadReportingService/StreamLoadStats"; - call_ = chand()->transport_->CreateStreamingCall( - method, absl::make_unique( - // Passing the initial ref here. This ref will go away when - // the StreamEventHandler is destroyed. - RefCountedPtr(this))); + call_ = grpc_channel_create_pollset_set_call( + chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, + xds_client()->interested_parties_, + Slice::FromStaticString(method).c_slice(), nullptr, + Timestamp::InfFuture(), nullptr); GPR_ASSERT(call_ != nullptr); + // Init the request payload. + grpc_slice request_payload_slice = + xds_client()->api_.CreateLrsInitialRequest(chand()->server_); + send_message_payload_ = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_slice_unref_internal(request_payload_slice); + // Init other data associated with the LRS call. + grpc_metadata_array_init(&initial_metadata_recv_); + grpc_metadata_array_init(&trailing_metadata_recv_); // Start the call. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log( GPR_INFO, "[xds_client %p] xds server %s: starting LRS call (calld=%p, call=%p)", - xds_client(), chand()->server_.server_uri.c_str(), this, call_.get()); - } - // Send the initial request. - std::string serialized_payload = - xds_client()->api_.CreateLrsInitialRequest(chand()->server_); - call_->SendMessage(std::move(serialized_payload)); - send_message_pending_ = true; + xds_client(), chand()->server_.server_uri.c_str(), this, call_); + } + // Create the ops. + grpc_call_error call_error; + grpc_op ops[3]; + memset(ops, 0, sizeof(ops)); + // Op: send initial metadata. + grpc_op* op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY | + GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; + op->reserved = nullptr; + op++; + // Op: send request message. + GPR_ASSERT(send_message_payload_ != nullptr); + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = send_message_payload_; + op->flags = 0; + op->reserved = nullptr; + op++; + Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release(); + GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this, + grpc_schedule_on_exec_ctx); + call_error = grpc_call_start_batch_and_execute( + call_, ops, static_cast(op - ops), &on_initial_request_sent_); + GPR_ASSERT(GRPC_CALL_OK == call_error); + // Op: recv initial metadata. + op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = + &initial_metadata_recv_; + op->flags = 0; + op->reserved = nullptr; + op++; + // Op: recv response. + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &recv_message_payload_; + op->flags = 0; + op->reserved = nullptr; + op++; + Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release(); + GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, + grpc_schedule_on_exec_ctx); + call_error = grpc_call_start_batch_and_execute( + call_, ops, static_cast(op - ops), &on_response_received_); + GPR_ASSERT(GRPC_CALL_OK == call_error); + // Op: recv server status. + op = ops; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_; + op->data.recv_status_on_client.status = &status_code_; + op->data.recv_status_on_client.status_details = &status_details_; + op->flags = 0; + op->reserved = nullptr; + op++; + // This callback signals the end of the call, so it relies on the initial + // ref instead of a new ref. When it's invoked, it's the initial ref that is + // unreffed. + GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, + grpc_schedule_on_exec_ctx); + call_error = grpc_call_start_batch_and_execute( + call_, ops, static_cast(op - ops), &on_status_received_); + GPR_ASSERT(GRPC_CALL_OK == call_error); +} + +XdsClient::ChannelState::LrsCallState::~LrsCallState() { + grpc_metadata_array_destroy(&initial_metadata_recv_); + grpc_metadata_array_destroy(&trailing_metadata_recv_); + grpc_byte_buffer_destroy(send_message_payload_); + grpc_byte_buffer_destroy(recv_message_payload_); + grpc_slice_unref_internal(status_details_); + GPR_ASSERT(call_ != nullptr); + grpc_call_unref(call_); } void XdsClient::ChannelState::LrsCallState::Orphan() { reporter_.reset(); - // Note that the initial ref is held by the StreamEventHandler, which - // will be destroyed when call_ is destroyed, which may not happen - // here, since there may be other refs held to call_ by internal callbacks. - call_.reset(); + GPR_ASSERT(call_ != nullptr); + // If we are here because xds_client wants to cancel the call, + // on_status_received_ will complete the cancellation and clean up. Otherwise, + // we are here because xds_client has to orphan a failed call, then the + // following cancellation will be a no-op. + grpc_call_cancel_internal(call_); + // Note that the initial ref is hold by on_status_received_. So the + // corresponding unref happens in on_status_received_ instead of here. } void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() { @@ -1268,7 +1644,7 @@ void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() { if (reporter_ != nullptr) return; // Don't start if the previous send_message op (of the initial request or the // last report of the previous reporter) hasn't completed. - if (call_ != nullptr && send_message_pending_) return; + if (send_message_payload_ != nullptr) return; // Don't start if no LRS response has arrived. if (!seen_response()) return; // Don't start if the ADS call hasn't received any valid response. Note that @@ -1284,101 +1660,161 @@ void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() { Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_); } -void XdsClient::ChannelState::LrsCallState::OnRequestSent(bool /*ok*/) { - MutexLock lock(&xds_client()->mu_); - send_message_pending_ = false; - if (reporter_ != nullptr) { - reporter_->OnReportDoneLocked(); - } else { - MaybeStartReportingLocked(); +void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent( + void* arg, grpc_error_handle /*error*/) { + LrsCallState* lrs_calld = static_cast(arg); + { + MutexLock lock(&lrs_calld->xds_client()->mu_); + lrs_calld->OnInitialRequestSentLocked(); } + lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked"); } -void XdsClient::ChannelState::LrsCallState::OnRecvMessage( - absl::string_view payload) { - MutexLock lock(&xds_client()->mu_); - // If we're no longer the current call, ignore the result. - if (!IsCurrentCallOnChannel()) return; - // Parse the response. - bool send_all_clusters = false; - std::set new_cluster_names; - Duration new_load_reporting_interval; - grpc_error_handle parse_error = xds_client()->api_.ParseLrsResponse( - payload, &send_all_clusters, &new_cluster_names, - &new_load_reporting_interval); - if (!GRPC_ERROR_IS_NONE(parse_error)) { - gpr_log(GPR_ERROR, - "[xds_client %p] xds server %s: LRS response parsing failed: %s", - xds_client(), chand()->server_.server_uri.c_str(), - grpc_error_std_string(parse_error).c_str()); - GRPC_ERROR_UNREF(parse_error); - return; +void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() { + // Clear the send_message_payload_. + grpc_byte_buffer_destroy(send_message_payload_); + send_message_payload_ = nullptr; + MaybeStartReportingLocked(); +} + +void XdsClient::ChannelState::LrsCallState::OnResponseReceived( + void* arg, grpc_error_handle /*error*/) { + LrsCallState* lrs_calld = static_cast(arg); + bool done; + { + MutexLock lock(&lrs_calld->xds_client()->mu_); + done = lrs_calld->OnResponseReceivedLocked(); } - seen_response_ = true; - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log( - GPR_INFO, - "[xds_client %p] xds server %s: LRS response received, %" PRIuPTR - " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64 - "ms", - xds_client(), chand()->server_.server_uri.c_str(), - new_cluster_names.size(), send_all_clusters, - new_load_reporting_interval.millis()); - size_t i = 0; - for (const auto& name : new_cluster_names) { - gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s", - xds_client(), i++, name.c_str()); - } + if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked"); +} + +bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() { + // Empty payload means the call was cancelled. + if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) { + return true; } - if (new_load_reporting_interval < - Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS)) { - new_load_reporting_interval = - Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS); - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] xds server %s: increased load_report_interval " - "to minimum value %dms", + // Read the response. + grpc_byte_buffer_reader bbr; + grpc_byte_buffer_reader_init(&bbr, recv_message_payload_); + grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); + grpc_byte_buffer_reader_destroy(&bbr); + grpc_byte_buffer_destroy(recv_message_payload_); + recv_message_payload_ = nullptr; + // This anonymous lambda is a hack to avoid the usage of goto. + [&]() { + // Parse the response. + bool send_all_clusters = false; + std::set new_cluster_names; + Duration new_load_reporting_interval; + grpc_error_handle parse_error = xds_client()->api_.ParseLrsResponse( + response_slice, &send_all_clusters, &new_cluster_names, + &new_load_reporting_interval); + if (!GRPC_ERROR_IS_NONE(parse_error)) { + gpr_log(GPR_ERROR, + "[xds_client %p] xds server %s: LRS response parsing failed: %s", xds_client(), chand()->server_.server_uri.c_str(), - GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS); + grpc_error_std_string(parse_error).c_str()); + GRPC_ERROR_UNREF(parse_error); + return; } - } - // Ignore identical update. - if (send_all_clusters == send_all_clusters_ && - cluster_names_ == new_cluster_names && - load_reporting_interval_ == new_load_reporting_interval) { + seen_response_ = true; if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] xds server %s: incoming LRS response identical " - "to current, ignoring.", - xds_client(), chand()->server_.server_uri.c_str()); + gpr_log( + GPR_INFO, + "[xds_client %p] xds server %s: LRS response received, %" PRIuPTR + " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64 + "ms", + xds_client(), chand()->server_.server_uri.c_str(), + new_cluster_names.size(), send_all_clusters, + new_load_reporting_interval.millis()); + size_t i = 0; + for (const auto& name : new_cluster_names) { + gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s", + xds_client(), i++, name.c_str()); + } } - return; - } - // Stop current load reporting (if any) to adopt the new config. - reporter_.reset(); - // Record the new config. - send_all_clusters_ = send_all_clusters; - cluster_names_ = std::move(new_cluster_names); - load_reporting_interval_ = new_load_reporting_interval; - // Try starting sending load report. - MaybeStartReportingLocked(); + if (new_load_reporting_interval < + Duration::Milliseconds( + GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS)) { + new_load_reporting_interval = Duration::Milliseconds( + GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS); + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] xds server %s: increased load_report_interval " + "to minimum value %dms", + xds_client(), chand()->server_.server_uri.c_str(), + GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS); + } + } + // Ignore identical update. + if (send_all_clusters == send_all_clusters_ && + cluster_names_ == new_cluster_names && + load_reporting_interval_ == new_load_reporting_interval) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log( + GPR_INFO, + "[xds_client %p] xds server %s: incoming LRS response identical " + "to current, ignoring.", + xds_client(), chand()->server_.server_uri.c_str()); + } + return; + } + // Stop current load reporting (if any) to adopt the new config. + reporter_.reset(); + // Record the new config. + send_all_clusters_ = send_all_clusters; + cluster_names_ = std::move(new_cluster_names); + load_reporting_interval_ = new_load_reporting_interval; + // Try starting sending load report. + MaybeStartReportingLocked(); + }(); + grpc_slice_unref_internal(response_slice); + if (xds_client()->shutting_down_) return true; + // Keep listening for LRS config updates. + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_RECV_MESSAGE; + op.data.recv_message.recv_message = &recv_message_payload_; + op.flags = 0; + op.reserved = nullptr; + GPR_ASSERT(call_ != nullptr); + // Reuse the "OnResponseReceivedLocked" ref taken in ctor. + const grpc_call_error call_error = + grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_); + GPR_ASSERT(GRPC_CALL_OK == call_error); + return false; } void XdsClient::ChannelState::LrsCallState::OnStatusReceived( - absl::Status status) { - MutexLock lock(&xds_client()->mu_); + void* arg, grpc_error_handle error) { + LrsCallState* lrs_calld = static_cast(arg); + { + MutexLock lock(&lrs_calld->xds_client()->mu_); + lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error)); + } + lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked"); +} + +void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked( + grpc_error_handle error) { + GPR_ASSERT(call_ != nullptr); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + char* status_details = grpc_slice_to_c_string(status_details_); gpr_log(GPR_INFO, "[xds_client %p] xds server %s: LRS call status received " - "(chand=%p, calld=%p, call=%p): %s", + "(chand=%p, calld=%p, call=%p): " + "status=%d, details='%s', error='%s'", xds_client(), chand()->server_.server_uri.c_str(), chand(), this, - call_.get(), status.ToString().c_str()); + call_, status_code_, status_details, + grpc_error_std_string(error).c_str()); + gpr_free(status_details); } // Ignore status from a stale call. if (IsCurrentCallOnChannel()) { // Try to restart the call. parent_->OnCallFinishedLocked(); } + GRPC_ERROR_UNREF(error); } bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const { @@ -1392,16 +1828,36 @@ bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const { // XdsClient // +namespace { + +Duration GetRequestTimeout(const grpc_channel_args* args) { + return Duration::Milliseconds(grpc_channel_args_find_integer( + args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS, + {15000, 0, INT_MAX})); +} + +grpc_channel_args* ModifyChannelArgs(const grpc_channel_args* args) { + absl::InlinedVector args_to_add = { + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), + 5 * 60 * GPR_MS_PER_SEC), + }; + return grpc_channel_args_copy_and_add(args, args_to_add.data(), + args_to_add.size()); +} + +} // namespace + XdsClient::XdsClient(std::unique_ptr bootstrap, - OrphanablePtr transport_factory, - Duration resource_request_timeout) + const grpc_channel_args* args) : DualRefCounted( GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient" : nullptr), bootstrap_(std::move(bootstrap)), - transport_factory_(std::move(transport_factory)), - request_timeout_(resource_request_timeout), + args_(ModifyChannelArgs(args)), + request_timeout_(GetRequestTimeout(args)), xds_federation_enabled_(XdsFederationEnabled()), + interested_parties_(grpc_pollset_set_create()), certificate_provider_store_(MakeOrphanable( bootstrap_->certificate_providers())), api_(this, &grpc_xds_client_trace, bootstrap_->node(), @@ -1409,23 +1865,37 @@ XdsClient::XdsClient(std::unique_ptr bootstrap, if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this); } + // Calling grpc_init to ensure gRPC does not shut down until the XdsClient is + // destroyed. + grpc_init(); } XdsClient::~XdsClient() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this); } + grpc_channel_args_destroy(args_); + grpc_pollset_set_destroy(interested_parties_); + // Calling grpc_shutdown to ensure gRPC does not shut down until the XdsClient + // is destroyed. + grpc_shutdown(); } void XdsClient::Orphan() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this); } - MutexLock lock(&mu_); - shutting_down_ = true; - // Clear cache and any remaining watchers that may not have been cancelled. - authority_state_map_.clear(); - invalid_watchers_.clear(); + { + MutexLock lock(g_mu); + if (g_xds_client == this) g_xds_client = nullptr; + } + { + MutexLock lock(&mu_); + shutting_down_ = true; + // Clear cache and any remaining watchers that may not have been cancelled. + authority_state_map_.clear(); + invalid_watchers_.clear(); + } } RefCountedPtr XdsClient::GetOrCreateChannelStateLocked( @@ -1755,7 +2225,7 @@ void XdsClient::RemoveClusterLocalityStats( void XdsClient::ResetBackoff() { MutexLock lock(&mu_); for (auto& p : xds_server_channel_map_) { - p.second->ResetBackoff(); + grpc_channel_reset_connect_backoff(p.second->channel()); } } @@ -1924,4 +2394,192 @@ std::string XdsClient::DumpClientConfigBinary() { return api_.AssembleClientConfig(resource_type_metadata_map); } +// +// accessors for global state +// + +void XdsClientGlobalInit() { + g_mu = new Mutex; + XdsHttpFilterRegistry::Init(); + XdsClusterSpecifierPluginRegistry::Init(); +} + +// TODO(roth): Find a better way to clear the fallback config that does +// not require using ABSL_NO_THREAD_SAFETY_ANALYSIS. +void XdsClientGlobalShutdown() ABSL_NO_THREAD_SAFETY_ANALYSIS { + gpr_free(g_fallback_bootstrap_config); + g_fallback_bootstrap_config = nullptr; + delete g_mu; + g_mu = nullptr; + XdsHttpFilterRegistry::Shutdown(); + XdsClusterSpecifierPluginRegistry::Shutdown(); +} + +namespace { + +std::string GetBootstrapContents(const char* fallback_config, + grpc_error_handle* error) { + // First, try GRPC_XDS_BOOTSTRAP env var. + UniquePtr path(gpr_getenv("GRPC_XDS_BOOTSTRAP")); + if (path != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "Got bootstrap file location from GRPC_XDS_BOOTSTRAP " + "environment variable: %s", + path.get()); + } + grpc_slice contents; + *error = + grpc_load_file(path.get(), /*add_null_terminator=*/true, &contents); + if (!GRPC_ERROR_IS_NONE(*error)) return ""; + std::string contents_str(StringViewFromSlice(contents)); + grpc_slice_unref_internal(contents); + return contents_str; + } + // Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var. + UniquePtr env_config(gpr_getenv("GRPC_XDS_BOOTSTRAP_CONFIG")); + if (env_config != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG " + "environment variable"); + } + return env_config.get(); + } + // Finally, try fallback config. + if (fallback_config != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, "Got bootstrap contents from fallback config"); + } + return fallback_config; + } + // No bootstrap config found. + *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG " + "not defined"); + return ""; +} + +} // namespace + +RefCountedPtr XdsClient::GetOrCreate(const grpc_channel_args* args, + grpc_error_handle* error) { + RefCountedPtr xds_client; + // If getting bootstrap from channel args, create a local XdsClient + // instance for the channel or server instead of using the global instance. + const char* bootstrap_config = grpc_channel_args_find_string( + args, GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG); + if (bootstrap_config != nullptr) { + std::unique_ptr bootstrap = + XdsBootstrap::Create(bootstrap_config, error); + if (GRPC_ERROR_IS_NONE(*error)) { + grpc_channel_args* xds_channel_args = + grpc_channel_args_find_pointer( + args, + GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS); + return MakeRefCounted(std::move(bootstrap), xds_channel_args); + } + return nullptr; + } + // Otherwise, use the global instance. + { + MutexLock lock(g_mu); + if (g_xds_client != nullptr) { + auto xds_client = g_xds_client->RefIfNonZero(); + if (xds_client != nullptr) return xds_client; + } + // Find bootstrap contents. + std::string bootstrap_contents = + GetBootstrapContents(g_fallback_bootstrap_config, error); + if (!GRPC_ERROR_IS_NONE(*error)) return nullptr; + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, "xDS bootstrap contents: %s", + bootstrap_contents.c_str()); + } + // Parse bootstrap. + std::unique_ptr bootstrap = + XdsBootstrap::Create(bootstrap_contents, error); + if (!GRPC_ERROR_IS_NONE(*error)) return nullptr; + // Instantiate XdsClient. + xds_client = + MakeRefCounted(std::move(bootstrap), g_channel_args); + g_xds_client = xds_client.get(); + } + return xds_client; +} + +namespace internal { + +void SetXdsChannelArgsForTest(grpc_channel_args* args) { + MutexLock lock(g_mu); + g_channel_args = args; +} + +void UnsetGlobalXdsClientForTest() { + MutexLock lock(g_mu); + g_xds_client = nullptr; +} + +void SetXdsFallbackBootstrapConfig(const char* config) { + MutexLock lock(g_mu); + gpr_free(g_fallback_bootstrap_config); + g_fallback_bootstrap_config = gpr_strdup(config); +} + +} // namespace internal + +// +// embedding XdsClient in channel args +// + +#define GRPC_ARG_XDS_CLIENT "grpc.internal.xds_client" + +namespace { + +void* XdsClientArgCopy(void* p) { + XdsClient* xds_client = static_cast(p); + xds_client->Ref(DEBUG_LOCATION, "channel arg").release(); + return p; +} + +void XdsClientArgDestroy(void* p) { + XdsClient* xds_client = static_cast(p); + xds_client->Unref(DEBUG_LOCATION, "channel arg"); +} + +int XdsClientArgCmp(void* p, void* q) { return QsortCompare(p, q); } + +const grpc_arg_pointer_vtable kXdsClientArgVtable = { + XdsClientArgCopy, XdsClientArgDestroy, XdsClientArgCmp}; + +} // namespace + +grpc_arg XdsClient::MakeChannelArg() const { + return grpc_channel_arg_pointer_create(const_cast(GRPC_ARG_XDS_CLIENT), + const_cast(this), + &kXdsClientArgVtable); +} + +RefCountedPtr XdsClient::GetFromChannelArgs( + const grpc_channel_args& args) { + XdsClient* xds_client = + grpc_channel_args_find_pointer(&args, GRPC_ARG_XDS_CLIENT); + if (xds_client == nullptr) return nullptr; + return xds_client->Ref(DEBUG_LOCATION, "GetFromChannelArgs"); +} + } // namespace grpc_core + +// The returned bytes may contain NULL(0), so we can't use c-string. +grpc_slice grpc_dump_xds_configs(void) { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; + grpc_core::ExecCtx exec_ctx; + grpc_error_handle error = GRPC_ERROR_NONE; + auto xds_client = grpc_core::XdsClient::GetOrCreate(nullptr, &error); + if (!GRPC_ERROR_IS_NONE(error)) { + // If we isn't using xDS, just return an empty string. + GRPC_ERROR_UNREF(error); + return grpc_empty_slice(); + } + return grpc_slice_from_cpp_string(xds_client->DumpClientConfigBinary()); +} diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index 54da4174172..a7b6457cc8e 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -32,12 +32,13 @@ #include "absl/strings/string_view.h" #include "upb/def.hpp" +#include + #include "src/core/ext/xds/certificate_provider_store.h" #include "src/core/ext/xds/xds_api.h" #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_client_stats.h" #include "src/core/ext/xds/xds_resource_type.h" -#include "src/core/ext/xds/xds_transport.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/dual_ref_counted.h" #include "src/core/lib/gprpp/orphanable.h" @@ -45,7 +46,9 @@ #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" +#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/uri/uri_parser.h" @@ -71,9 +74,15 @@ class XdsClient : public DualRefCounted { ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; }; + // Factory function to get or create the global XdsClient instance. + // If *error is not GRPC_ERROR_NONE upon return, then there was + // an error initializing the client. + static RefCountedPtr GetOrCreate(const grpc_channel_args* args, + grpc_error_handle* error); + + // Most callers should not instantiate directly. Use GetOrCreate() instead. XdsClient(std::unique_ptr bootstrap, - OrphanablePtr transport_factory, - Duration resource_request_timeout = Duration::Seconds(15)); + const grpc_channel_args* args); ~XdsClient() override; const XdsBootstrap& bootstrap() const { @@ -82,14 +91,12 @@ class XdsClient : public DualRefCounted { return *bootstrap_; } - XdsTransportFactory* transport_factory() const { - return transport_factory_.get(); - } - CertificateProviderStore& certificate_provider_store() { return *certificate_provider_store_; } + grpc_pollset_set* interested_parties() const { return interested_parties_; } + void Orphan() override; // Start and cancel watch for a resource. @@ -151,6 +158,11 @@ class XdsClient : public DualRefCounted { // implementation. std::string DumpClientConfigBinary(); + // Helpers for encoding the XdsClient object in channel args. + grpc_arg MakeChannelArg() const; + static RefCountedPtr GetFromChannelArgs( + const grpc_channel_args& args); + private: struct XdsResourceKey { std::string id; @@ -184,18 +196,21 @@ class XdsClient : public DualRefCounted { void Orphan() override; + grpc_channel* channel() const { return channel_; } XdsClient* xds_client() const { return xds_client_.get(); } AdsCallState* ads_calld() const; LrsCallState* lrs_calld() const; - void ResetBackoff(); - void MaybeStartLrsCall(); void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); bool HasAdsCall() const; bool HasActiveAdsCall() const; + void StartConnectivityWatchLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + void CancelConnectivityWatchLocked(); + void SubscribeLocked(const XdsResourceType* type, const XdsResourceName& name) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); @@ -205,16 +220,17 @@ class XdsClient : public DualRefCounted { ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); private: - void OnConnectivityStateChange(absl::Status status); + class StateWatcher; // The owning xds client. WeakRefCountedPtr xds_client_; const XdsBootstrap::XdsServer& server_; - OrphanablePtr transport_; - + // The channel and its status. + grpc_channel* channel_; bool shutting_down_ = false; + StateWatcher* watcher_; // The retryable XDS calls. OrphanablePtr> ads_calld_; @@ -298,9 +314,10 @@ class XdsClient : public DualRefCounted { const XdsBootstrap::XdsServer& server) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); std::unique_ptr bootstrap_; - OrphanablePtr transport_factory_; + grpc_channel_args* args_; const Duration request_timeout_; const bool xds_federation_enabled_; + grpc_pollset_set* interested_parties_; OrphanablePtr certificate_provider_store_; XdsApi api_; WorkSerializer work_serializer_; @@ -332,6 +349,14 @@ class XdsClient : public DualRefCounted { bool shutting_down_ ABSL_GUARDED_BY(mu_) = false; }; +namespace internal { +void SetXdsChannelArgsForTest(grpc_channel_args* args); +void UnsetGlobalXdsClientForTest(); +// Sets bootstrap config to be used when no env var is set. +// Does not take ownership of config. +void SetXdsFallbackBootstrapConfig(const char* config); +} // namespace internal + } // namespace grpc_core #endif // GRPC_CORE_EXT_XDS_XDS_CLIENT_H diff --git a/src/core/ext/xds/xds_client_grpc.cc b/src/core/ext/xds/xds_client_grpc.cc deleted file mode 100644 index 128ba171ebb..00000000000 --- a/src/core/ext/xds/xds_client_grpc.cc +++ /dev/null @@ -1,275 +0,0 @@ -// -// Copyright 2022 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#include - -#include "src/core/ext/xds/xds_client_grpc.h" - -#include - -#include -#include -#include - -#include "absl/base/thread_annotations.h" -#include "absl/strings/string_view.h" - -#include -#include -#include -#include -#include - -#include "src/core/ext/xds/xds_bootstrap.h" -#include "src/core/ext/xds/xds_channel_args.h" -#include "src/core/ext/xds/xds_cluster_specifier_plugin.h" -#include "src/core/ext/xds/xds_http_filters.h" -#include "src/core/ext/xds/xds_transport.h" -#include "src/core/ext/xds/xds_transport_grpc.h" -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/debug/trace.h" -#include "src/core/lib/gpr/env.h" -#include "src/core/lib/gpr/useful.h" -#include "src/core/lib/gprpp/debug_location.h" -#include "src/core/lib/gprpp/memory.h" -#include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/gprpp/time.h" -#include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/load_file.h" -#include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/slice/slice_refcount.h" - -namespace grpc_core { - -namespace { - -Mutex* g_mu = nullptr; -const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr; -XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr; -char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr; - -} // namespace - -void XdsClientGlobalInit() { - g_mu = new Mutex; - XdsHttpFilterRegistry::Init(); - XdsClusterSpecifierPluginRegistry::Init(); -} - -// TODO(roth): Find a better way to clear the fallback config that does -// not require using ABSL_NO_THREAD_SAFETY_ANALYSIS. -void XdsClientGlobalShutdown() ABSL_NO_THREAD_SAFETY_ANALYSIS { - gpr_free(g_fallback_bootstrap_config); - g_fallback_bootstrap_config = nullptr; - delete g_mu; - g_mu = nullptr; - XdsHttpFilterRegistry::Shutdown(); - XdsClusterSpecifierPluginRegistry::Shutdown(); -} - -namespace { - -std::string GetBootstrapContents(const char* fallback_config, - grpc_error_handle* error) { - // First, try GRPC_XDS_BOOTSTRAP env var. - UniquePtr path(gpr_getenv("GRPC_XDS_BOOTSTRAP")); - if (path != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "Got bootstrap file location from GRPC_XDS_BOOTSTRAP " - "environment variable: %s", - path.get()); - } - grpc_slice contents; - *error = - grpc_load_file(path.get(), /*add_null_terminator=*/true, &contents); - if (!GRPC_ERROR_IS_NONE(*error)) return ""; - std::string contents_str(StringViewFromSlice(contents)); - grpc_slice_unref_internal(contents); - return contents_str; - } - // Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var. - UniquePtr env_config(gpr_getenv("GRPC_XDS_BOOTSTRAP_CONFIG")); - if (env_config != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG " - "environment variable"); - } - return env_config.get(); - } - // Finally, try fallback config. - if (fallback_config != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, "Got bootstrap contents from fallback config"); - } - return fallback_config; - } - // No bootstrap config found. - *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG " - "not defined"); - return ""; -} - -} // namespace - -RefCountedPtr GrpcXdsClient::GetOrCreate( - const grpc_channel_args* args, grpc_error_handle* error) { - RefCountedPtr xds_client; - // If getting bootstrap from channel args, create a local XdsClient - // instance for the channel or server instead of using the global instance. - const char* bootstrap_config = grpc_channel_args_find_string( - args, GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG); - if (bootstrap_config != nullptr) { - std::unique_ptr bootstrap = - XdsBootstrap::Create(bootstrap_config, error); - if (GRPC_ERROR_IS_NONE(*error)) { - grpc_channel_args* xds_channel_args = - grpc_channel_args_find_pointer( - args, - GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS); - return MakeRefCounted(std::move(bootstrap), - xds_channel_args); - } - return nullptr; - } - // Otherwise, use the global instance. - { - MutexLock lock(g_mu); - if (g_xds_client != nullptr) { - auto xds_client = g_xds_client->RefIfNonZero(); - if (xds_client != nullptr) return xds_client; - } - // Find bootstrap contents. - std::string bootstrap_contents = - GetBootstrapContents(g_fallback_bootstrap_config, error); - if (!GRPC_ERROR_IS_NONE(*error)) return nullptr; - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, "xDS bootstrap contents: %s", - bootstrap_contents.c_str()); - } - // Parse bootstrap. - std::unique_ptr bootstrap = - XdsBootstrap::Create(bootstrap_contents, error); - if (!GRPC_ERROR_IS_NONE(*error)) return nullptr; - // Instantiate XdsClient. - xds_client = - MakeRefCounted(std::move(bootstrap), g_channel_args); - g_xds_client = xds_client.get(); - } - return xds_client; -} - -namespace { - -Duration GetResourceDurationFromArgs(const grpc_channel_args* args) { - return Duration::Milliseconds(grpc_channel_args_find_integer( - args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS, - {15000, 0, INT_MAX})); -} - -} // namespace - -GrpcXdsClient::GrpcXdsClient(std::unique_ptr bootstrap, - const grpc_channel_args* args) - : XdsClient(std::move(bootstrap), - MakeOrphanable(args), - GetResourceDurationFromArgs(args)) {} - -GrpcXdsClient::~GrpcXdsClient() { - MutexLock lock(g_mu); - if (g_xds_client == this) g_xds_client = nullptr; -} - -grpc_pollset_set* GrpcXdsClient::interested_parties() const { - return reinterpret_cast(transport_factory()) - ->interested_parties(); -} - -#define GRPC_ARG_XDS_CLIENT "grpc.internal.xds_client" - -namespace { - -void* XdsClientArgCopy(void* p) { - XdsClient* xds_client = static_cast(p); - xds_client->Ref(DEBUG_LOCATION, "channel arg").release(); - return p; -} - -void XdsClientArgDestroy(void* p) { - XdsClient* xds_client = static_cast(p); - xds_client->Unref(DEBUG_LOCATION, "channel arg"); -} - -int XdsClientArgCmp(void* p, void* q) { return QsortCompare(p, q); } - -const grpc_arg_pointer_vtable kXdsClientArgVtable = { - XdsClientArgCopy, XdsClientArgDestroy, XdsClientArgCmp}; - -} // namespace - -grpc_arg GrpcXdsClient::MakeChannelArg() const { - return grpc_channel_arg_pointer_create(const_cast(GRPC_ARG_XDS_CLIENT), - const_cast(this), - &kXdsClientArgVtable); -} - -RefCountedPtr GrpcXdsClient::GetFromChannelArgs( - const grpc_channel_args& args) { - XdsClient* xds_client = - grpc_channel_args_find_pointer(&args, GRPC_ARG_XDS_CLIENT); - if (xds_client == nullptr) return nullptr; - return xds_client->Ref(DEBUG_LOCATION, "GetFromChannelArgs"); -} - -namespace internal { - -void SetXdsChannelArgsForTest(grpc_channel_args* args) { - MutexLock lock(g_mu); - g_channel_args = args; -} - -void UnsetGlobalXdsClientForTest() { - MutexLock lock(g_mu); - g_xds_client = nullptr; -} - -void SetXdsFallbackBootstrapConfig(const char* config) { - MutexLock lock(g_mu); - gpr_free(g_fallback_bootstrap_config); - g_fallback_bootstrap_config = gpr_strdup(config); -} - -} // namespace internal - -} // namespace grpc_core - -// The returned bytes may contain NULL(0), so we can't use c-string. -grpc_slice grpc_dump_xds_configs(void) { - grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; - grpc_core::ExecCtx exec_ctx; - grpc_error_handle error = GRPC_ERROR_NONE; - auto xds_client = grpc_core::GrpcXdsClient::GetOrCreate(nullptr, &error); - if (!GRPC_ERROR_IS_NONE(error)) { - // If we aren't using xDS, just return an empty string. - GRPC_ERROR_UNREF(error); - return grpc_empty_slice(); - } - return grpc_slice_from_cpp_string(xds_client->DumpClientConfigBinary()); -} diff --git a/src/core/ext/xds/xds_client_grpc.h b/src/core/ext/xds/xds_client_grpc.h deleted file mode 100644 index d45b6ea1457..00000000000 --- a/src/core/ext/xds/xds_client_grpc.h +++ /dev/null @@ -1,65 +0,0 @@ -// -// Copyright 2022 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#ifndef GRPC_CORE_EXT_XDS_XDS_CLIENT_GRPC_H -#define GRPC_CORE_EXT_XDS_XDS_CLIENT_GRPC_H - -#include - -#include - -#include - -#include "src/core/ext/xds/xds_bootstrap.h" -#include "src/core/ext/xds/xds_client.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/iomgr_fwd.h" - -namespace grpc_core { - -class GrpcXdsClient : public XdsClient { - public: - // Factory function to get or create the global XdsClient instance. - // If *error is not GRPC_ERROR_NONE upon return, then there was - // an error initializing the client. - static RefCountedPtr GetOrCreate(const grpc_channel_args* args, - grpc_error_handle* error); - - // Do not instantiate directly -- use GetOrCreate() instead. - GrpcXdsClient(std::unique_ptr bootstrap, - const grpc_channel_args* args); - ~GrpcXdsClient() override; - - grpc_pollset_set* interested_parties() const; - - // Helpers for encoding the XdsClient object in channel args. - grpc_arg MakeChannelArg() const; - static RefCountedPtr GetFromChannelArgs( - const grpc_channel_args& args); -}; - -namespace internal { -void SetXdsChannelArgsForTest(grpc_channel_args* args); -void UnsetGlobalXdsClientForTest(); -// Sets bootstrap config to be used when no env var is set. -// Does not take ownership of config. -void SetXdsFallbackBootstrapConfig(const char* config); -} // namespace internal - -} // namespace grpc_core - -#endif // GRPC_CORE_EXT_XDS_XDS_CLIENT_GRPC_H diff --git a/src/core/ext/xds/xds_server_config_fetcher.cc b/src/core/ext/xds/xds_server_config_fetcher.cc index 4f894d64c67..9c8f0d7d264 100644 --- a/src/core/ext/xds/xds_server_config_fetcher.cc +++ b/src/core/ext/xds/xds_server_config_fetcher.cc @@ -56,7 +56,6 @@ #include "src/core/ext/xds/xds_certificate_provider.h" #include "src/core/ext/xds/xds_channel_stack_modifier.h" #include "src/core/ext/xds/xds_client.h" -#include "src/core/ext/xds/xds_client_grpc.h" #include "src/core/ext/xds/xds_common_types.h" #include "src/core/ext/xds/xds_http_filters.h" #include "src/core/ext/xds/xds_listener.h" @@ -115,7 +114,7 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher { // Return the interested parties from the xds client so that it can be polled. grpc_pollset_set* interested_parties() override { - return static_cast(xds_client_.get())->interested_parties(); + return xds_client_->interested_parties(); } private: @@ -1344,7 +1343,7 @@ grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create( 3, (notifier.on_serving_status_update, notifier.user_data, args)); grpc_error_handle error = GRPC_ERROR_NONE; grpc_core::RefCountedPtr xds_client = - grpc_core::GrpcXdsClient::GetOrCreate(args, &error); + grpc_core::XdsClient::GetOrCreate(args, &error); grpc_channel_args_destroy(args); if (!GRPC_ERROR_IS_NONE(error)) { gpr_log(GPR_ERROR, "Failed to create xds client: %s", diff --git a/src/core/ext/xds/xds_transport.h b/src/core/ext/xds/xds_transport.h deleted file mode 100644 index c36f9142cbb..00000000000 --- a/src/core/ext/xds/xds_transport.h +++ /dev/null @@ -1,86 +0,0 @@ -// -// Copyright 2022 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#ifndef GRPC_CORE_EXT_XDS_XDS_TRANSPORT_H -#define GRPC_CORE_EXT_XDS_XDS_TRANSPORT_H - -#include - -#include -#include -#include - -#include "absl/status/status.h" -#include "absl/strings/string_view.h" - -#include "src/core/ext/xds/xds_bootstrap.h" -#include "src/core/lib/gprpp/orphanable.h" - -namespace grpc_core { - -// A factory for creating new XdsTransport instances. -class XdsTransportFactory : public InternallyRefCounted { - public: - // Represents a transport for xDS communication (e.g., a gRPC channel). - class XdsTransport : public InternallyRefCounted { - public: - // Represents a bidi streaming RPC call. - class StreamingCall : public InternallyRefCounted { - public: - // An interface for handling events on a streaming call. - class EventHandler { - public: - virtual ~EventHandler() = default; - - // Called when a SendMessage() operation completes. - virtual void OnRequestSent(bool ok) = 0; - // Called when a message is received on the stream. - virtual void OnRecvMessage(absl::string_view payload) = 0; - // Called when status is received on the stream. - virtual void OnStatusReceived(absl::Status status) = 0; - }; - - // Sends a message on the stream. When the message has been sent, - // the EventHandler::OnRequestSent() method will be called. - // Only one message will be in flight at a time; subsequent - // messages will not be sent until this one is done. - virtual void SendMessage(std::string payload) = 0; - }; - - // Create a streaming call on this transport for the specified method. - // Events on the stream will be reported to event_handler. - virtual OrphanablePtr CreateStreamingCall( - const char* method, - std::unique_ptr event_handler) = 0; - - // Resets connection backoff for the transport. - virtual void ResetBackoff() = 0; - }; - - // Creates a new transport for the specified server. - // The on_connectivity_failure callback will be invoked whenever there is - // a connectivity failure on the transport. - // *status will be set if there is an error creating the channel, - // although the returned channel must still accept calls (which may fail). - virtual OrphanablePtr Create( - const XdsBootstrap::XdsServer& server, - std::function on_connectivity_failure, - absl::Status* status) = 0; -}; - -} // namespace grpc_core - -#endif // GRPC_CORE_EXT_XDS_XDS_TRANSPORT_H diff --git a/src/core/ext/xds/xds_transport_grpc.cc b/src/core/ext/xds/xds_transport_grpc.cc deleted file mode 100644 index ad4702b8f1f..00000000000 --- a/src/core/ext/xds/xds_transport_grpc.cc +++ /dev/null @@ -1,358 +0,0 @@ -// -// Copyright 2022 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#include - -#include "src/core/ext/xds/xds_transport_grpc.h" - -#include - -#include -#include -#include - -#include "absl/container/inlined_vector.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "src/core/ext/filters/client_channel/client_channel.h" -#include "src/core/ext/xds/xds_bootstrap.h" -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/channel_fwd.h" -#include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/config/core_configuration.h" -#include "src/core/lib/gprpp/debug_location.h" -#include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/gprpp/time.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/pollset_set.h" -#include "src/core/lib/security/credentials/channel_creds_registry.h" -#include "src/core/lib/security/credentials/credentials.h" -#include "src/core/lib/slice/slice.h" -#include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/slice/slice_refcount.h" -#include "src/core/lib/surface/call.h" -#include "src/core/lib/surface/channel.h" -#include "src/core/lib/surface/lame_client.h" -#include "src/core/lib/transport/connectivity_state.h" - -namespace grpc_core { - -// -// GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall -// - -GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall( - RefCountedPtr factory, grpc_channel* channel, - const char* method, - std::unique_ptr event_handler) - : factory_(std::move(factory)), event_handler_(std::move(event_handler)) { - // Create call. - call_ = grpc_channel_create_pollset_set_call( - channel, nullptr, GRPC_PROPAGATE_DEFAULTS, factory_->interested_parties(), - StaticSlice::FromStaticString(method).c_slice(), nullptr, - Timestamp::InfFuture(), nullptr); - GPR_ASSERT(call_ != nullptr); - // Init data associated with the call. - grpc_metadata_array_init(&initial_metadata_recv_); - grpc_metadata_array_init(&trailing_metadata_recv_); - // Initialize closure to be used for sending messages. - GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, nullptr); - // Start ops on the call. - grpc_call_error call_error; - grpc_op ops[3]; - memset(ops, 0, sizeof(ops)); - // Send initial metadata. No callback for this, since we don't really - // care when it finishes. - grpc_op* op = ops; - op->op = GRPC_OP_SEND_INITIAL_METADATA; - op->data.send_initial_metadata.count = 0; - op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY | - GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; - op->reserved = nullptr; - op++; - call_error = grpc_call_start_batch_and_execute( - call_, ops, static_cast(op - ops), nullptr); - GPR_ASSERT(GRPC_CALL_OK == call_error); - // Start a batch with recv_initial_metadata and recv_message. - op = ops; - op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata.recv_initial_metadata = - &initial_metadata_recv_; - op->flags = 0; - op->reserved = nullptr; - op++; - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message.recv_message = &recv_message_payload_; - op->flags = 0; - op->reserved = nullptr; - op++; - Ref(DEBUG_LOCATION, "OnResponseReceived").release(); - GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, nullptr); - call_error = grpc_call_start_batch_and_execute( - call_, ops, static_cast(op - ops), &on_response_received_); - GPR_ASSERT(GRPC_CALL_OK == call_error); - // Start a batch for recv_trailing_metadata. - op = ops; - op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; - op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_; - op->data.recv_status_on_client.status = &status_code_; - op->data.recv_status_on_client.status_details = &status_details_; - op->flags = 0; - op->reserved = nullptr; - op++; - // This callback signals the end of the call, so it relies on the initial - // ref instead of a new ref. When it's invoked, it's the initial ref that is - // unreffed. - GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, nullptr); - call_error = grpc_call_start_batch_and_execute( - call_, ops, static_cast(op - ops), &on_status_received_); - GPR_ASSERT(GRPC_CALL_OK == call_error); -} - -GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: - ~GrpcStreamingCall() { - grpc_metadata_array_destroy(&initial_metadata_recv_); - grpc_metadata_array_destroy(&trailing_metadata_recv_); - grpc_byte_buffer_destroy(send_message_payload_); - grpc_byte_buffer_destroy(recv_message_payload_); - grpc_slice_unref_internal(status_details_); - GPR_ASSERT(call_ != nullptr); - grpc_call_unref(call_); -} - -void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::Orphan() { - GPR_ASSERT(call_ != nullptr); - // If we are here because xds_client wants to cancel the call, - // OnStatusReceived() will complete the cancellation and clean up. - // Otherwise, we are here because xds_client has to orphan a failed call, - // in which case the following cancellation will be a no-op. - grpc_call_cancel_internal(call_); - // Note that the initial ref is held by OnStatusReceived(), so the - // corresponding unref happens there instead of here. -} - -void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::SendMessage( - std::string payload) { - // Create payload. - grpc_slice slice = grpc_slice_from_cpp_string(std::move(payload)); - send_message_payload_ = grpc_raw_byte_buffer_create(&slice, 1); - grpc_slice_unref_internal(slice); - // Send the message. - grpc_op op; - memset(&op, 0, sizeof(op)); - op.op = GRPC_OP_SEND_MESSAGE; - op.data.send_message.send_message = send_message_payload_; - Ref(DEBUG_LOCATION, "OnRequestSent").release(); - grpc_call_error call_error = - grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_); - GPR_ASSERT(GRPC_CALL_OK == call_error); -} - -void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: - OnRequestSent(void* arg, grpc_error_handle error) { - auto* self = static_cast(arg); - // Clean up the sent message. - grpc_byte_buffer_destroy(self->send_message_payload_); - self->send_message_payload_ = nullptr; - // Invoke request handler. - self->event_handler_->OnRequestSent(GRPC_ERROR_IS_NONE(error)); - // Drop the ref. - self->Unref(DEBUG_LOCATION, "OnRequestSent"); -} - -void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: - OnResponseReceived(void* arg, grpc_error_handle /*error*/) { - auto* self = static_cast(arg); - // If there was no payload, then we received status before we received - // another message, so we stop reading. - if (self->recv_message_payload_ == nullptr) { - self->Unref(DEBUG_LOCATION, "OnResponseReceived"); - return; - } - // Process the response. - grpc_byte_buffer_reader bbr; - grpc_byte_buffer_reader_init(&bbr, self->recv_message_payload_); - grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); - grpc_byte_buffer_reader_destroy(&bbr); - grpc_byte_buffer_destroy(self->recv_message_payload_); - self->recv_message_payload_ = nullptr; - self->event_handler_->OnRecvMessage(StringViewFromSlice(response_slice)); - grpc_slice_unref_internal(response_slice); - // Keep reading. - grpc_op op; - memset(&op, 0, sizeof(op)); - op.op = GRPC_OP_RECV_MESSAGE; - op.data.recv_message.recv_message = &self->recv_message_payload_; - GPR_ASSERT(self->call_ != nullptr); - // Reuses the "OnResponseReceived" ref taken in ctor. - const grpc_call_error call_error = grpc_call_start_batch_and_execute( - self->call_, &op, 1, &self->on_response_received_); - GPR_ASSERT(GRPC_CALL_OK == call_error); -} - -void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: - OnStatusReceived(void* arg, grpc_error_handle /*error*/) { - auto* self = static_cast(arg); - self->event_handler_->OnStatusReceived( - absl::Status(static_cast(self->status_code_), - StringViewFromSlice(self->status_details_))); - self->Unref(DEBUG_LOCATION, "OnStatusReceived"); -} - -// -// GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher -// - -class GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher - : public AsyncConnectivityStateWatcherInterface { - public: - explicit StateWatcher( - std::function on_connectivity_failure) - : on_connectivity_failure_(std::move(on_connectivity_failure)) {} - - private: - void OnConnectivityStateChange(grpc_connectivity_state new_state, - const absl::Status& status) override { - if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - on_connectivity_failure_(status); - } - } - - std::function on_connectivity_failure_; -}; - -// -// GrpcXdsClient::GrpcXdsTransport -// - -namespace { - -grpc_channel* CreateXdsChannel(grpc_channel_args* args, - const XdsBootstrap::XdsServer& server) { - RefCountedPtr channel_creds = - CoreConfiguration::Get().channel_creds_registry().CreateChannelCreds( - server.channel_creds_type, server.channel_creds_config); - return grpc_channel_create(server.server_uri.c_str(), channel_creds.get(), - args); -} - -bool IsLameChannel(grpc_channel* channel) { - grpc_channel_element* elem = - grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); - return elem->filter == &LameClientFilter::kFilter; -} - -} // namespace - -GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport( - GrpcXdsTransportFactory* factory, const XdsBootstrap::XdsServer& server, - std::function on_connectivity_failure, - absl::Status* status) - : factory_(factory) { - channel_ = CreateXdsChannel(factory->args_, server); - GPR_ASSERT(channel_ != nullptr); - if (IsLameChannel(channel_)) { - *status = absl::UnavailableError("xds client has a lame channel"); - } else { - ClientChannel* client_channel = - ClientChannel::GetFromChannel(Channel::FromC(channel_)); - GPR_ASSERT(client_channel != nullptr); - watcher_ = new StateWatcher(std::move(on_connectivity_failure)); - client_channel->AddConnectivityWatcher( - GRPC_CHANNEL_IDLE, - OrphanablePtr(watcher_)); - } -} - -GrpcXdsTransportFactory::GrpcXdsTransport::~GrpcXdsTransport() { - grpc_channel_destroy(channel_); -} - -void GrpcXdsTransportFactory::GrpcXdsTransport::Orphan() { - if (IsLameChannel(channel_)) return; - ClientChannel* client_channel = - ClientChannel::GetFromChannel(Channel::FromC(channel_)); - GPR_ASSERT(client_channel != nullptr); - client_channel->RemoveConnectivityWatcher(watcher_); - Unref(); -} - -OrphanablePtr -GrpcXdsTransportFactory::GrpcXdsTransport::CreateStreamingCall( - const char* method, - std::unique_ptr event_handler) { - return MakeOrphanable( - factory_->Ref(DEBUG_LOCATION, "StreamingCall"), channel_, method, - std::move(event_handler)); -} - -void GrpcXdsTransportFactory::GrpcXdsTransport::ResetBackoff() { - grpc_channel_reset_connect_backoff(channel_); -} - -// -// GrpcXdsTransportFactory -// - -namespace { - -grpc_channel_args* ModifyChannelArgs(const grpc_channel_args* args) { - absl::InlinedVector args_to_add = { - grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), - 5 * 60 * GPR_MS_PER_SEC), - }; - return grpc_channel_args_copy_and_add(args, args_to_add.data(), - args_to_add.size()); -} - -} // namespace - -GrpcXdsTransportFactory::GrpcXdsTransportFactory(const grpc_channel_args* args) - : args_(ModifyChannelArgs(args)), - interested_parties_(grpc_pollset_set_create()) { - // Calling grpc_init to ensure gRPC does not shut down until the XdsClient is - // destroyed. - grpc_init(); -} - -GrpcXdsTransportFactory::~GrpcXdsTransportFactory() { - grpc_channel_args_destroy(args_); - grpc_pollset_set_destroy(interested_parties_); - // Calling grpc_shutdown to ensure gRPC does not shut down until the XdsClient - // is destroyed. - grpc_shutdown(); -} - -OrphanablePtr -GrpcXdsTransportFactory::Create( - const XdsBootstrap::XdsServer& server, - std::function on_connectivity_failure, - absl::Status* status) { - return MakeOrphanable( - this, server, std::move(on_connectivity_failure), status); -} - -} // namespace grpc_core diff --git a/src/core/ext/xds/xds_transport_grpc.h b/src/core/ext/xds/xds_transport_grpc.h deleted file mode 100644 index cb17b514b0e..00000000000 --- a/src/core/ext/xds/xds_transport_grpc.h +++ /dev/null @@ -1,134 +0,0 @@ -// -// Copyright 2022 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#ifndef GRPC_CORE_EXT_XDS_XDS_TRANSPORT_GRPC_H -#define GRPC_CORE_EXT_XDS_XDS_TRANSPORT_GRPC_H - -#include - -#include -#include -#include - -#include "absl/status/status.h" - -#include -#include -#include - -#include "src/core/ext/xds/xds_bootstrap.h" -#include "src/core/ext/xds/xds_transport.h" -#include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/iomgr_fwd.h" - -namespace grpc_core { - -class GrpcXdsTransportFactory : public XdsTransportFactory { - public: - class GrpcXdsTransport; - - explicit GrpcXdsTransportFactory(const grpc_channel_args* args); - ~GrpcXdsTransportFactory() override; - - void Orphan() override { Unref(); } - - OrphanablePtr Create( - const XdsBootstrap::XdsServer& server, - std::function on_connectivity_failure, - absl::Status* status) override; - - grpc_pollset_set* interested_parties() const { return interested_parties_; } - - private: - grpc_channel_args* args_; - grpc_pollset_set* interested_parties_; -}; - -class GrpcXdsTransportFactory::GrpcXdsTransport - : public XdsTransportFactory::XdsTransport { - public: - class GrpcStreamingCall; - - GrpcXdsTransport(GrpcXdsTransportFactory* factory, - const XdsBootstrap::XdsServer& server, - std::function on_connectivity_failure, - absl::Status* status); - ~GrpcXdsTransport() override; - - void Orphan() override; - - OrphanablePtr CreateStreamingCall( - const char* method, - std::unique_ptr event_handler) override; - - void ResetBackoff() override; - - private: - class StateWatcher; - - GrpcXdsTransportFactory* factory_; // Not owned. - grpc_channel* channel_; - StateWatcher* watcher_; -}; - -class GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall - : public XdsTransportFactory::XdsTransport::StreamingCall { - public: - GrpcStreamingCall(RefCountedPtr factory, - grpc_channel* channel, const char* method, - std::unique_ptr event_handler); - ~GrpcStreamingCall() override; - - void Orphan() override; - - void SendMessage(std::string payload) override; - - private: - static void OnRequestSent(void* arg, grpc_error_handle error); - static void OnResponseReceived(void* arg, grpc_error_handle /*error*/); - static void OnStatusReceived(void* arg, grpc_error_handle /*error*/); - - RefCountedPtr factory_; - - std::unique_ptr event_handler_; - - // Always non-NULL. - grpc_call* call_; - - // recv_initial_metadata - grpc_metadata_array initial_metadata_recv_; - - // send_message - grpc_byte_buffer* send_message_payload_ = nullptr; - grpc_closure on_request_sent_; - - // recv_message - grpc_byte_buffer* recv_message_payload_ = nullptr; - grpc_closure on_response_received_; - - // recv_trailing_metadata - grpc_metadata_array trailing_metadata_recv_; - grpc_status_code status_code_; - grpc_slice status_details_; - grpc_closure on_status_received_; -}; - -} // namespace grpc_core - -#endif // GRPC_CORE_EXT_XDS_XDS_TRANSPORT_GRPC_H diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 3d9e7deeda8..f46890a7f12 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -402,7 +402,6 @@ CORE_SOURCE_FILES = [ 'src/core/ext/xds/xds_certificate_provider.cc', 'src/core/ext/xds/xds_channel_stack_modifier.cc', 'src/core/ext/xds/xds_client.cc', - 'src/core/ext/xds/xds_client_grpc.cc', 'src/core/ext/xds/xds_client_stats.cc', 'src/core/ext/xds/xds_cluster.cc', 'src/core/ext/xds/xds_cluster_specifier_plugin.cc', @@ -417,7 +416,6 @@ CORE_SOURCE_FILES = [ 'src/core/ext/xds/xds_route_config.cc', 'src/core/ext/xds/xds_routing.cc', 'src/core/ext/xds/xds_server_config_fetcher.cc', - 'src/core/ext/xds/xds_transport_grpc.cc', 'src/core/lib/address_utils/parse_address.cc', 'src/core/lib/address_utils/sockaddr_utils.cc', 'src/core/lib/backoff/backoff.cc', diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.cc b/test/cpp/end2end/xds/xds_end2end_test_lib.cc index c0875cb6bbf..49cd4cd1276 100644 --- a/test/cpp/end2end/xds/xds_end2end_test_lib.cc +++ b/test/cpp/end2end/xds/xds_end2end_test_lib.cc @@ -38,7 +38,7 @@ #include "src/core/ext/filters/http/server/http_server_filter.h" #include "src/core/ext/xds/xds_channel_args.h" -#include "src/core/ext/xds/xds_client_grpc.h" +#include "src/core/ext/xds/xds_client.h" #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/tmpfile.h" #include "src/core/lib/iomgr/load_file.h" diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 8a735f35bf3..89eab79c72e 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1856,8 +1856,6 @@ src/core/ext/xds/xds_channel_stack_modifier.cc \ src/core/ext/xds/xds_channel_stack_modifier.h \ src/core/ext/xds/xds_client.cc \ src/core/ext/xds/xds_client.h \ -src/core/ext/xds/xds_client_grpc.cc \ -src/core/ext/xds/xds_client_grpc.h \ src/core/ext/xds/xds_client_stats.cc \ src/core/ext/xds/xds_client_stats.h \ src/core/ext/xds/xds_cluster.cc \ @@ -1886,9 +1884,6 @@ src/core/ext/xds/xds_route_config.h \ src/core/ext/xds/xds_routing.cc \ src/core/ext/xds/xds_routing.h \ src/core/ext/xds/xds_server_config_fetcher.cc \ -src/core/ext/xds/xds_transport.h \ -src/core/ext/xds/xds_transport_grpc.cc \ -src/core/ext/xds/xds_transport_grpc.h \ src/core/lib/address_utils/parse_address.cc \ src/core/lib/address_utils/parse_address.h \ src/core/lib/address_utils/sockaddr_utils.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 7a84f5ef842..ec28471b4d3 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1645,8 +1645,6 @@ src/core/ext/xds/xds_channel_stack_modifier.cc \ src/core/ext/xds/xds_channel_stack_modifier.h \ src/core/ext/xds/xds_client.cc \ src/core/ext/xds/xds_client.h \ -src/core/ext/xds/xds_client_grpc.cc \ -src/core/ext/xds/xds_client_grpc.h \ src/core/ext/xds/xds_client_stats.cc \ src/core/ext/xds/xds_client_stats.h \ src/core/ext/xds/xds_cluster.cc \ @@ -1675,9 +1673,6 @@ src/core/ext/xds/xds_route_config.h \ src/core/ext/xds/xds_routing.cc \ src/core/ext/xds/xds_routing.h \ src/core/ext/xds/xds_server_config_fetcher.cc \ -src/core/ext/xds/xds_transport.h \ -src/core/ext/xds/xds_transport_grpc.cc \ -src/core/ext/xds/xds_transport_grpc.h \ src/core/lib/README.md \ src/core/lib/address_utils/parse_address.cc \ src/core/lib/address_utils/parse_address.h \