diff --git a/BUILD b/BUILD index be97a52ae94..135a5c083c5 100644 --- a/BUILD +++ b/BUILD @@ -1794,7 +1794,6 @@ grpc_cc_library( "absl/status:statusor", "absl/strings", "absl/synchronization", - "absl/types:optional", "absl/memory", "upb_lib", "protobuf_headers", @@ -2671,6 +2670,7 @@ grpc_cc_library( "//src/core:channel_init", "//src/core:channel_stack_type", "//src/core:construct_destruct", + "//src/core:default_event_engine", "//src/core:dual_ref_counted", "//src/core:env", "//src/core:gpr_atm", diff --git a/src/core/BUILD b/src/core/BUILD index 56417492e00..6676e577a64 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -1809,13 +1809,10 @@ grpc_cc_library( ], external_deps = ["absl/functional:any_invocable"], deps = [ - "channel_args", - "channel_args_preconditioning", "context", "default_event_engine_factory", "event_engine_trace", "no_destruct", - "//:config", "//:event_engine_base_hdrs", "//:gpr", "//:grpc_trace", @@ -2961,6 +2958,7 @@ grpc_cc_library( "channel_init", "channel_stack_type", "closure", + "default_event_engine", "exec_ctx_wakeup_scheduler", "http2_errors", "idle_filter_state", @@ -3191,6 +3189,7 @@ grpc_cc_library( "channel_fwd", "channel_init", "channel_stack_type", + "default_event_engine", "gpr_atm", "grpc_sockaddr", "json", @@ -3753,6 +3752,7 @@ grpc_cc_library( language = "c++", deps = [ "channel_args", + "default_event_engine", "grpc_resolver_xds_header", "json", "json_args", @@ -4077,6 +4077,7 @@ grpc_cc_library( language = "c++", deps = [ "channel_args", + "default_event_engine", "grpc_lb_address_filtering", "json", "json_args", diff --git a/src/core/ext/filters/channel_idle/channel_idle_filter.cc b/src/core/ext/filters/channel_idle/channel_idle_filter.cc index 479953a0df6..0ac7cf61a3b 100644 --- a/src/core/ext/filters/channel_idle/channel_idle_filter.cc +++ b/src/core/ext/filters/channel_idle/channel_idle_filter.cc @@ -26,6 +26,7 @@ #include "absl/types/optional.h" +#include #include #include @@ -34,6 +35,7 @@ #include "src/core/lib/channel/promise_based_filter.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/status_helper.h" @@ -54,6 +56,8 @@ namespace grpc_core { namespace { +using ::grpc_event_engine::experimental::EventEngine; + // TODO(ctiller): The idle filter was disabled in client channel by default // due to b/143502997. Now the bug is fixed enable the filter by default. const auto kDefaultIdleTimeout = Duration::Infinity(); @@ -121,15 +125,19 @@ struct MaxAgeFilter::Config { absl::StatusOr ClientIdleFilter::Create( const ChannelArgs& args, ChannelFilter::Args filter_args) { - ClientIdleFilter filter(filter_args.channel_stack(), - GetClientIdleTimeout(args)); + // TODO(hork): pull EventEngine from args + ClientIdleFilter filter( + filter_args.channel_stack(), GetClientIdleTimeout(args), + grpc_event_engine::experimental::GetDefaultEventEngine()); return absl::StatusOr(std::move(filter)); } absl::StatusOr MaxAgeFilter::Create( const ChannelArgs& args, ChannelFilter::Args filter_args) { + // TODO(hork): pull EventEngine from args MaxAgeFilter filter(filter_args.channel_stack(), - Config::FromChannelArgs(args)); + Config::FromChannelArgs(args), + grpc_event_engine::experimental::GetDefaultEventEngine()); return absl::StatusOr(std::move(filter)); } @@ -203,7 +211,7 @@ void MaxAgeFilter::PostInit() { // (if it did not, it was cancelled) if (status.ok()) CloseChannel(); }, - channel_stack->EventEngine())); + engine_.get())); } } @@ -264,7 +272,7 @@ void ChannelIdleFilter::StartIdleTimer() { [channel_stack, this](absl::Status status) { if (status.ok()) CloseChannel(); }, - channel_stack->EventEngine())); + engine_.get())); } void ChannelIdleFilter::CloseChannel() { @@ -306,10 +314,13 @@ void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) { }); } -MaxAgeFilter::MaxAgeFilter(grpc_channel_stack* channel_stack, - const Config& max_age_config) - : ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle), +MaxAgeFilter::MaxAgeFilter( + grpc_channel_stack* channel_stack, const Config& max_age_config, + std::shared_ptr engine) + : ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle, + engine), max_connection_age_(max_age_config.max_connection_age), - max_connection_age_grace_(max_age_config.max_connection_age_grace) {} + max_connection_age_grace_(max_age_config.max_connection_age_grace), + engine_(engine) {} } // namespace grpc_core diff --git a/src/core/ext/filters/channel_idle/channel_idle_filter.h b/src/core/ext/filters/channel_idle/channel_idle_filter.h index d926d09f71b..bc49a2a0174 100644 --- a/src/core/ext/filters/channel_idle/channel_idle_filter.h +++ b/src/core/ext/filters/channel_idle/channel_idle_filter.h @@ -22,6 +22,7 @@ #include "absl/status/status.h" #include "absl/status/statusor.h" +#include #include #include "src/core/ext/filters/channel_idle/idle_filter_state.h" @@ -58,10 +59,12 @@ class ChannelIdleFilter : public ChannelFilter { using SingleSetActivityPtr = SingleSetPtr; - ChannelIdleFilter(grpc_channel_stack* channel_stack, - Duration client_idle_timeout) + ChannelIdleFilter( + grpc_channel_stack* channel_stack, Duration client_idle_timeout, + std::shared_ptr engine) : channel_stack_(channel_stack), - client_idle_timeout_(client_idle_timeout) {} + client_idle_timeout_(client_idle_timeout), + engine_(engine) {} grpc_channel_stack* channel_stack() { return channel_stack_; }; @@ -87,6 +90,7 @@ class ChannelIdleFilter : public ChannelFilter { std::make_shared(false)}; SingleSetActivityPtr activity_; + std::shared_ptr engine_; }; class ClientIdleFilter final : public ChannelIdleFilter { @@ -127,13 +131,16 @@ class MaxAgeFilter final : public ChannelIdleFilter { MaxAgeFilter* filter_; }; - MaxAgeFilter(grpc_channel_stack* channel_stack, const Config& max_age_config); + MaxAgeFilter( + grpc_channel_stack* channel_stack, const Config& max_age_config, + std::shared_ptr engine); void Shutdown() override; SingleSetActivityPtr max_age_activity_; Duration max_connection_age_; Duration max_connection_age_grace_; + std::shared_ptr engine_; }; } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index a3c99194c44..31f573a0e12 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -37,7 +37,6 @@ #include "absl/types/optional.h" #include "absl/types/variant.h" -#include #include #include #include @@ -916,10 +915,6 @@ class ClientChannel::ClientChannelControlHelper return chand_->default_authority_; } - grpc_event_engine::experimental::EventEngine* GetEventEngine() override { - return chand_->owning_stack_->EventEngine(); - } - void AddTraceEvent(TraceSeverity severity, absl::string_view message) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { if (chand_->resolver_ == nullptr) return; // Shutting down. diff --git a/src/core/ext/filters/client_channel/dynamic_filters.cc b/src/core/ext/filters/client_channel/dynamic_filters.cc index 6d0a015b4ea..c023dc71c72 100644 --- a/src/core/ext/filters/client_channel/dynamic_filters.cc +++ b/src/core/ext/filters/client_channel/dynamic_filters.cc @@ -140,7 +140,8 @@ namespace { absl::StatusOr> CreateChannelStack( const ChannelArgs& args, std::vector filters) { - ChannelStackBuilderImpl builder("DynamicFilters", GRPC_CLIENT_DYNAMIC, args); + ChannelStackBuilderImpl builder("DynamicFilters", GRPC_CLIENT_DYNAMIC); + builder.SetChannelArgs(args); for (auto filter : filters) { builder.AppendFilter(filter); } diff --git a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc index b364b32b137..41568e68963 100644 --- a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc +++ b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc @@ -25,7 +25,6 @@ #include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" -#include #include #include @@ -108,10 +107,6 @@ class ChildPolicyHandler::Helper return parent_->channel_control_helper()->GetAuthority(); } - grpc_event_engine::experimental::EventEngine* GetEventEngine() override { - return parent_->channel_control_helper()->GetEventEngine(); - } - void AddTraceEvent(TraceSeverity severity, absl::string_view message) override { if (parent_->shutting_down_) return; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 8a907c958bc..66728f792a3 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -105,6 +105,7 @@ #include "src/core/lib/channel/channelz.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/debug_location.h" @@ -302,6 +303,7 @@ class GrpcLb : public LoadBalancingPolicy { bool client_load_report_is_due_ = false; // The closure used for the completion of sending the load report. grpc_closure client_load_report_done_closure_; + std::shared_ptr engine_; }; class SubchannelWrapper : public DelegatingSubchannel { @@ -467,7 +469,6 @@ class GrpcLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; - grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -871,10 +872,6 @@ absl::string_view GrpcLb::Helper::GetAuthority() { return parent_->channel_control_helper()->GetAuthority(); } -grpc_event_engine::experimental::EventEngine* GrpcLb::Helper::GetEventEngine() { - return parent_->channel_control_helper()->GetEventEngine(); -} - void GrpcLb::Helper::AddTraceEvent(TraceSeverity severity, absl::string_view message) { if (parent_->shutting_down_) return; @@ -890,7 +887,8 @@ GrpcLb::BalancerCallState::BalancerCallState( : InternallyRefCounted( GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace) ? "BalancerCallState" : nullptr), - grpclb_policy_(std::move(parent_grpclb_policy)) { + grpclb_policy_(std::move(parent_grpclb_policy)), + engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { GPR_ASSERT(grpclb_policy_ != nullptr); GPR_ASSERT(!grpclb_policy()->shutting_down_); // Init the LB call. Note that the LB call will progress every time there's @@ -948,8 +946,7 @@ void GrpcLb::BalancerCallState::Orphan() { // call, then the following cancellation will be a no-op. grpc_call_cancel_internal(lb_call_); if (client_load_report_handle_.has_value() && - grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel( - client_load_report_handle_.value())) { + engine_->Cancel(client_load_report_handle_.value())) { Unref(DEBUG_LOCATION, "client_load_report cancelled"); } // Note that the initial ref is hold by lb_on_balancer_status_received_ @@ -1035,13 +1032,12 @@ void GrpcLb::BalancerCallState::StartQuery() { void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { client_load_report_handle_ = - grpclb_policy()->channel_control_helper()->GetEventEngine()->RunAfter( - client_stats_report_interval_, [this] { - ApplicationCallbackExecCtx callback_exec_ctx; - ExecCtx exec_ctx; - grpclb_policy()->work_serializer()->Run( - [this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION); - }); + engine_->RunAfter(client_stats_report_interval_, [this] { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + grpclb_policy()->work_serializer()->Run( + [this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION); + }); } void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked() { @@ -1907,10 +1903,13 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory { } // namespace +} // namespace grpc_core + // // Plugin registration // +namespace grpc_core { void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder) { builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory( std::make_unique()); @@ -1929,5 +1928,4 @@ void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder) { return true; }); } - } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc index 295fbb4ca55..b5047a79788 100644 --- a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc +++ b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc @@ -38,7 +38,6 @@ #include "absl/strings/string_view.h" #include "absl/types/variant.h" -#include #include #include @@ -346,7 +345,6 @@ class OutlierDetectionLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; - grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -771,11 +769,6 @@ absl::string_view OutlierDetectionLb::Helper::GetAuthority() { return outlier_detection_policy_->channel_control_helper()->GetAuthority(); } -grpc_event_engine::experimental::EventEngine* -OutlierDetectionLb::Helper::GetEventEngine() { - return outlier_detection_policy_->channel_control_helper()->GetEventEngine(); -} - void OutlierDetectionLb::Helper::AddTraceEvent(TraceSeverity severity, absl::string_view message) { if (outlier_detection_policy_->shutting_down_) return; diff --git a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc index c5e114bebfb..8913ac88c5e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc +++ b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc @@ -33,7 +33,6 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" -#include #include #include #include @@ -198,7 +197,6 @@ class PriorityLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; - grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -851,12 +849,6 @@ absl::string_view PriorityLb::ChildPriority::Helper::GetAuthority() { return priority_->priority_policy_->channel_control_helper()->GetAuthority(); } -grpc_event_engine::experimental::EventEngine* -PriorityLb::ChildPriority::Helper::GetEventEngine() { - return priority_->priority_policy_->channel_control_helper() - ->GetEventEngine(); -} - void PriorityLb::ChildPriority::Helper::AddTraceEvent( TraceSeverity severity, absl::string_view message) { if (priority_->priority_policy_->shutting_down_) return; diff --git a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc index 28e7e59cddd..363a9f191e8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc +++ b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc @@ -53,7 +53,6 @@ #include #include -#include #include #include #include @@ -336,7 +335,6 @@ class RlsLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; - grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -917,11 +915,6 @@ absl::string_view RlsLb::ChildPolicyWrapper::ChildPolicyHelper::GetAuthority() { return wrapper_->lb_policy_->channel_control_helper()->GetAuthority(); } -grpc_event_engine::experimental::EventEngine* -RlsLb::ChildPolicyWrapper::ChildPolicyHelper::GetEventEngine() { - return wrapper_->lb_policy_->channel_control_helper()->GetEventEngine(); -} - void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::AddTraceEvent( TraceSeverity severity, absl::string_view message) { if (wrapper_->is_shutdown_) return; diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc index 3fe4675fd7a..a8cf390873c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc @@ -43,6 +43,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" @@ -192,7 +193,6 @@ class WeightedTargetLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; - grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -212,6 +212,7 @@ class WeightedTargetLb : public LoadBalancingPolicy { RefCountedPtr weighted_child_; absl::optional timer_handle_; + std::shared_ptr engine_; }; // Methods for dealing with the child policy. @@ -480,17 +481,16 @@ void WeightedTargetLb::UpdateStateLocked() { WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer( RefCountedPtr weighted_child) - : weighted_child_(std::move(weighted_child)) { + : weighted_child_(std::move(weighted_child)), + engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { timer_handle_ = - weighted_child_->weighted_target_policy_->channel_control_helper() - ->GetEventEngine() - ->RunAfter(kChildRetentionInterval, [self = Ref()]() mutable { - ApplicationCallbackExecCtx app_exec_ctx; - ExecCtx exec_ctx; - self->weighted_child_->weighted_target_policy_->work_serializer() - ->Run([self = std::move(self)] { self->OnTimerLocked(); }, - DEBUG_LOCATION); - }); + engine_->RunAfter(kChildRetentionInterval, [self = Ref()]() mutable { + ApplicationCallbackExecCtx app_exec_ctx; + ExecCtx exec_ctx; + self->weighted_child_->weighted_target_policy_->work_serializer()->Run( + [self = std::move(self)] { self->OnTimerLocked(); }, + DEBUG_LOCATION); + }); } void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() { @@ -502,9 +502,7 @@ void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() { weighted_child_->weighted_target_policy_.get(), weighted_child_.get(), weighted_child_->name_.c_str()); } - weighted_child_->weighted_target_policy_->channel_control_helper() - ->GetEventEngine() - ->Cancel(*timer_handle_); + engine_->Cancel(*timer_handle_); } Unref(); } @@ -703,12 +701,6 @@ absl::string_view WeightedTargetLb::WeightedChild::Helper::GetAuthority() { ->GetAuthority(); } -grpc_event_engine::experimental::EventEngine* -WeightedTargetLb::WeightedChild::Helper::GetEventEngine() { - return weighted_child_->weighted_target_policy_->channel_control_helper() - ->GetEventEngine(); -} - void WeightedTargetLb::WeightedChild::Helper::AddTraceEvent( TraceSeverity severity, absl::string_view message) { if (weighted_child_->weighted_target_policy_->shutting_down_) return; @@ -762,7 +754,7 @@ class WeightedTargetLbFactory : public LoadBalancingPolicyFactory { OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { return MakeOrphanable(std::move(args)); - } + } // namespace absl::string_view name() const override { return kWeightedTarget; } @@ -779,7 +771,7 @@ class WeightedTargetLbFactory : public LoadBalancingPolicyFactory { return LoadRefCountedFromJson( json, JsonArgs(), "errors validating weighted_target LB policy config"); } -}; +}; // namespace grpc_core } // namespace diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index ea2b91fe55d..5bc13e7aece 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -30,7 +30,6 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" -#include #include #include #include @@ -177,7 +176,6 @@ class CdsLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; - grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -265,10 +263,6 @@ absl::string_view CdsLb::Helper::GetAuthority() { return parent_->channel_control_helper()->GetAuthority(); } -grpc_event_engine::experimental::EventEngine* CdsLb::Helper::GetEventEngine() { - return parent_->channel_control_helper()->GetEventEngine(); -} - void CdsLb::Helper::AddTraceEvent(TraceSeverity severity, absl::string_view message) { if (parent_->shutting_down_) return; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc index 19057e90a05..fd1c72ba8ae 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc @@ -34,7 +34,6 @@ #include "absl/types/optional.h" #include "absl/types/variant.h" -#include #include #include @@ -254,7 +253,6 @@ class XdsClusterImplLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; - grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -681,11 +679,6 @@ absl::string_view XdsClusterImplLb::Helper::GetAuthority() { return xds_cluster_impl_policy_->channel_control_helper()->GetAuthority(); } -grpc_event_engine::experimental::EventEngine* -XdsClusterImplLb::Helper::GetEventEngine() { - return xds_cluster_impl_policy_->channel_control_helper()->GetEventEngine(); -} - void XdsClusterImplLb::Helper::AddTraceEvent(TraceSeverity severity, absl::string_view message) { if (xds_cluster_impl_policy_->shutting_down_) return; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc index 6fba5d6e3cd..c673b63f45a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc @@ -42,6 +42,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" @@ -68,6 +69,7 @@ TraceFlag grpc_xds_cluster_manager_lb_trace(false, "xds_cluster_manager_lb"); namespace { using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::GetDefaultEventEngine; constexpr Duration kChildRetentionInterval = Duration::Minutes(15); constexpr absl::string_view kXdsClusterManager = @@ -195,7 +197,6 @@ class XdsClusterManagerLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; - EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -223,6 +224,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy { // States for delayed removal. absl::optional delayed_removal_timer_handle_; bool shutdown_ = false; + std::shared_ptr engine_; }; ~XdsClusterManagerLb() override; @@ -428,7 +430,8 @@ XdsClusterManagerLb::ClusterChild::ClusterChild( RefCountedPtr xds_cluster_manager_policy, const std::string& name) : xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)), - name_(name) { + name_(name), + engine_(GetDefaultEventEngine()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] created ClusterChild %p for %s", @@ -463,9 +466,7 @@ void XdsClusterManagerLb::ClusterChild::Orphan() { // the child. picker_wrapper_.reset(); if (delayed_removal_timer_handle_.has_value()) { - xds_cluster_manager_policy_->channel_control_helper() - ->GetEventEngine() - ->Cancel(*delayed_removal_timer_handle_); + engine_->Cancel(*delayed_removal_timer_handle_); } shutdown_ = true; Unref(); @@ -508,9 +509,7 @@ absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked( // Update child weight. // Reactivate if needed. if (delayed_removal_timer_handle_.has_value() && - xds_cluster_manager_policy_->channel_control_helper() - ->GetEventEngine() - ->Cancel(*delayed_removal_timer_handle_)) { + engine_->Cancel(*delayed_removal_timer_handle_)) { delayed_removal_timer_handle_.reset(); } // Create child policy if needed. @@ -545,21 +544,17 @@ void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() { void XdsClusterManagerLb::ClusterChild::DeactivateLocked() { // If already deactivated, don't do that again. if (delayed_removal_timer_handle_.has_value()) return; + // Set the child weight to 0 so that future picker won't contain this child. // Start a timer to delete the child. - delayed_removal_timer_handle_ = - xds_cluster_manager_policy_->channel_control_helper() - ->GetEventEngine() - ->RunAfter( - kChildRetentionInterval, - [self = Ref(DEBUG_LOCATION, "ClusterChild+timer")]() mutable { - ApplicationCallbackExecCtx application_exec_ctx; - ExecCtx exec_ctx; - self->xds_cluster_manager_policy_->work_serializer()->Run( - [self = std::move(self)]() { - self->OnDelayedRemovalTimerLocked(); - }, - DEBUG_LOCATION); - }); + delayed_removal_timer_handle_ = engine_->RunAfter( + kChildRetentionInterval, + [self = Ref(DEBUG_LOCATION, "ClusterChild+timer")]() mutable { + ApplicationCallbackExecCtx application_exec_ctx; + ExecCtx exec_ctx; + self->xds_cluster_manager_policy_->work_serializer()->Run( + [self = std::move(self)]() { self->OnDelayedRemovalTimerLocked(); }, + DEBUG_LOCATION); + }); } void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked() { @@ -630,12 +625,6 @@ absl::string_view XdsClusterManagerLb::ClusterChild::Helper::GetAuthority() { ->GetAuthority(); } -EventEngine* XdsClusterManagerLb::ClusterChild::Helper::GetEventEngine() { - return xds_cluster_manager_child_->xds_cluster_manager_policy_ - ->channel_control_helper() - ->GetEventEngine(); -} - void XdsClusterManagerLb::ClusterChild::Helper::AddTraceEvent( TraceSeverity severity, absl::string_view message) { if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) { diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc index ff6e67feb9f..5ebbcd8f9ef 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -34,7 +34,6 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" -#include #include #include #include @@ -379,7 +378,6 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { // client, which is a watch-based API. void RequestReresolution() override {} absl::string_view GetAuthority() override; - grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -456,12 +454,6 @@ absl::string_view XdsClusterResolverLb::Helper::GetAuthority() { return xds_cluster_resolver_policy_->channel_control_helper()->GetAuthority(); } -grpc_event_engine::experimental::EventEngine* -XdsClusterResolverLb::Helper::GetEventEngine() { - return xds_cluster_resolver_policy_->channel_control_helper() - ->GetEventEngine(); -} - void XdsClusterResolverLb::Helper::AddTraceEvent(TraceSeverity severity, absl::string_view message) { if (xds_cluster_resolver_policy_->shutting_down_) return; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc index 7371b9203b2..1f7d1c83936 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc @@ -28,7 +28,6 @@ #include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" -#include #include #include @@ -130,7 +129,6 @@ class XdsWrrLocalityLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; - grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -267,7 +265,7 @@ OrphanablePtr XdsWrrLocalityLb::CreateChildPolicyLocked( lb_policy_args.work_serializer = work_serializer(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = - std::make_unique(Ref(DEBUG_LOCATION, "Helper")); + std::make_unique(this->Ref(DEBUG_LOCATION, "Helper")); auto lb_policy = CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy( "weighted_target_experimental", std::move(lb_policy_args)); @@ -315,11 +313,6 @@ absl::string_view XdsWrrLocalityLb::Helper::GetAuthority() { return xds_wrr_locality_->channel_control_helper()->GetAuthority(); } -grpc_event_engine::experimental::EventEngine* -XdsWrrLocalityLb::Helper::GetEventEngine() { - return xds_wrr_locality_->channel_control_helper()->GetEventEngine(); -} - void XdsWrrLocalityLb::Helper::AddTraceEvent(TraceSeverity severity, absl::string_view message) { xds_wrr_locality_->channel_control_helper()->AddTraceEvent(severity, message); diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 89a29566f72..24048b35e5b 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -43,6 +43,7 @@ #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/channel/channel_stack_builder_impl.h" #include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/channel/channelz.h" @@ -50,6 +51,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats_data.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gpr/alloc.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/debug_location.h" @@ -82,8 +84,6 @@ namespace grpc_core { -using ::grpc_event_engine::experimental::EventEngine; - TraceFlag grpc_trace_subchannel(false, "subchannel"); DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount"); @@ -626,7 +626,7 @@ Subchannel::Subchannel(SubchannelKey key, pollset_set_(grpc_pollset_set_create()), connector_(std::move(connector)), backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)), - event_engine_(args_.GetObjectRef()) { + engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { // A grpc_init is added here to ensure that grpc_shutdown does not happen // until the subchannel is destroyed. Subchannels can persist longer than // channels because they maybe reused/shared among multiple channels. As a @@ -763,7 +763,7 @@ void Subchannel::ResetBackoff() { MutexLock lock(&mu_); backoff_.Reset(); if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && - event_engine_->Cancel(retry_timer_handle_)) { + engine_->Cancel(retry_timer_handle_)) { OnRetryTimerLocked(); } else if (state_ == GRPC_CHANNEL_CONNECTING) { next_attempt_time_ = Timestamp::Now(); @@ -912,7 +912,7 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { time_until_next_attempt.millis()); SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error)); - retry_timer_handle_ = event_engine_->RunAfter( + retry_timer_handle_ = engine_->RunAfter( time_until_next_attempt, [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable { { @@ -933,9 +933,9 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { bool Subchannel::PublishTransportLocked() { // Construct channel stack. - ChannelStackBuilderImpl builder("subchannel", GRPC_CLIENT_SUBCHANNEL, - connecting_result_.channel_args); - builder.SetTransport(connecting_result_.transport); + ChannelStackBuilderImpl builder("subchannel", GRPC_CLIENT_SUBCHANNEL); + builder.SetChannelArgs(connecting_result_.channel_args) + .SetTransport(connecting_result_.transport); if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) { return false; } diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 26247da98f0..aa8bd1daa9b 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -427,7 +427,7 @@ class Subchannel : public DualRefCounted { // Data producer map. std::map data_producer_map_ ABSL_GUARDED_BY(mu_); - std::shared_ptr event_engine_; + std::shared_ptr engine_; }; } // namespace grpc_core diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 5d3bd2c9de0..23e68ff1c96 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -34,7 +34,6 @@ #include "absl/types/optional.h" #include "absl/types/variant.h" -#include #include #include "src/core/lib/avl/avl.h" @@ -46,11 +45,6 @@ #include "src/core/lib/gprpp/time.h" #include "src/core/lib/surface/channel_stack_type.h" -// TODO(hork): When we're ready to allow setting via a channel arg from the -// application, replace this with a macro in -// include/grpc/impl/codegen/grpc_types.h. -#define GRPC_INTERNAL_ARG_EVENT_ENGINE "grpc.internal.event_engine" - // Channel args are intentionally immutable, to avoid the need for locking. namespace grpc_core { @@ -176,9 +170,6 @@ template struct WrapInSharedPtr : std::integral_constant< bool, std::is_base_of, T>::value> {}; -template <> -struct WrapInSharedPtr - : std::true_type {}; template struct GetObjectImpl; // std::shared_ptr implementation @@ -222,13 +213,7 @@ template struct ChannelArgNameTraits> { static absl::string_view ChannelArgName() { return T::ChannelArgName(); } }; -// Specialization for the EventEngine -template <> -struct ChannelArgNameTraits { - static absl::string_view ChannelArgName() { - return GRPC_INTERNAL_ARG_EVENT_ENGINE; - } -}; + class ChannelArgs { public: class Pointer { diff --git a/src/core/lib/channel/channel_stack.cc b/src/core/lib/channel/channel_stack.cc index 419493b71d5..b73d38f6322 100644 --- a/src/core/lib/channel/channel_stack.cc +++ b/src/core/lib/channel/channel_stack.cc @@ -30,8 +30,6 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/alloc.h" -using grpc_event_engine::experimental::EventEngine; - grpc_core::TraceFlag grpc_trace_channel(false, "channel"); grpc_core::TraceFlag grpc_trace_channel_stack(false, "channel_stack"); @@ -118,7 +116,6 @@ grpc_error_handle grpc_channel_stack_init( } stack->on_destroy.Init([]() {}); - stack->event_engine.Init(channel_args.GetObjectRef()); size_t call_size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + @@ -178,7 +175,6 @@ void grpc_channel_stack_destroy(grpc_channel_stack* stack) { (*stack->on_destroy)(); stack->on_destroy.Destroy(); - stack->event_engine.Destroy(); } grpc_error_handle grpc_call_stack_init( diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index c766ec7c2eb..6d19e126c12 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -49,9 +49,7 @@ #include #include -#include -#include #include #include #include @@ -214,14 +212,6 @@ struct grpc_channel_stack { // should look like and this can go. grpc_core::ManualConstructor> on_destroy; - grpc_core::ManualConstructor< - std::shared_ptr> - event_engine; - - grpc_event_engine::experimental::EventEngine* EventEngine() const { - return event_engine->get(); - } - // Minimal infrastructure to act like a RefCounted thing without converting // everything. // It's likely that we'll want to replace grpc_channel_stack with something diff --git a/src/core/lib/channel/channel_stack_builder.cc b/src/core/lib/channel/channel_stack_builder.cc index 9c86704e422..45ef965a5b3 100644 --- a/src/core/lib/channel/channel_stack_builder.cc +++ b/src/core/lib/channel/channel_stack_builder.cc @@ -26,10 +26,7 @@ namespace grpc_core { -ChannelStackBuilder::ChannelStackBuilder(const char* name, - grpc_channel_stack_type type, - const ChannelArgs& channel_args) - : name_(name), type_(type), args_(channel_args) {} +ChannelStackBuilder::~ChannelStackBuilder() = default; ChannelStackBuilder& ChannelStackBuilder::SetTarget(const char* target) { if (target == nullptr) { @@ -40,6 +37,12 @@ ChannelStackBuilder& ChannelStackBuilder::SetTarget(const char* target) { return *this; } +ChannelStackBuilder& ChannelStackBuilder::SetChannelArgs( + const ChannelArgs& args) { + args_ = args; + return *this; +} + void ChannelStackBuilder::PrependFilter(const grpc_channel_filter* filter) { stack_.insert(stack_.begin(), filter); } diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h index ed0315847e6..f08efa72230 100644 --- a/src/core/lib/channel/channel_stack_builder.h +++ b/src/core/lib/channel/channel_stack_builder.h @@ -41,9 +41,8 @@ namespace grpc_core { class ChannelStackBuilder { public: // Initialize with a name. - // channel_args *must be* preconditioned already. - ChannelStackBuilder(const char* name, grpc_channel_stack_type type, - const ChannelArgs& channel_args); + ChannelStackBuilder(const char* name, grpc_channel_stack_type type) + : name_(name), type_(type) {} const char* name() const { return name_; } @@ -63,6 +62,9 @@ class ChannelStackBuilder { // Query the transport. grpc_transport* transport() const { return transport_; } + // Set channel args. + ChannelStackBuilder& SetChannelArgs(const ChannelArgs& args); + // Query the channel args. const ChannelArgs& channel_args() const { return args_; } @@ -96,7 +98,7 @@ class ChannelStackBuilder { virtual absl::StatusOr> Build() = 0; protected: - ~ChannelStackBuilder() = default; + ~ChannelStackBuilder(); private: static std::string unknown_target() { return "unknown"; } diff --git a/src/core/lib/event_engine/default_event_engine.cc b/src/core/lib/event_engine/default_event_engine.cc index 56ce9babe2b..6e46e58991e 100644 --- a/src/core/lib/event_engine/default_event_engine.cc +++ b/src/core/lib/event_engine/default_event_engine.cc @@ -23,9 +23,6 @@ #include -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/channel_args_preconditioning.h" -#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/event_engine/default_event_engine_factory.h" #include "src/core/lib/event_engine/trace.h" @@ -77,19 +74,5 @@ std::shared_ptr GetDefaultEventEngine() { return engine; } -namespace { -grpc_core::ChannelArgs EnsureEventEngineInChannelArgs( - grpc_core::ChannelArgs args) { - if (args.ContainsObject()) return args; - return args.SetObject(GetDefaultEventEngine()); -} -} // namespace - -void RegisterEventEngineChannelArgPreconditioning( - grpc_core::CoreConfiguration::Builder* builder) { - builder->channel_args_preconditioning()->RegisterStage( - grpc_event_engine::experimental::EnsureEventEngineInChannelArgs); -} - } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/default_event_engine.h b/src/core/lib/event_engine/default_event_engine.h index 4a65bd06d9a..e116a471a0a 100644 --- a/src/core/lib/event_engine/default_event_engine.h +++ b/src/core/lib/event_engine/default_event_engine.h @@ -21,7 +21,6 @@ #include -#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/promise/context.h" namespace grpc_core { @@ -38,11 +37,6 @@ namespace experimental { /// Strongly consider whether you could use \a CreateEventEngine instead. std::shared_ptr GetDefaultEventEngine(); -/// On ingress, ensure that an EventEngine exists in channel args via -/// preconditioning. -void RegisterEventEngineChannelArgPreconditioning( - grpc_core::CoreConfiguration::Builder* builder); - } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/load_balancing/lb_policy.h b/src/core/lib/load_balancing/lb_policy.h index d68bf11ff08..96ed933136d 100644 --- a/src/core/lib/load_balancing/lb_policy.h +++ b/src/core/lib/load_balancing/lb_policy.h @@ -33,7 +33,6 @@ #include "absl/types/optional.h" #include "absl/types/variant.h" -#include #include #include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" @@ -292,9 +291,6 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// Returns the channel authority. virtual absl::string_view GetAuthority() = 0; - /// Returns the EventEngine to use for timers and async work. - virtual grpc_event_engine::experimental::EventEngine* GetEventEngine() = 0; - /// Adds a trace message associated with the channel. enum TraceSeverity { TRACE_INFO, TRACE_WARNING, TRACE_ERROR }; virtual void AddTraceEvent(TraceSeverity severity, diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index 561ab26ad47..fd33ec61d7a 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -170,12 +170,44 @@ void channelz_node_destroy(void* p) { int channelz_node_cmp(void* p1, void* p2) { return QsortCompare(p1, p2); } const grpc_arg_pointer_vtable channelz_node_arg_vtable = { channelz_node_copy, channelz_node_destroy, channelz_node_cmp}; + +void CreateChannelzNode(ChannelStackBuilder* builder) { + auto args = builder->channel_args(); + // Check whether channelz is enabled. + const bool channelz_enabled = args.GetBool(GRPC_ARG_ENABLE_CHANNELZ) + .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT); + if (!channelz_enabled) return; + // Get parameters needed to create the channelz node. + const size_t channel_tracer_max_memory = std::max( + 0, args.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE) + .value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT)); + const bool is_internal_channel = + args.GetBool(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL).value_or(false); + // Create the channelz node. + std::string target(builder->target()); + RefCountedPtr channelz_node = + MakeRefCounted( + target.c_str(), channel_tracer_max_memory, is_internal_channel); + channelz_node->AddTraceEvent( + channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Channel created")); + // Add channelz node to channel args. + // We remove the is_internal_channel arg, since we no longer need it. + builder->SetChannelArgs( + args.Remove(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL) + .Set(GRPC_ARG_CHANNELZ_CHANNEL_NODE, + ChannelArgs::Pointer(channelz_node.release(), + &channelz_node_arg_vtable))); +} + } // namespace absl::StatusOr> Channel::Create( const char* target, ChannelArgs args, grpc_channel_stack_type channel_stack_type, grpc_transport* optional_transport) { + ChannelStackBuilderImpl builder( + grpc_channel_stack_type_string(channel_stack_type), channel_stack_type); if (!args.GetString(GRPC_ARG_DEFAULT_AUTHORITY).has_value()) { auto ssl_override = args.GetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG); if (ssl_override.has_value()) { @@ -190,42 +222,15 @@ absl::StatusOr> Channel::Create( args = channel_args_mutator(target, args, channel_stack_type); } } + builder.SetChannelArgs(args).SetTarget(target).SetTransport( + optional_transport); + if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) { + return nullptr; + } // We only need to do this for clients here. For servers, this will be // done in src/core/lib/surface/server.cc. if (grpc_channel_stack_type_is_client(channel_stack_type)) { - // Check whether channelz is enabled. - if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ) - .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) { - // Get parameters needed to create the channelz node. - const size_t channel_tracer_max_memory = std::max( - 0, - args.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE) - .value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT)); - const bool is_internal_channel = - args.GetBool(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL).value_or(false); - // Create the channelz node. - std::string channelz_node_target{target == nullptr ? "unknown" : target}; - RefCountedPtr channelz_node = - MakeRefCounted(channelz_node_target, - channel_tracer_max_memory, - is_internal_channel); - channelz_node->AddTraceEvent( - channelz::ChannelTrace::Severity::Info, - grpc_slice_from_static_string("Channel created")); - // Add channelz node to channel args. - // We remove the is_internal_channel arg, since we no longer need it. - args = args.Remove(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL) - .Set(GRPC_ARG_CHANNELZ_CHANNEL_NODE, - ChannelArgs::Pointer(channelz_node.release(), - &channelz_node_arg_vtable)); - } - } - ChannelStackBuilderImpl builder( - grpc_channel_stack_type_string(channel_stack_type), channel_stack_type, - args); - builder.SetTarget(target).SetTransport(optional_transport); - if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) { - return nullptr; + CreateChannelzNode(&builder); } return CreateWithBuilder(&builder); } diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 16562ef786f..f1c5bb0a352 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -26,6 +26,7 @@ #include #include +#include #include #include @@ -45,6 +46,7 @@ #include "src/core/lib/channel/channel_stack.h" // IWYU pragma: keep #include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/channel/channelz.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/cpp_impl_of.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/ref_counted.h" @@ -156,7 +158,7 @@ class Channel : public RefCounted, } grpc_event_engine::experimental::EventEngine* event_engine() const { - return channel_stack_->EventEngine(); + return event_engine_.get(); } private: @@ -174,6 +176,8 @@ class Channel : public RefCounted, MemoryAllocator allocator_; std::string target_; const RefCountedPtr channel_stack_; + const std::shared_ptr + event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine(); }; } // namespace grpc_core diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index fb55a55a404..f1b5bea9516 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -25,12 +25,6 @@ #include "src/core/lib/transport/http_connect_handshaker.h" #include "src/core/lib/transport/tcp_connect_handshaker.h" -namespace grpc_event_engine { -namespace experimental { -extern void RegisterEventEngineChannelArgPreconditioning(grpc_core::CoreConfiguration::Builder* builder); -} // namespace experimental -} // namespace grpc_event_engine - namespace grpc_core { extern void BuildClientChannelConfiguration( @@ -69,7 +63,6 @@ extern void RegisterBinderResolver(CoreConfiguration::Builder* builder); #endif void BuildCoreConfiguration(CoreConfiguration::Builder* builder) { - grpc_event_engine::experimental::RegisterEventEngineChannelArgPreconditioning(builder); // The order of the handshaker registration is crucial here. // We want TCP connect handshaker to be registered last so that it is added to // the start of the handshaker list. diff --git a/test/core/channel/channel_args_test.cc b/test/core/channel/channel_args_test.cc index 7388eec422b..4657c77fa25 100644 --- a/test/core/channel/channel_args_test.cc +++ b/test/core/channel/channel_args_test.cc @@ -28,7 +28,6 @@ #include #include "src/core/lib/gpr/useful.h" -#include "src/core/lib/gprpp/notification.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -36,9 +35,6 @@ namespace grpc_core { -using ::grpc_event_engine::experimental::CreateEventEngine; -using ::grpc_event_engine::experimental::EventEngine; - TEST(ChannelArgsTest, Noop) { ChannelArgs(); } TEST(ChannelArgsTest, SetGetRemove) { @@ -168,33 +164,6 @@ TEST(ChannelArgsTest, RetrieveRawPointerFromStoredSharedPtr) { EXPECT_EQ(2, shared_obj.use_count()); } -TEST(ChannelArgsTest, StoreSharedPtrEventEngine) { - auto p = std::shared_ptr(CreateEventEngine()); - ChannelArgs a; - a = a.SetObject(p); - Notification signal; - bool triggered = false; - a.GetObjectRef()->Run([&triggered, &signal] { - triggered = true; - signal.Notify(); - }); - signal.WaitForNotification(); - ASSERT_TRUE(triggered); -} - -TEST(ChannelArgsTest, GetNonOwningEventEngine) { - auto p = std::shared_ptr(CreateEventEngine()); - ASSERT_TRUE(p.unique()); - ChannelArgs a; - a = a.SetObject(p); - ASSERT_FALSE(p.unique()); - ASSERT_EQ(p.use_count(), 2); - EventEngine* engine = a.GetObject(); - (void)engine; - // p and the channel args - ASSERT_EQ(p.use_count(), 2); -} - } // namespace grpc_core TEST(GrpcChannelArgsTest, Create) { diff --git a/test/core/channel/channel_stack_builder_test.cc b/test/core/channel/channel_stack_builder_test.cc index 9693ebf6d03..90ee4a86d01 100644 --- a/test/core/channel/channel_stack_builder_test.cc +++ b/test/core/channel/channel_stack_builder_test.cc @@ -119,8 +119,7 @@ bool AddOriginalFilter(ChannelStackBuilder* builder) { } TEST(ChannelStackBuilder, UnknownTarget) { - ChannelStackBuilderImpl builder("alpha-beta-gamma", GRPC_CLIENT_CHANNEL, - ChannelArgs()); + ChannelStackBuilderImpl builder("alpha-beta-gamma", GRPC_CLIENT_CHANNEL); EXPECT_EQ(builder.target(), "unknown"); } diff --git a/test/core/channel/channel_stack_test.cc b/test/core/channel/channel_stack_test.cc index fff9904e0f9..65eea62be53 100644 --- a/test/core/channel/channel_stack_test.cc +++ b/test/core/channel/channel_stack_test.cc @@ -18,31 +18,24 @@ #include "src/core/lib/channel/channel_stack.h" -#include - #include #include "absl/status/status.h" #include "gtest/gtest.h" +#include #include -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/channel_args_preconditioning.h" -#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "test/core/util/test_config.h" static grpc_error_handle channel_init_func(grpc_channel_element* elem, grpc_channel_element_args* args) { - int test_value = grpc_channel_args_find_integer(args->channel_args, - "test_key", {-1, 0, INT_MAX}); - EXPECT_EQ(test_value, 42); - auto* ee = grpc_channel_args_find_pointer< - grpc_event_engine::experimental::EventEngine>( - args->channel_args, GRPC_INTERNAL_ARG_EVENT_ENGINE); - EXPECT_NE(ee, nullptr); + EXPECT_EQ(args->channel_args->num_args, 1); + EXPECT_EQ(args->channel_args->args[0].type, GRPC_ARG_INTEGER); + EXPECT_STREQ(args->channel_args->args[0].key, "test_key"); + EXPECT_EQ(args->channel_args->args[0].value.integer, 42); EXPECT_TRUE(args->is_first); EXPECT_TRUE(args->is_last); *static_cast(elem->channel_data) = 0; @@ -111,14 +104,11 @@ TEST(ChannelStackTest, CreateChannelStack) { channel_stack = static_cast( gpr_malloc(grpc_channel_stack_size(&filters, 1))); - auto channel_args = grpc_core::CoreConfiguration::Get() - .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr) - .Set("test_key", 42); ASSERT_TRUE(GRPC_LOG_IF_ERROR( "grpc_channel_stack_init", grpc_channel_stack_init(1, free_channel, channel_stack, &filters, 1, - channel_args, "test", channel_stack))); + grpc_core::ChannelArgs().Set("test_key", 42), + "test", channel_stack))); EXPECT_EQ(channel_stack->count, 1); channel_elem = grpc_channel_stack_element(channel_stack, 0); channel_data = static_cast(channel_elem->channel_data); diff --git a/test/core/channel/minimal_stack_is_minimal_test.cc b/test/core/channel/minimal_stack_is_minimal_test.cc index 8bef0283ef4..efc342de5d7 100644 --- a/test/core/channel/minimal_stack_is_minimal_test.cc +++ b/test/core/channel/minimal_stack_is_minimal_test.cc @@ -43,6 +43,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/channel/channel_stack_builder_impl.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -128,17 +129,16 @@ int main(int argc, char** argv) { static int check_stack(const char* file, int line, const char* transport_name, grpc_channel_args* init_args, unsigned channel_stack_type, ...) { - grpc_core::ChannelArgs channel_args = - grpc_core::ChannelArgs::FromC(init_args); // create phony channel stack grpc_core::ChannelStackBuilderImpl builder( - "test", static_cast(channel_stack_type), - channel_args); + "test", static_cast(channel_stack_type)); grpc_transport_vtable fake_transport_vtable; memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable)); fake_transport_vtable.name = transport_name; grpc_transport fake_transport = {&fake_transport_vtable}; - builder.SetTarget("foo.test.google.fr"); + grpc_core::ChannelArgs channel_args = + grpc_core::ChannelArgs::FromC(init_args); + builder.SetTarget("foo.test.google.fr").SetChannelArgs(channel_args); if (transport_name != nullptr) { builder.SetTransport(&fake_transport); } diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index 60ea798d90c..e61fc2c3caf 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -38,7 +38,6 @@ #include "absl/types/variant.h" #include "gtest/gtest.h" -#include #include #include #include @@ -48,7 +47,6 @@ #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" -#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -239,10 +237,6 @@ class LoadBalancingPolicyTest : public ::testing::Test { absl::string_view GetAuthority() override { return "server.example.com"; } - grpc_event_engine::experimental::EventEngine* GetEventEngine() override { - return grpc_event_engine::experimental::GetDefaultEventEngine().get(); - } - void AddTraceEvent(TraceSeverity, absl::string_view) override {} LoadBalancingPolicyTest* test_; diff --git a/test/core/filters/filter_fuzzer.cc b/test/core/filters/filter_fuzzer.cc index 69a5fd852bf..6b075578199 100644 --- a/test/core/filters/filter_fuzzer.cc +++ b/test/core/filters/filter_fuzzer.cc @@ -43,13 +43,11 @@ #include "src/core/ext/filters/http/server/http_server_filter.h" #include "src/core/lib/channel/call_finalization.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/channel_args_preconditioning.h" #include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channel_stack_builder_impl.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/channel/promise_based_filter.h" -#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/env.h" @@ -249,9 +247,7 @@ RefCountedPtr LoadAuthorizationEngine( template ChannelArgs LoadChannelArgs(const FuzzerChannelArgs& fuzz_args, GlobalObjects* globals) { - ChannelArgs args = CoreConfiguration::Get() - .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr); + ChannelArgs args = ChannelArgs().SetObject(ResourceQuota::Default()); for (const auto& arg : fuzz_args) { if (arg.key() == ResourceQuota::ChannelArgName()) { if (arg.value_case() == filter_fuzzer::ChannelArg::kResourceQuota) { @@ -679,8 +675,8 @@ DEFINE_PROTO_FUZZER(const filter_fuzzer::Msg& msg) { grpc_core::ChannelStackBuilderImpl builder( msg.stack_name().c_str(), - static_cast(msg.channel_stack_type()), - channel_args); + static_cast(msg.channel_stack_type())); + builder.SetChannelArgs(channel_args); builder.AppendFilter(filter); const bool is_client = grpc_channel_stack_type_is_client(builder.channel_stack_type()); diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc index 016380bb755..4fdca07ed04 100644 --- a/test/core/util/test_lb_policies.cc +++ b/test/core/util/test_lb_policies.cc @@ -26,7 +26,6 @@ #include "absl/strings/string_view.h" #include "absl/types/variant.h" -#include #include #include @@ -157,10 +156,6 @@ class TestPickArgsLb : public ForwardingLoadBalancingPolicy { return parent_->channel_control_helper()->GetAuthority(); } - grpc_event_engine::experimental::EventEngine* GetEventEngine() override { - return parent_->channel_control_helper()->GetEventEngine(); - } - void AddTraceEvent(TraceSeverity severity, absl::string_view message) override { parent_->channel_control_helper()->AddTraceEvent(severity, message); @@ -279,10 +274,6 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy return parent_->channel_control_helper()->GetAuthority(); } - grpc_event_engine::experimental::EventEngine* GetEventEngine() override { - return parent_->channel_control_helper()->GetEventEngine(); - } - void AddTraceEvent(TraceSeverity severity, absl::string_view message) override { parent_->channel_control_helper()->AddTraceEvent(severity, message); @@ -394,10 +385,6 @@ class AddressTestLoadBalancingPolicy : public ForwardingLoadBalancingPolicy { return parent_->channel_control_helper()->GetAuthority(); } - grpc_event_engine::experimental::EventEngine* GetEventEngine() override { - return parent_->channel_control_helper()->GetEventEngine(); - } - void AddTraceEvent(TraceSeverity severity, absl::string_view message) override { parent_->channel_control_helper()->AddTraceEvent(severity, message); @@ -513,10 +500,6 @@ class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy { return parent_->channel_control_helper()->GetAuthority(); } - grpc_event_engine::experimental::EventEngine* GetEventEngine() override { - return parent_->channel_control_helper()->GetEventEngine(); - } - void AddTraceEvent(TraceSeverity severity, absl::string_view message) override { parent_->channel_control_helper()->AddTraceEvent(severity, message); @@ -634,10 +617,6 @@ class OobBackendMetricTestLoadBalancingPolicy return parent_->channel_control_helper()->GetAuthority(); } - grpc_event_engine::experimental::EventEngine* GetEventEngine() override { - return parent_->channel_control_helper()->GetEventEngine(); - } - void AddTraceEvent(TraceSeverity severity, absl::string_view message) override { parent_->channel_control_helper()->AddTraceEvent(severity, message); diff --git a/test/core/xds/xds_channel_stack_modifier_test.cc b/test/core/xds/xds_channel_stack_modifier_test.cc index ca026012e87..9f9bfc5d4cd 100644 --- a/test/core/xds/xds_channel_stack_modifier_test.cc +++ b/test/core/xds/xds_channel_stack_modifier_test.cc @@ -90,8 +90,8 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertion) { grpc_arg arg = channel_stack_modifier->MakeChannelArg(); // Create a phony ChannelStackBuilder object grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1); - ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL, - ChannelArgs::FromC(args)); + ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL); + builder.SetChannelArgs(ChannelArgs::FromC(args)); grpc_channel_args_destroy(args); grpc_transport_vtable fake_transport_vtable; memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable)); @@ -128,8 +128,8 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertionAfterCensus) { grpc_arg arg = channel_stack_modifier->MakeChannelArg(); // Create a phony ChannelStackBuilder object grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1); - ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL, - ChannelArgs::FromC(args)); + ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL); + builder.SetChannelArgs(ChannelArgs::FromC(args)); grpc_channel_args_destroy(args); grpc_transport_vtable fake_transport_vtable; memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable)); diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index a518f852dd4..3b4b57d74b4 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -501,9 +501,7 @@ static void BM_IsolatedFilter(benchmark::State& state) { FakeClientChannelFactory fake_client_channel_factory; grpc_core::ChannelArgs channel_args = - grpc_core::CoreConfiguration::Get() - .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr) + grpc_core::ChannelArgs() .SetObject(&fake_client_channel_factory) .Set(GRPC_ARG_SERVER_URI, "localhost"); if (fixture.flags & REQUIRES_TRANSPORT) { @@ -698,12 +696,12 @@ class IsolatedCallFixture { // the grpc_shutdown() run by grpc_channel_destroy(). So we need to // call grpc_init() manually here to balance things out. grpc_init(); - grpc_core::ChannelStackBuilderImpl builder( - "phony", GRPC_CLIENT_CHANNEL, - grpc_core::CoreConfiguration::Get() - .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr)); + auto args = grpc_core::CoreConfiguration::Get() + .channel_args_preconditioning() + .PreconditionChannelArgs(nullptr); + grpc_core::ChannelStackBuilderImpl builder("phony", GRPC_CLIENT_CHANNEL); builder.SetTarget("phony_target"); + builder.SetChannelArgs(args); builder.AppendFilter(&isolated_call_filter::isolated_call_filter); { grpc_core::ExecCtx exec_ctx; diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h index ef75c955a46..df7444dd5d4 100644 --- a/test/cpp/microbenchmarks/fullstack_fixtures.h +++ b/test/cpp/microbenchmarks/fullstack_fixtures.h @@ -30,7 +30,6 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/endpoint_pair.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -190,18 +189,12 @@ class EndpointPairFixture : public BaseFixture { /* create channel */ { - grpc_core::ChannelArgs c_args; - { - ChannelArguments args; - args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority"); - fixture_configuration.ApplyCommonChannelArguments(&args); - // precondition - grpc_channel_args tmp_args; - args.SetChannelArgs(&tmp_args); - c_args = grpc_core::CoreConfiguration::Get() - .channel_args_preconditioning() - .PreconditionChannelArgs(&tmp_args); - } + ChannelArguments args; + args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority"); + fixture_configuration.ApplyCommonChannelArguments(&args); + + grpc_core::ChannelArgs c_args = + grpc_core::ChannelArgs::FromC(args.c_channel_args()); client_transport_ = grpc_create_chttp2_transport(c_args, endpoints.client, true); GPR_ASSERT(client_transport_);