pull/36732/head
Craig Tiller 11 months ago
parent d9b610ea1a
commit a5cbcd8c7e
  1. 68
      src/core/client_channel/client_channel.cc
  2. 25
      src/core/client_channel/client_channel.h
  3. 1541
      src/core/client_channel/subchannel.cc
  4. 2
      src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h
  5. 19
      src/core/lib/channel/channel_args.h
  6. 3
      src/core/lib/channel/context.h
  7. 2
      src/core/lib/surface/channel.h
  8. 5
      src/core/load_balancing/lb_policy.h

@ -46,9 +46,9 @@
#include <grpc/status.h>
#include <grpc/support/json.h>
#include <grpc/support/log.h>
#include <grpc/support/metrics.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <grpc/support/metrics.h>
#include "src/core/client_channel/backup_poller.h"
#include "src/core/client_channel/client_channel_internal.h"
@ -60,6 +60,7 @@
#include "src/core/client_channel/retry_filter.h"
#include "src/core/client_channel/subchannel.h"
#include "src/core/client_channel/subchannel_interface_internal.h"
#include "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/metrics.h"
@ -110,7 +111,6 @@
#include "src/core/resolver/resolver_registry.h"
#include "src/core/service_config/service_config_call_data.h"
#include "src/core/service_config/service_config_impl.h"
#include "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h"
namespace grpc_core {
@ -328,9 +328,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
DEBUG_LOCATION);
}
grpc_pollset_set* interested_parties() override {
return watcher_->interested_parties();
}
grpc_pollset_set* interested_parties() override { return nullptr; }
private:
void ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,
@ -778,7 +776,8 @@ const NoInterceptor LbCallTracingFilter::Call::OnServerToClientMessage;
} // namespace
class ClientChannel::LoadBalancedCallDestination : public UnstartedCallDestination {
class ClientChannel::LoadBalancedCallDestination
: public UnstartedCallDestination {
public:
explicit LoadBalancedCallDestination(
RefCountedPtr<ClientChannel> client_channel)
@ -931,6 +930,20 @@ absl::StatusOr<OrphanablePtr<Channel>> ClientChannel::Create(
std::move(*default_service_config), client_channel_factory);
}
namespace {
std::string GetDefaultAuthorityFromChannelArgs(const ChannelArgs& channel_args,
absl::string_view target) {
absl::optional<std::string> default_authority =
channel_args.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY);
if (!default_authority.has_value()) {
return CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
target);
} else {
return std::move(*default_authority);
}
}
} // namespace
ClientChannel::ClientChannel(
std::string target, ChannelArgs channel_args, std::string uri_to_resolve,
RefCountedPtr<ServiceConfig> default_service_config,
@ -943,12 +956,9 @@ ClientChannel::ClientChannel(
internal::ClientChannelServiceConfigParser::ParserIndex()),
default_service_config_(std::move(default_service_config)),
client_channel_factory_(client_channel_factory),
default_authority_(
GetDefaultAuthorityFromChannelArgs(channel_args_, this->target())),
channelz_node_(channel_args_.GetObject<channelz::ChannelNode>()),
interested_parties_(grpc_pollset_set_create()),
lb_call_size_estimator_(1024),
lb_call_allocator_(channel_args_.GetObject<ResourceQuota>()
->memory_quota()
->CreateMemoryOwner()),
idle_timeout_(GetClientIdleTimeout(channel_args_)),
resolver_data_for_calls_(ResolverDataForCalls{}),
picker_(nullptr),
@ -965,18 +975,9 @@ ClientChannel::ClientChannel(
} else {
keepalive_time_ = -1; // unset
}
// Set default authority.
absl::optional<std::string> default_authority =
channel_args_.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY);
if (!default_authority.has_value()) {
default_authority_ =
CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
target);
} else {
default_authority_ = std::move(*default_authority);
}
// Get stats plugins for channel.
experimental:: StatsPluginChannelScope scope(this->target(), default_authority_);
experimental::StatsPluginChannelScope scope(this->target(),
default_authority_);
stats_plugin_group_ =
GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope);
}
@ -985,7 +986,6 @@ ClientChannel::~ClientChannel() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
gpr_log(GPR_INFO, "client_channel=%p: destroying", this);
}
grpc_pollset_set_destroy(interested_parties_);
}
void ClientChannel::Orphan() {
@ -1196,14 +1196,14 @@ CallInitiator ClientChannel::CreateCall(
CheckConnectivityState(/*try_to_connect=*/true);
// Create an initiator/unstarted-handler pair.
auto call = MakeCallPair(std::move(client_initial_metadata),
GetContext<EventEngine>(), arena, nullptr, GetContext<grpc_call_context_element>());
GetContext<EventEngine>(), arena, nullptr,
GetContext<grpc_call_context_element>());
// Spawn a promise to wait for the resolver result.
// This will eventually start the call.
call.initiator.SpawnGuarded(
"wait-for-name-resolution",
[self = RefAsSubclass<ClientChannel>(),
unstarted_handler = std::move(call.handler),
was_queued = false]() mutable {
"wait-for-name-resolution", [self = RefAsSubclass<ClientChannel>(),
unstarted_handler = std::move(call.handler),
was_queued = false]() mutable {
const bool wait_for_ready =
unstarted_handler.UnprocessedClientInitialMetadata()
.GetOrCreatePointer(WaitForReady())
@ -1260,9 +1260,7 @@ void ClientChannel::CreateResolverLocked() {
this, uri_to_resolve_.c_str());
}
resolver_ = CoreConfiguration::Get().resolver_registry().CreateResolver(
uri_to_resolve_, channel_args_,
interested_parties_, // FIXME: remove somehow
work_serializer_,
uri_to_resolve_, channel_args_, nullptr, work_serializer_,
std::make_unique<ResolverResultHandler>(RefAsSubclass<ClientChannel>()));
// Since the validity of the args was checked when the channel was created,
// CreateResolver() must return a non-null result.
@ -1629,7 +1627,8 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
if (idle_state_.DecreaseCallCount()) StartIdleTimer();
});
}
CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(GRPC_CLIENT_CHANNEL, builder);
CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(
GRPC_CLIENT_CHANNEL, builder);
// FIXME: add filters returned by config selector
#if 0
std::vector<const grpc_channel_filter*> filters =
@ -1648,9 +1647,8 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
if (enable_retries) {
Crash("call v3 stack does not yet support retries");
} else {
call_destination =
MakeRefCounted<LoadBalancedCallDestination>(
RefAsSubclass<ClientChannel>());
call_destination = MakeRefCounted<LoadBalancedCallDestination>(
RefAsSubclass<ClientChannel>());
}
auto filter_stack = builder.Build(call_destination);
// Send result to data plane.

@ -167,25 +167,16 @@ class ClientChannel : public Channel {
LoadBalancingPolicy::SubchannelPicker& picker,
UnstartedCallHandler& unstarted_handler);
//
// Fields set at construction and never modified.
//
ChannelArgs channel_args_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
std::string uri_to_resolve_;
const ChannelArgs channel_args_;
const std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine_;
const std::string uri_to_resolve_;
const size_t service_config_parser_index_;
RefCountedPtr<ServiceConfig> default_service_config_;
ClientChannelFactory* client_channel_factory_;
std::string default_authority_;
channelz::ChannelNode* channelz_node_;
const RefCountedPtr<ServiceConfig> default_service_config_;
ClientChannelFactory* const client_channel_factory_;
const std::string default_authority_;
channelz::ChannelNode* const channelz_node_;
GlobalStatsPluginRegistry::StatsPluginGroup stats_plugin_group_;
grpc_pollset_set* interested_parties_;
//
// State for LB calls.
//
CallSizeEstimator lb_call_size_estimator_;
MemoryAllocator lb_call_allocator_;
//
// Idleness state.

File diff suppressed because it is too large Load Diff

@ -40,7 +40,7 @@
namespace grpc_core {
Duration GetClientIdleTimeout(const ChannelArgs& args) ;
Duration GetClientIdleTimeout(const ChannelArgs& args);
class LegacyChannelIdleFilter : public ChannelFilter {
public:

@ -89,7 +89,8 @@ struct IsRawPointerTagged {
static constexpr bool kValue = false;
};
template <typename T>
struct IsRawPointerTagged<T, absl::void_t<typename T::RawPointerChannelArgTag>> {
struct IsRawPointerTagged<T,
absl::void_t<typename T::RawPointerChannelArgTag>> {
static constexpr bool kValue = true;
};
} // namespace channel_args_detail
@ -100,14 +101,14 @@ struct IsRawPointerTagged<T, absl::void_t<typename T::RawPointerChannelArgTag>>
template <typename T>
struct ChannelArgTypeTraits<
T, absl::enable_if_t<
!channel_args_detail::IsRawPointerTagged<T>::kValue &&(
std::is_base_of<RefCounted<channel_args_detail::RefType<T>>,
channel_args_detail::RefType<T>>::value ||
std::is_base_of<RefCounted<channel_args_detail::RefType<T>,
NonPolymorphicRefCount>,
channel_args_detail::RefType<T>>::value ||
std::is_base_of<DualRefCounted<channel_args_detail::RefType<T>>,
channel_args_detail::RefType<T>>::value),
!channel_args_detail::IsRawPointerTagged<T>::kValue &&
(std::is_base_of<RefCounted<channel_args_detail::RefType<T>>,
channel_args_detail::RefType<T>>::value ||
std::is_base_of<RefCounted<channel_args_detail::RefType<T>,
NonPolymorphicRefCount>,
channel_args_detail::RefType<T>>::value ||
std::is_base_of<DualRefCounted<channel_args_detail::RefType<T>>,
channel_args_detail::RefType<T>>::value),
void>> {
static const grpc_arg_pointer_vtable* VTable() {
static const grpc_arg_pointer_vtable tbl = {

@ -108,7 +108,8 @@ class Context<T, absl::void_t<decltype(OldStyleContext<T>::kIndex)>> {
.value);
}
static void set(T* value) {
auto& elem = GetContext<grpc_call_context_element>()[OldStyleContext<T>::kIndex];
auto& elem =
GetContext<grpc_call_context_element>()[OldStyleContext<T>::kIndex];
if (elem.destroy != nullptr) {
elem.destroy(elem.value);
elem.destroy = nullptr;

@ -43,8 +43,8 @@
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/call_arena_allocator.h"
#include "src/core/lib/transport/connectivity_state.h"
// Forward declaration to avoid dependency loop.
struct grpc_channel_stack;

@ -488,10 +488,11 @@ namespace promise_detail {
template <>
struct OldStyleContext<LoadBalancingPolicy::SubchannelCallTrackerInterface> {
static constexpr grpc_context_index kIndex = GRPC_SUBCHANNEL_CALL_TRACKER_INTERFACE;
static constexpr grpc_context_index kIndex =
GRPC_SUBCHANNEL_CALL_TRACKER_INTERFACE;
};
}
} // namespace promise_detail
} // namespace grpc_core

Loading…
Cancel
Save