From 23c7e487794aea4f287caa4598cab757e359910c Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Tue, 25 Oct 2022 14:49:59 -0700 Subject: [PATCH] Precondition ChannelArgs with EventEngines (#31166) * Precondition ChannelArgs with EventEngines If an EventEngine is not explicitly provided to ChannelArgs, the default EventEngine will be set when ChannelArgs are preconditioned. * channel_idle_filter: EE from channel_args * grpclb: EE from channel_args * weighted_target: ee from channel_args * sanitize * xds cluster manager * posix native resolver: own an EE ref from iomgr initialization * reviewer feedback * reviewer feedback * iwyu * iwyu * change ownership and remove unneeded methods * clang_format and use consistent engine naming * store EE ref in channel_stack and use it in channel idle filter * don't store a separate shared_ptr in NativeDNSResolver * add GetEventEngine() method to LB policy helper interface * stop holding refs to the EE instance in LB policies * clang-format * change channel stack to get EE instance from channel args * update XdsWrrLocalityLb * fix lb_policy_test * precondition channel_args in ServerBuilder and microbenchmark fixtures * add required engine to channel_stack test * sanitize * dep fix * add EE to filter fuzzer * precondition BM_IsolatedFilter channelargs * fix * remove unused using statement * iwyu again?? * remove preconditioning from C++ surface API * fix bm_call_create * Automated change: Fix sanity tests * iwyu * rm this-> * rm unused deps * add internal EE arg macro * precondition filter_fuzzer * Automated change: Fix sanity tests * iwyu * ChannelStackBuilder requires preconditioned ChannelArgs * iwyu * iwyu again? * rm build.SetChannelArgs; rm unused declaration * fix nullptr string creation Co-authored-by: Mark D. Roth Co-authored-by: drfloob --- BUILD | 2 +- src/core/BUILD | 7 +- .../channel_idle/channel_idle_filter.cc | 29 +++----- .../channel_idle/channel_idle_filter.h | 15 ++-- .../filters/client_channel/client_channel.cc | 5 ++ .../filters/client_channel/dynamic_filters.cc | 3 +- .../lb_policy/child_policy_handler.cc | 5 ++ .../client_channel/lb_policy/grpclb/grpclb.cc | 30 ++++---- .../outlier_detection/outlier_detection.cc | 7 ++ .../lb_policy/priority/priority.cc | 8 +++ .../client_channel/lb_policy/rls/rls.cc | 7 ++ .../weighted_target/weighted_target.cc | 36 ++++++---- .../client_channel/lb_policy/xds/cds.cc | 6 ++ .../lb_policy/xds/xds_cluster_impl.cc | 7 ++ .../lb_policy/xds/xds_cluster_manager.cc | 45 +++++++----- .../lb_policy/xds/xds_cluster_resolver.cc | 8 +++ .../lb_policy/xds/xds_wrr_locality.cc | 9 ++- .../ext/filters/client_channel/subchannel.cc | 16 ++--- .../ext/filters/client_channel/subchannel.h | 2 +- src/core/lib/channel/channel_args.h | 17 ++++- src/core/lib/channel/channel_stack.cc | 4 ++ src/core/lib/channel/channel_stack.h | 10 +++ src/core/lib/channel/channel_stack_builder.cc | 11 ++- src/core/lib/channel/channel_stack_builder.h | 10 ++- .../lib/event_engine/default_event_engine.cc | 17 +++++ .../lib/event_engine/default_event_engine.h | 6 ++ src/core/lib/load_balancing/lb_policy.h | 4 ++ src/core/lib/surface/channel.cc | 71 +++++++++---------- src/core/lib/surface/channel.h | 6 +- .../plugin_registry/grpc_plugin_registry.cc | 7 ++ test/core/channel/channel_args_test.cc | 31 ++++++++ .../channel/channel_stack_builder_test.cc | 3 +- test/core/channel/channel_stack_test.cc | 24 +++++-- .../channel/minimal_stack_is_minimal_test.cc | 10 +-- .../lb_policy/lb_policy_test_lib.h | 6 ++ test/core/filters/filter_fuzzer.cc | 10 ++- test/core/util/test_lb_policies.cc | 21 ++++++ .../xds/xds_channel_stack_modifier_test.cc | 8 +-- test/cpp/microbenchmarks/bm_call_create.cc | 14 ++-- test/cpp/microbenchmarks/fullstack_fixtures.h | 19 +++-- 40 files changed, 374 insertions(+), 182 deletions(-) diff --git a/BUILD b/BUILD index 135a5c083c5..be97a52ae94 100644 --- a/BUILD +++ b/BUILD @@ -1794,6 +1794,7 @@ grpc_cc_library( "absl/status:statusor", "absl/strings", "absl/synchronization", + "absl/types:optional", "absl/memory", "upb_lib", "protobuf_headers", @@ -2670,7 +2671,6 @@ grpc_cc_library( "//src/core:channel_init", "//src/core:channel_stack_type", "//src/core:construct_destruct", - "//src/core:default_event_engine", "//src/core:dual_ref_counted", "//src/core:env", "//src/core:gpr_atm", diff --git a/src/core/BUILD b/src/core/BUILD index 9d59c5938eb..1cfe7933c70 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -1809,10 +1809,13 @@ grpc_cc_library( ], external_deps = ["absl/functional:any_invocable"], deps = [ + "channel_args", + "channel_args_preconditioning", "context", "default_event_engine_factory", "event_engine_trace", "no_destruct", + "//:config", "//:event_engine_base_hdrs", "//:gpr", "//:grpc_trace", @@ -2958,7 +2961,6 @@ grpc_cc_library( "channel_init", "channel_stack_type", "closure", - "default_event_engine", "exec_ctx_wakeup_scheduler", "http2_errors", "idle_filter_state", @@ -3189,7 +3191,6 @@ grpc_cc_library( "channel_fwd", "channel_init", "channel_stack_type", - "default_event_engine", "gpr_atm", "grpc_sockaddr", "json", @@ -3751,7 +3752,6 @@ grpc_cc_library( language = "c++", deps = [ "channel_args", - "default_event_engine", "grpc_resolver_xds_header", "json", "json_args", @@ -4076,7 +4076,6 @@ grpc_cc_library( language = "c++", deps = [ "channel_args", - "default_event_engine", "grpc_lb_address_filtering", "json", "json_args", diff --git a/src/core/ext/filters/channel_idle/channel_idle_filter.cc b/src/core/ext/filters/channel_idle/channel_idle_filter.cc index 0ac7cf61a3b..479953a0df6 100644 --- a/src/core/ext/filters/channel_idle/channel_idle_filter.cc +++ b/src/core/ext/filters/channel_idle/channel_idle_filter.cc @@ -26,7 +26,6 @@ #include "absl/types/optional.h" -#include #include #include @@ -35,7 +34,6 @@ #include "src/core/lib/channel/promise_based_filter.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/status_helper.h" @@ -56,8 +54,6 @@ namespace grpc_core { namespace { -using ::grpc_event_engine::experimental::EventEngine; - // TODO(ctiller): The idle filter was disabled in client channel by default // due to b/143502997. Now the bug is fixed enable the filter by default. const auto kDefaultIdleTimeout = Duration::Infinity(); @@ -125,19 +121,15 @@ struct MaxAgeFilter::Config { absl::StatusOr ClientIdleFilter::Create( const ChannelArgs& args, ChannelFilter::Args filter_args) { - // TODO(hork): pull EventEngine from args - ClientIdleFilter filter( - filter_args.channel_stack(), GetClientIdleTimeout(args), - grpc_event_engine::experimental::GetDefaultEventEngine()); + ClientIdleFilter filter(filter_args.channel_stack(), + GetClientIdleTimeout(args)); return absl::StatusOr(std::move(filter)); } absl::StatusOr MaxAgeFilter::Create( const ChannelArgs& args, ChannelFilter::Args filter_args) { - // TODO(hork): pull EventEngine from args MaxAgeFilter filter(filter_args.channel_stack(), - Config::FromChannelArgs(args), - grpc_event_engine::experimental::GetDefaultEventEngine()); + Config::FromChannelArgs(args)); return absl::StatusOr(std::move(filter)); } @@ -211,7 +203,7 @@ void MaxAgeFilter::PostInit() { // (if it did not, it was cancelled) if (status.ok()) CloseChannel(); }, - engine_.get())); + channel_stack->EventEngine())); } } @@ -272,7 +264,7 @@ void ChannelIdleFilter::StartIdleTimer() { [channel_stack, this](absl::Status status) { if (status.ok()) CloseChannel(); }, - engine_.get())); + channel_stack->EventEngine())); } void ChannelIdleFilter::CloseChannel() { @@ -314,13 +306,10 @@ void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) { }); } -MaxAgeFilter::MaxAgeFilter( - grpc_channel_stack* channel_stack, const Config& max_age_config, - std::shared_ptr engine) - : ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle, - engine), +MaxAgeFilter::MaxAgeFilter(grpc_channel_stack* channel_stack, + const Config& max_age_config) + : ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle), max_connection_age_(max_age_config.max_connection_age), - max_connection_age_grace_(max_age_config.max_connection_age_grace), - engine_(engine) {} + max_connection_age_grace_(max_age_config.max_connection_age_grace) {} } // namespace grpc_core diff --git a/src/core/ext/filters/channel_idle/channel_idle_filter.h b/src/core/ext/filters/channel_idle/channel_idle_filter.h index bc49a2a0174..d926d09f71b 100644 --- a/src/core/ext/filters/channel_idle/channel_idle_filter.h +++ b/src/core/ext/filters/channel_idle/channel_idle_filter.h @@ -22,7 +22,6 @@ #include "absl/status/status.h" #include "absl/status/statusor.h" -#include #include #include "src/core/ext/filters/channel_idle/idle_filter_state.h" @@ -59,12 +58,10 @@ class ChannelIdleFilter : public ChannelFilter { using SingleSetActivityPtr = SingleSetPtr; - ChannelIdleFilter( - grpc_channel_stack* channel_stack, Duration client_idle_timeout, - std::shared_ptr engine) + ChannelIdleFilter(grpc_channel_stack* channel_stack, + Duration client_idle_timeout) : channel_stack_(channel_stack), - client_idle_timeout_(client_idle_timeout), - engine_(engine) {} + client_idle_timeout_(client_idle_timeout) {} grpc_channel_stack* channel_stack() { return channel_stack_; }; @@ -90,7 +87,6 @@ class ChannelIdleFilter : public ChannelFilter { std::make_shared(false)}; SingleSetActivityPtr activity_; - std::shared_ptr engine_; }; class ClientIdleFilter final : public ChannelIdleFilter { @@ -131,16 +127,13 @@ class MaxAgeFilter final : public ChannelIdleFilter { MaxAgeFilter* filter_; }; - MaxAgeFilter( - grpc_channel_stack* channel_stack, const Config& max_age_config, - std::shared_ptr engine); + MaxAgeFilter(grpc_channel_stack* channel_stack, const Config& max_age_config); void Shutdown() override; SingleSetActivityPtr max_age_activity_; Duration max_connection_age_; Duration max_connection_age_grace_; - std::shared_ptr engine_; }; } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 31f573a0e12..a3c99194c44 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -37,6 +37,7 @@ #include "absl/types/optional.h" #include "absl/types/variant.h" +#include #include #include #include @@ -915,6 +916,10 @@ class ClientChannel::ClientChannelControlHelper return chand_->default_authority_; } + grpc_event_engine::experimental::EventEngine* GetEventEngine() override { + return chand_->owning_stack_->EventEngine(); + } + void AddTraceEvent(TraceSeverity severity, absl::string_view message) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { if (chand_->resolver_ == nullptr) return; // Shutting down. diff --git a/src/core/ext/filters/client_channel/dynamic_filters.cc b/src/core/ext/filters/client_channel/dynamic_filters.cc index c023dc71c72..6d0a015b4ea 100644 --- a/src/core/ext/filters/client_channel/dynamic_filters.cc +++ b/src/core/ext/filters/client_channel/dynamic_filters.cc @@ -140,8 +140,7 @@ namespace { absl::StatusOr> CreateChannelStack( const ChannelArgs& args, std::vector filters) { - ChannelStackBuilderImpl builder("DynamicFilters", GRPC_CLIENT_DYNAMIC); - builder.SetChannelArgs(args); + ChannelStackBuilderImpl builder("DynamicFilters", GRPC_CLIENT_DYNAMIC, args); for (auto filter : filters) { builder.AppendFilter(filter); } diff --git a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc index 41568e68963..b364b32b137 100644 --- a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc +++ b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc @@ -25,6 +25,7 @@ #include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" +#include #include #include @@ -107,6 +108,10 @@ class ChildPolicyHandler::Helper return parent_->channel_control_helper()->GetAuthority(); } + grpc_event_engine::experimental::EventEngine* GetEventEngine() override { + return parent_->channel_control_helper()->GetEventEngine(); + } + void AddTraceEvent(TraceSeverity severity, absl::string_view message) override { if (parent_->shutting_down_) return; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 66728f792a3..8a907c958bc 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -105,7 +105,6 @@ #include "src/core/lib/channel/channelz.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/debug_location.h" @@ -303,7 +302,6 @@ class GrpcLb : public LoadBalancingPolicy { bool client_load_report_is_due_ = false; // The closure used for the completion of sending the load report. grpc_closure client_load_report_done_closure_; - std::shared_ptr engine_; }; class SubchannelWrapper : public DelegatingSubchannel { @@ -469,6 +467,7 @@ class GrpcLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; + grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -872,6 +871,10 @@ absl::string_view GrpcLb::Helper::GetAuthority() { return parent_->channel_control_helper()->GetAuthority(); } +grpc_event_engine::experimental::EventEngine* GrpcLb::Helper::GetEventEngine() { + return parent_->channel_control_helper()->GetEventEngine(); +} + void GrpcLb::Helper::AddTraceEvent(TraceSeverity severity, absl::string_view message) { if (parent_->shutting_down_) return; @@ -887,8 +890,7 @@ GrpcLb::BalancerCallState::BalancerCallState( : InternallyRefCounted( GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace) ? "BalancerCallState" : nullptr), - grpclb_policy_(std::move(parent_grpclb_policy)), - engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { + grpclb_policy_(std::move(parent_grpclb_policy)) { GPR_ASSERT(grpclb_policy_ != nullptr); GPR_ASSERT(!grpclb_policy()->shutting_down_); // Init the LB call. Note that the LB call will progress every time there's @@ -946,7 +948,8 @@ void GrpcLb::BalancerCallState::Orphan() { // call, then the following cancellation will be a no-op. grpc_call_cancel_internal(lb_call_); if (client_load_report_handle_.has_value() && - engine_->Cancel(client_load_report_handle_.value())) { + grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel( + client_load_report_handle_.value())) { Unref(DEBUG_LOCATION, "client_load_report cancelled"); } // Note that the initial ref is hold by lb_on_balancer_status_received_ @@ -1032,12 +1035,13 @@ void GrpcLb::BalancerCallState::StartQuery() { void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { client_load_report_handle_ = - engine_->RunAfter(client_stats_report_interval_, [this] { - ApplicationCallbackExecCtx callback_exec_ctx; - ExecCtx exec_ctx; - grpclb_policy()->work_serializer()->Run( - [this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION); - }); + grpclb_policy()->channel_control_helper()->GetEventEngine()->RunAfter( + client_stats_report_interval_, [this] { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + grpclb_policy()->work_serializer()->Run( + [this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION); + }); } void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked() { @@ -1903,13 +1907,10 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory { } // namespace -} // namespace grpc_core - // // Plugin registration // -namespace grpc_core { void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder) { builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory( std::make_unique()); @@ -1928,4 +1929,5 @@ void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder) { return true; }); } + } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc index b5047a79788..295fbb4ca55 100644 --- a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc +++ b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc @@ -38,6 +38,7 @@ #include "absl/strings/string_view.h" #include "absl/types/variant.h" +#include #include #include @@ -345,6 +346,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; + grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -769,6 +771,11 @@ absl::string_view OutlierDetectionLb::Helper::GetAuthority() { return outlier_detection_policy_->channel_control_helper()->GetAuthority(); } +grpc_event_engine::experimental::EventEngine* +OutlierDetectionLb::Helper::GetEventEngine() { + return outlier_detection_policy_->channel_control_helper()->GetEventEngine(); +} + void OutlierDetectionLb::Helper::AddTraceEvent(TraceSeverity severity, absl::string_view message) { if (outlier_detection_policy_->shutting_down_) return; diff --git a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc index 8913ac88c5e..c5e114bebfb 100644 --- a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc +++ b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc @@ -33,6 +33,7 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" +#include #include #include #include @@ -197,6 +198,7 @@ class PriorityLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; + grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -849,6 +851,12 @@ absl::string_view PriorityLb::ChildPriority::Helper::GetAuthority() { return priority_->priority_policy_->channel_control_helper()->GetAuthority(); } +grpc_event_engine::experimental::EventEngine* +PriorityLb::ChildPriority::Helper::GetEventEngine() { + return priority_->priority_policy_->channel_control_helper() + ->GetEventEngine(); +} + void PriorityLb::ChildPriority::Helper::AddTraceEvent( TraceSeverity severity, absl::string_view message) { if (priority_->priority_policy_->shutting_down_) return; diff --git a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc index 363a9f191e8..28e7e59cddd 100644 --- a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc +++ b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc @@ -53,6 +53,7 @@ #include #include +#include #include #include #include @@ -335,6 +336,7 @@ class RlsLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; + grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -915,6 +917,11 @@ absl::string_view RlsLb::ChildPolicyWrapper::ChildPolicyHelper::GetAuthority() { return wrapper_->lb_policy_->channel_control_helper()->GetAuthority(); } +grpc_event_engine::experimental::EventEngine* +RlsLb::ChildPolicyWrapper::ChildPolicyHelper::GetEventEngine() { + return wrapper_->lb_policy_->channel_control_helper()->GetEventEngine(); +} + void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::AddTraceEvent( TraceSeverity severity, absl::string_view message) { if (wrapper_->is_shutdown_) return; diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc index a8cf390873c..3fe4675fd7a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc @@ -43,7 +43,6 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" @@ -193,6 +192,7 @@ class WeightedTargetLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; + grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -212,7 +212,6 @@ class WeightedTargetLb : public LoadBalancingPolicy { RefCountedPtr weighted_child_; absl::optional timer_handle_; - std::shared_ptr engine_; }; // Methods for dealing with the child policy. @@ -481,16 +480,17 @@ void WeightedTargetLb::UpdateStateLocked() { WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer( RefCountedPtr weighted_child) - : weighted_child_(std::move(weighted_child)), - engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { + : weighted_child_(std::move(weighted_child)) { timer_handle_ = - engine_->RunAfter(kChildRetentionInterval, [self = Ref()]() mutable { - ApplicationCallbackExecCtx app_exec_ctx; - ExecCtx exec_ctx; - self->weighted_child_->weighted_target_policy_->work_serializer()->Run( - [self = std::move(self)] { self->OnTimerLocked(); }, - DEBUG_LOCATION); - }); + weighted_child_->weighted_target_policy_->channel_control_helper() + ->GetEventEngine() + ->RunAfter(kChildRetentionInterval, [self = Ref()]() mutable { + ApplicationCallbackExecCtx app_exec_ctx; + ExecCtx exec_ctx; + self->weighted_child_->weighted_target_policy_->work_serializer() + ->Run([self = std::move(self)] { self->OnTimerLocked(); }, + DEBUG_LOCATION); + }); } void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() { @@ -502,7 +502,9 @@ void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() { weighted_child_->weighted_target_policy_.get(), weighted_child_.get(), weighted_child_->name_.c_str()); } - engine_->Cancel(*timer_handle_); + weighted_child_->weighted_target_policy_->channel_control_helper() + ->GetEventEngine() + ->Cancel(*timer_handle_); } Unref(); } @@ -701,6 +703,12 @@ absl::string_view WeightedTargetLb::WeightedChild::Helper::GetAuthority() { ->GetAuthority(); } +grpc_event_engine::experimental::EventEngine* +WeightedTargetLb::WeightedChild::Helper::GetEventEngine() { + return weighted_child_->weighted_target_policy_->channel_control_helper() + ->GetEventEngine(); +} + void WeightedTargetLb::WeightedChild::Helper::AddTraceEvent( TraceSeverity severity, absl::string_view message) { if (weighted_child_->weighted_target_policy_->shutting_down_) return; @@ -754,7 +762,7 @@ class WeightedTargetLbFactory : public LoadBalancingPolicyFactory { OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { return MakeOrphanable(std::move(args)); - } // namespace + } absl::string_view name() const override { return kWeightedTarget; } @@ -771,7 +779,7 @@ class WeightedTargetLbFactory : public LoadBalancingPolicyFactory { return LoadRefCountedFromJson( json, JsonArgs(), "errors validating weighted_target LB policy config"); } -}; // namespace grpc_core +}; } // namespace diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index 5bc13e7aece..ea2b91fe55d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -30,6 +30,7 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" +#include #include #include #include @@ -176,6 +177,7 @@ class CdsLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; + grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -263,6 +265,10 @@ absl::string_view CdsLb::Helper::GetAuthority() { return parent_->channel_control_helper()->GetAuthority(); } +grpc_event_engine::experimental::EventEngine* CdsLb::Helper::GetEventEngine() { + return parent_->channel_control_helper()->GetEventEngine(); +} + void CdsLb::Helper::AddTraceEvent(TraceSeverity severity, absl::string_view message) { if (parent_->shutting_down_) return; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc index fd1c72ba8ae..19057e90a05 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc @@ -34,6 +34,7 @@ #include "absl/types/optional.h" #include "absl/types/variant.h" +#include #include #include @@ -253,6 +254,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; + grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -679,6 +681,11 @@ absl::string_view XdsClusterImplLb::Helper::GetAuthority() { return xds_cluster_impl_policy_->channel_control_helper()->GetAuthority(); } +grpc_event_engine::experimental::EventEngine* +XdsClusterImplLb::Helper::GetEventEngine() { + return xds_cluster_impl_policy_->channel_control_helper()->GetEventEngine(); +} + void XdsClusterImplLb::Helper::AddTraceEvent(TraceSeverity severity, absl::string_view message) { if (xds_cluster_impl_policy_->shutting_down_) return; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc index c673b63f45a..6fba5d6e3cd 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc @@ -42,7 +42,6 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" @@ -69,7 +68,6 @@ TraceFlag grpc_xds_cluster_manager_lb_trace(false, "xds_cluster_manager_lb"); namespace { using ::grpc_event_engine::experimental::EventEngine; -using ::grpc_event_engine::experimental::GetDefaultEventEngine; constexpr Duration kChildRetentionInterval = Duration::Minutes(15); constexpr absl::string_view kXdsClusterManager = @@ -197,6 +195,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; + EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -224,7 +223,6 @@ class XdsClusterManagerLb : public LoadBalancingPolicy { // States for delayed removal. absl::optional delayed_removal_timer_handle_; bool shutdown_ = false; - std::shared_ptr engine_; }; ~XdsClusterManagerLb() override; @@ -430,8 +428,7 @@ XdsClusterManagerLb::ClusterChild::ClusterChild( RefCountedPtr xds_cluster_manager_policy, const std::string& name) : xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)), - name_(name), - engine_(GetDefaultEventEngine()) { + name_(name) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] created ClusterChild %p for %s", @@ -466,7 +463,9 @@ void XdsClusterManagerLb::ClusterChild::Orphan() { // the child. picker_wrapper_.reset(); if (delayed_removal_timer_handle_.has_value()) { - engine_->Cancel(*delayed_removal_timer_handle_); + xds_cluster_manager_policy_->channel_control_helper() + ->GetEventEngine() + ->Cancel(*delayed_removal_timer_handle_); } shutdown_ = true; Unref(); @@ -509,7 +508,9 @@ absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked( // Update child weight. // Reactivate if needed. if (delayed_removal_timer_handle_.has_value() && - engine_->Cancel(*delayed_removal_timer_handle_)) { + xds_cluster_manager_policy_->channel_control_helper() + ->GetEventEngine() + ->Cancel(*delayed_removal_timer_handle_)) { delayed_removal_timer_handle_.reset(); } // Create child policy if needed. @@ -544,17 +545,21 @@ void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() { void XdsClusterManagerLb::ClusterChild::DeactivateLocked() { // If already deactivated, don't do that again. if (delayed_removal_timer_handle_.has_value()) return; - // Set the child weight to 0 so that future picker won't contain this child. // Start a timer to delete the child. - delayed_removal_timer_handle_ = engine_->RunAfter( - kChildRetentionInterval, - [self = Ref(DEBUG_LOCATION, "ClusterChild+timer")]() mutable { - ApplicationCallbackExecCtx application_exec_ctx; - ExecCtx exec_ctx; - self->xds_cluster_manager_policy_->work_serializer()->Run( - [self = std::move(self)]() { self->OnDelayedRemovalTimerLocked(); }, - DEBUG_LOCATION); - }); + delayed_removal_timer_handle_ = + xds_cluster_manager_policy_->channel_control_helper() + ->GetEventEngine() + ->RunAfter( + kChildRetentionInterval, + [self = Ref(DEBUG_LOCATION, "ClusterChild+timer")]() mutable { + ApplicationCallbackExecCtx application_exec_ctx; + ExecCtx exec_ctx; + self->xds_cluster_manager_policy_->work_serializer()->Run( + [self = std::move(self)]() { + self->OnDelayedRemovalTimerLocked(); + }, + DEBUG_LOCATION); + }); } void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked() { @@ -625,6 +630,12 @@ absl::string_view XdsClusterManagerLb::ClusterChild::Helper::GetAuthority() { ->GetAuthority(); } +EventEngine* XdsClusterManagerLb::ClusterChild::Helper::GetEventEngine() { + return xds_cluster_manager_child_->xds_cluster_manager_policy_ + ->channel_control_helper() + ->GetEventEngine(); +} + void XdsClusterManagerLb::ClusterChild::Helper::AddTraceEvent( TraceSeverity severity, absl::string_view message) { if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) { diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc index 5ebbcd8f9ef..ff6e67feb9f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -34,6 +34,7 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" +#include #include #include #include @@ -378,6 +379,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { // client, which is a watch-based API. void RequestReresolution() override {} absl::string_view GetAuthority() override; + grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -454,6 +456,12 @@ absl::string_view XdsClusterResolverLb::Helper::GetAuthority() { return xds_cluster_resolver_policy_->channel_control_helper()->GetAuthority(); } +grpc_event_engine::experimental::EventEngine* +XdsClusterResolverLb::Helper::GetEventEngine() { + return xds_cluster_resolver_policy_->channel_control_helper() + ->GetEventEngine(); +} + void XdsClusterResolverLb::Helper::AddTraceEvent(TraceSeverity severity, absl::string_view message) { if (xds_cluster_resolver_policy_->shutting_down_) return; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc index 1f7d1c83936..7371b9203b2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc @@ -28,6 +28,7 @@ #include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" +#include #include #include @@ -129,6 +130,7 @@ class XdsWrrLocalityLb : public LoadBalancingPolicy { std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; + grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; @@ -265,7 +267,7 @@ OrphanablePtr XdsWrrLocalityLb::CreateChildPolicyLocked( lb_policy_args.work_serializer = work_serializer(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = - std::make_unique(this->Ref(DEBUG_LOCATION, "Helper")); + std::make_unique(Ref(DEBUG_LOCATION, "Helper")); auto lb_policy = CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy( "weighted_target_experimental", std::move(lb_policy_args)); @@ -313,6 +315,11 @@ absl::string_view XdsWrrLocalityLb::Helper::GetAuthority() { return xds_wrr_locality_->channel_control_helper()->GetAuthority(); } +grpc_event_engine::experimental::EventEngine* +XdsWrrLocalityLb::Helper::GetEventEngine() { + return xds_wrr_locality_->channel_control_helper()->GetEventEngine(); +} + void XdsWrrLocalityLb::Helper::AddTraceEvent(TraceSeverity severity, absl::string_view message) { xds_wrr_locality_->channel_control_helper()->AddTraceEvent(severity, message); diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 24048b35e5b..89a29566f72 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -43,7 +43,6 @@ #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/channel/channel_stack_builder_impl.h" #include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/channel/channelz.h" @@ -51,7 +50,6 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats_data.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gpr/alloc.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/debug_location.h" @@ -84,6 +82,8 @@ namespace grpc_core { +using ::grpc_event_engine::experimental::EventEngine; + TraceFlag grpc_trace_subchannel(false, "subchannel"); DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount"); @@ -626,7 +626,7 @@ Subchannel::Subchannel(SubchannelKey key, pollset_set_(grpc_pollset_set_create()), connector_(std::move(connector)), backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)), - engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { + event_engine_(args_.GetObjectRef()) { // A grpc_init is added here to ensure that grpc_shutdown does not happen // until the subchannel is destroyed. Subchannels can persist longer than // channels because they maybe reused/shared among multiple channels. As a @@ -763,7 +763,7 @@ void Subchannel::ResetBackoff() { MutexLock lock(&mu_); backoff_.Reset(); if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && - engine_->Cancel(retry_timer_handle_)) { + event_engine_->Cancel(retry_timer_handle_)) { OnRetryTimerLocked(); } else if (state_ == GRPC_CHANNEL_CONNECTING) { next_attempt_time_ = Timestamp::Now(); @@ -912,7 +912,7 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { time_until_next_attempt.millis()); SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error)); - retry_timer_handle_ = engine_->RunAfter( + retry_timer_handle_ = event_engine_->RunAfter( time_until_next_attempt, [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable { { @@ -933,9 +933,9 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { bool Subchannel::PublishTransportLocked() { // Construct channel stack. - ChannelStackBuilderImpl builder("subchannel", GRPC_CLIENT_SUBCHANNEL); - builder.SetChannelArgs(connecting_result_.channel_args) - .SetTransport(connecting_result_.transport); + ChannelStackBuilderImpl builder("subchannel", GRPC_CLIENT_SUBCHANNEL, + connecting_result_.channel_args); + builder.SetTransport(connecting_result_.transport); if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) { return false; } diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index aa8bd1daa9b..26247da98f0 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -427,7 +427,7 @@ class Subchannel : public DualRefCounted { // Data producer map. std::map data_producer_map_ ABSL_GUARDED_BY(mu_); - std::shared_ptr engine_; + std::shared_ptr event_engine_; }; } // namespace grpc_core diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 23e68ff1c96..5d3bd2c9de0 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -34,6 +34,7 @@ #include "absl/types/optional.h" #include "absl/types/variant.h" +#include #include #include "src/core/lib/avl/avl.h" @@ -45,6 +46,11 @@ #include "src/core/lib/gprpp/time.h" #include "src/core/lib/surface/channel_stack_type.h" +// TODO(hork): When we're ready to allow setting via a channel arg from the +// application, replace this with a macro in +// include/grpc/impl/codegen/grpc_types.h. +#define GRPC_INTERNAL_ARG_EVENT_ENGINE "grpc.internal.event_engine" + // Channel args are intentionally immutable, to avoid the need for locking. namespace grpc_core { @@ -170,6 +176,9 @@ template struct WrapInSharedPtr : std::integral_constant< bool, std::is_base_of, T>::value> {}; +template <> +struct WrapInSharedPtr + : std::true_type {}; template struct GetObjectImpl; // std::shared_ptr implementation @@ -213,7 +222,13 @@ template struct ChannelArgNameTraits> { static absl::string_view ChannelArgName() { return T::ChannelArgName(); } }; - +// Specialization for the EventEngine +template <> +struct ChannelArgNameTraits { + static absl::string_view ChannelArgName() { + return GRPC_INTERNAL_ARG_EVENT_ENGINE; + } +}; class ChannelArgs { public: class Pointer { diff --git a/src/core/lib/channel/channel_stack.cc b/src/core/lib/channel/channel_stack.cc index b73d38f6322..419493b71d5 100644 --- a/src/core/lib/channel/channel_stack.cc +++ b/src/core/lib/channel/channel_stack.cc @@ -30,6 +30,8 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/alloc.h" +using grpc_event_engine::experimental::EventEngine; + grpc_core::TraceFlag grpc_trace_channel(false, "channel"); grpc_core::TraceFlag grpc_trace_channel_stack(false, "channel_stack"); @@ -116,6 +118,7 @@ grpc_error_handle grpc_channel_stack_init( } stack->on_destroy.Init([]() {}); + stack->event_engine.Init(channel_args.GetObjectRef()); size_t call_size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + @@ -175,6 +178,7 @@ void grpc_channel_stack_destroy(grpc_channel_stack* stack) { (*stack->on_destroy)(); stack->on_destroy.Destroy(); + stack->event_engine.Destroy(); } grpc_error_handle grpc_call_stack_init( diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 6d19e126c12..c766ec7c2eb 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -49,7 +49,9 @@ #include #include +#include +#include #include #include #include @@ -212,6 +214,14 @@ struct grpc_channel_stack { // should look like and this can go. grpc_core::ManualConstructor> on_destroy; + grpc_core::ManualConstructor< + std::shared_ptr> + event_engine; + + grpc_event_engine::experimental::EventEngine* EventEngine() const { + return event_engine->get(); + } + // Minimal infrastructure to act like a RefCounted thing without converting // everything. // It's likely that we'll want to replace grpc_channel_stack with something diff --git a/src/core/lib/channel/channel_stack_builder.cc b/src/core/lib/channel/channel_stack_builder.cc index 45ef965a5b3..9c86704e422 100644 --- a/src/core/lib/channel/channel_stack_builder.cc +++ b/src/core/lib/channel/channel_stack_builder.cc @@ -26,7 +26,10 @@ namespace grpc_core { -ChannelStackBuilder::~ChannelStackBuilder() = default; +ChannelStackBuilder::ChannelStackBuilder(const char* name, + grpc_channel_stack_type type, + const ChannelArgs& channel_args) + : name_(name), type_(type), args_(channel_args) {} ChannelStackBuilder& ChannelStackBuilder::SetTarget(const char* target) { if (target == nullptr) { @@ -37,12 +40,6 @@ ChannelStackBuilder& ChannelStackBuilder::SetTarget(const char* target) { return *this; } -ChannelStackBuilder& ChannelStackBuilder::SetChannelArgs( - const ChannelArgs& args) { - args_ = args; - return *this; -} - void ChannelStackBuilder::PrependFilter(const grpc_channel_filter* filter) { stack_.insert(stack_.begin(), filter); } diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h index f08efa72230..ed0315847e6 100644 --- a/src/core/lib/channel/channel_stack_builder.h +++ b/src/core/lib/channel/channel_stack_builder.h @@ -41,8 +41,9 @@ namespace grpc_core { class ChannelStackBuilder { public: // Initialize with a name. - ChannelStackBuilder(const char* name, grpc_channel_stack_type type) - : name_(name), type_(type) {} + // channel_args *must be* preconditioned already. + ChannelStackBuilder(const char* name, grpc_channel_stack_type type, + const ChannelArgs& channel_args); const char* name() const { return name_; } @@ -62,9 +63,6 @@ class ChannelStackBuilder { // Query the transport. grpc_transport* transport() const { return transport_; } - // Set channel args. - ChannelStackBuilder& SetChannelArgs(const ChannelArgs& args); - // Query the channel args. const ChannelArgs& channel_args() const { return args_; } @@ -98,7 +96,7 @@ class ChannelStackBuilder { virtual absl::StatusOr> Build() = 0; protected: - ~ChannelStackBuilder(); + ~ChannelStackBuilder() = default; private: static std::string unknown_target() { return "unknown"; } diff --git a/src/core/lib/event_engine/default_event_engine.cc b/src/core/lib/event_engine/default_event_engine.cc index 6e46e58991e..56ce9babe2b 100644 --- a/src/core/lib/event_engine/default_event_engine.cc +++ b/src/core/lib/event_engine/default_event_engine.cc @@ -23,6 +23,9 @@ #include +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_args_preconditioning.h" +#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/event_engine/default_event_engine_factory.h" #include "src/core/lib/event_engine/trace.h" @@ -74,5 +77,19 @@ std::shared_ptr GetDefaultEventEngine() { return engine; } +namespace { +grpc_core::ChannelArgs EnsureEventEngineInChannelArgs( + grpc_core::ChannelArgs args) { + if (args.ContainsObject()) return args; + return args.SetObject(GetDefaultEventEngine()); +} +} // namespace + +void RegisterEventEngineChannelArgPreconditioning( + grpc_core::CoreConfiguration::Builder* builder) { + builder->channel_args_preconditioning()->RegisterStage( + grpc_event_engine::experimental::EnsureEventEngineInChannelArgs); +} + } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/default_event_engine.h b/src/core/lib/event_engine/default_event_engine.h index e116a471a0a..4a65bd06d9a 100644 --- a/src/core/lib/event_engine/default_event_engine.h +++ b/src/core/lib/event_engine/default_event_engine.h @@ -21,6 +21,7 @@ #include +#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/promise/context.h" namespace grpc_core { @@ -37,6 +38,11 @@ namespace experimental { /// Strongly consider whether you could use \a CreateEventEngine instead. std::shared_ptr GetDefaultEventEngine(); +/// On ingress, ensure that an EventEngine exists in channel args via +/// preconditioning. +void RegisterEventEngineChannelArgPreconditioning( + grpc_core::CoreConfiguration::Builder* builder); + } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/load_balancing/lb_policy.h b/src/core/lib/load_balancing/lb_policy.h index 96ed933136d..d68bf11ff08 100644 --- a/src/core/lib/load_balancing/lb_policy.h +++ b/src/core/lib/load_balancing/lb_policy.h @@ -33,6 +33,7 @@ #include "absl/types/optional.h" #include "absl/types/variant.h" +#include #include #include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" @@ -291,6 +292,9 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// Returns the channel authority. virtual absl::string_view GetAuthority() = 0; + /// Returns the EventEngine to use for timers and async work. + virtual grpc_event_engine::experimental::EventEngine* GetEventEngine() = 0; + /// Adds a trace message associated with the channel. enum TraceSeverity { TRACE_INFO, TRACE_WARNING, TRACE_ERROR }; virtual void AddTraceEvent(TraceSeverity severity, diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index fd33ec61d7a..561ab26ad47 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -170,44 +170,12 @@ void channelz_node_destroy(void* p) { int channelz_node_cmp(void* p1, void* p2) { return QsortCompare(p1, p2); } const grpc_arg_pointer_vtable channelz_node_arg_vtable = { channelz_node_copy, channelz_node_destroy, channelz_node_cmp}; - -void CreateChannelzNode(ChannelStackBuilder* builder) { - auto args = builder->channel_args(); - // Check whether channelz is enabled. - const bool channelz_enabled = args.GetBool(GRPC_ARG_ENABLE_CHANNELZ) - .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT); - if (!channelz_enabled) return; - // Get parameters needed to create the channelz node. - const size_t channel_tracer_max_memory = std::max( - 0, args.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE) - .value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT)); - const bool is_internal_channel = - args.GetBool(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL).value_or(false); - // Create the channelz node. - std::string target(builder->target()); - RefCountedPtr channelz_node = - MakeRefCounted( - target.c_str(), channel_tracer_max_memory, is_internal_channel); - channelz_node->AddTraceEvent( - channelz::ChannelTrace::Severity::Info, - grpc_slice_from_static_string("Channel created")); - // Add channelz node to channel args. - // We remove the is_internal_channel arg, since we no longer need it. - builder->SetChannelArgs( - args.Remove(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL) - .Set(GRPC_ARG_CHANNELZ_CHANNEL_NODE, - ChannelArgs::Pointer(channelz_node.release(), - &channelz_node_arg_vtable))); -} - } // namespace absl::StatusOr> Channel::Create( const char* target, ChannelArgs args, grpc_channel_stack_type channel_stack_type, grpc_transport* optional_transport) { - ChannelStackBuilderImpl builder( - grpc_channel_stack_type_string(channel_stack_type), channel_stack_type); if (!args.GetString(GRPC_ARG_DEFAULT_AUTHORITY).has_value()) { auto ssl_override = args.GetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG); if (ssl_override.has_value()) { @@ -222,15 +190,42 @@ absl::StatusOr> Channel::Create( args = channel_args_mutator(target, args, channel_stack_type); } } - builder.SetChannelArgs(args).SetTarget(target).SetTransport( - optional_transport); - if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) { - return nullptr; - } // We only need to do this for clients here. For servers, this will be // done in src/core/lib/surface/server.cc. if (grpc_channel_stack_type_is_client(channel_stack_type)) { - CreateChannelzNode(&builder); + // Check whether channelz is enabled. + if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ) + .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) { + // Get parameters needed to create the channelz node. + const size_t channel_tracer_max_memory = std::max( + 0, + args.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE) + .value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT)); + const bool is_internal_channel = + args.GetBool(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL).value_or(false); + // Create the channelz node. + std::string channelz_node_target{target == nullptr ? "unknown" : target}; + RefCountedPtr channelz_node = + MakeRefCounted(channelz_node_target, + channel_tracer_max_memory, + is_internal_channel); + channelz_node->AddTraceEvent( + channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Channel created")); + // Add channelz node to channel args. + // We remove the is_internal_channel arg, since we no longer need it. + args = args.Remove(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL) + .Set(GRPC_ARG_CHANNELZ_CHANNEL_NODE, + ChannelArgs::Pointer(channelz_node.release(), + &channelz_node_arg_vtable)); + } + } + ChannelStackBuilderImpl builder( + grpc_channel_stack_type_string(channel_stack_type), channel_stack_type, + args); + builder.SetTarget(target).SetTransport(optional_transport); + if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) { + return nullptr; } return CreateWithBuilder(&builder); } diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index f1c5bb0a352..16562ef786f 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -26,7 +26,6 @@ #include #include -#include #include #include @@ -46,7 +45,6 @@ #include "src/core/lib/channel/channel_stack.h" // IWYU pragma: keep #include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/channel/channelz.h" -#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/cpp_impl_of.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/ref_counted.h" @@ -158,7 +156,7 @@ class Channel : public RefCounted, } grpc_event_engine::experimental::EventEngine* event_engine() const { - return event_engine_.get(); + return channel_stack_->EventEngine(); } private: @@ -176,8 +174,6 @@ class Channel : public RefCounted, MemoryAllocator allocator_; std::string target_; const RefCountedPtr channel_stack_; - const std::shared_ptr - event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine(); }; } // namespace grpc_core diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index f1b5bea9516..fb55a55a404 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -25,6 +25,12 @@ #include "src/core/lib/transport/http_connect_handshaker.h" #include "src/core/lib/transport/tcp_connect_handshaker.h" +namespace grpc_event_engine { +namespace experimental { +extern void RegisterEventEngineChannelArgPreconditioning(grpc_core::CoreConfiguration::Builder* builder); +} // namespace experimental +} // namespace grpc_event_engine + namespace grpc_core { extern void BuildClientChannelConfiguration( @@ -63,6 +69,7 @@ extern void RegisterBinderResolver(CoreConfiguration::Builder* builder); #endif void BuildCoreConfiguration(CoreConfiguration::Builder* builder) { + grpc_event_engine::experimental::RegisterEventEngineChannelArgPreconditioning(builder); // The order of the handshaker registration is crucial here. // We want TCP connect handshaker to be registered last so that it is added to // the start of the handshaker list. diff --git a/test/core/channel/channel_args_test.cc b/test/core/channel/channel_args_test.cc index 4657c77fa25..7388eec422b 100644 --- a/test/core/channel/channel_args_test.cc +++ b/test/core/channel/channel_args_test.cc @@ -28,6 +28,7 @@ #include #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/notification.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -35,6 +36,9 @@ namespace grpc_core { +using ::grpc_event_engine::experimental::CreateEventEngine; +using ::grpc_event_engine::experimental::EventEngine; + TEST(ChannelArgsTest, Noop) { ChannelArgs(); } TEST(ChannelArgsTest, SetGetRemove) { @@ -164,6 +168,33 @@ TEST(ChannelArgsTest, RetrieveRawPointerFromStoredSharedPtr) { EXPECT_EQ(2, shared_obj.use_count()); } +TEST(ChannelArgsTest, StoreSharedPtrEventEngine) { + auto p = std::shared_ptr(CreateEventEngine()); + ChannelArgs a; + a = a.SetObject(p); + Notification signal; + bool triggered = false; + a.GetObjectRef()->Run([&triggered, &signal] { + triggered = true; + signal.Notify(); + }); + signal.WaitForNotification(); + ASSERT_TRUE(triggered); +} + +TEST(ChannelArgsTest, GetNonOwningEventEngine) { + auto p = std::shared_ptr(CreateEventEngine()); + ASSERT_TRUE(p.unique()); + ChannelArgs a; + a = a.SetObject(p); + ASSERT_FALSE(p.unique()); + ASSERT_EQ(p.use_count(), 2); + EventEngine* engine = a.GetObject(); + (void)engine; + // p and the channel args + ASSERT_EQ(p.use_count(), 2); +} + } // namespace grpc_core TEST(GrpcChannelArgsTest, Create) { diff --git a/test/core/channel/channel_stack_builder_test.cc b/test/core/channel/channel_stack_builder_test.cc index 90ee4a86d01..9693ebf6d03 100644 --- a/test/core/channel/channel_stack_builder_test.cc +++ b/test/core/channel/channel_stack_builder_test.cc @@ -119,7 +119,8 @@ bool AddOriginalFilter(ChannelStackBuilder* builder) { } TEST(ChannelStackBuilder, UnknownTarget) { - ChannelStackBuilderImpl builder("alpha-beta-gamma", GRPC_CLIENT_CHANNEL); + ChannelStackBuilderImpl builder("alpha-beta-gamma", GRPC_CLIENT_CHANNEL, + ChannelArgs()); EXPECT_EQ(builder.target(), "unknown"); } diff --git a/test/core/channel/channel_stack_test.cc b/test/core/channel/channel_stack_test.cc index 65eea62be53..fff9904e0f9 100644 --- a/test/core/channel/channel_stack_test.cc +++ b/test/core/channel/channel_stack_test.cc @@ -18,24 +18,31 @@ #include "src/core/lib/channel/channel_stack.h" +#include + #include #include "absl/status/status.h" #include "gtest/gtest.h" -#include #include +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_args_preconditioning.h" +#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "test/core/util/test_config.h" static grpc_error_handle channel_init_func(grpc_channel_element* elem, grpc_channel_element_args* args) { - EXPECT_EQ(args->channel_args->num_args, 1); - EXPECT_EQ(args->channel_args->args[0].type, GRPC_ARG_INTEGER); - EXPECT_STREQ(args->channel_args->args[0].key, "test_key"); - EXPECT_EQ(args->channel_args->args[0].value.integer, 42); + int test_value = grpc_channel_args_find_integer(args->channel_args, + "test_key", {-1, 0, INT_MAX}); + EXPECT_EQ(test_value, 42); + auto* ee = grpc_channel_args_find_pointer< + grpc_event_engine::experimental::EventEngine>( + args->channel_args, GRPC_INTERNAL_ARG_EVENT_ENGINE); + EXPECT_NE(ee, nullptr); EXPECT_TRUE(args->is_first); EXPECT_TRUE(args->is_last); *static_cast(elem->channel_data) = 0; @@ -104,11 +111,14 @@ TEST(ChannelStackTest, CreateChannelStack) { channel_stack = static_cast( gpr_malloc(grpc_channel_stack_size(&filters, 1))); + auto channel_args = grpc_core::CoreConfiguration::Get() + .channel_args_preconditioning() + .PreconditionChannelArgs(nullptr) + .Set("test_key", 42); ASSERT_TRUE(GRPC_LOG_IF_ERROR( "grpc_channel_stack_init", grpc_channel_stack_init(1, free_channel, channel_stack, &filters, 1, - grpc_core::ChannelArgs().Set("test_key", 42), - "test", channel_stack))); + channel_args, "test", channel_stack))); EXPECT_EQ(channel_stack->count, 1); channel_elem = grpc_channel_stack_element(channel_stack, 0); channel_data = static_cast(channel_elem->channel_data); diff --git a/test/core/channel/minimal_stack_is_minimal_test.cc b/test/core/channel/minimal_stack_is_minimal_test.cc index efc342de5d7..8bef0283ef4 100644 --- a/test/core/channel/minimal_stack_is_minimal_test.cc +++ b/test/core/channel/minimal_stack_is_minimal_test.cc @@ -43,7 +43,6 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/channel/channel_stack_builder_impl.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -129,16 +128,17 @@ int main(int argc, char** argv) { static int check_stack(const char* file, int line, const char* transport_name, grpc_channel_args* init_args, unsigned channel_stack_type, ...) { + grpc_core::ChannelArgs channel_args = + grpc_core::ChannelArgs::FromC(init_args); // create phony channel stack grpc_core::ChannelStackBuilderImpl builder( - "test", static_cast(channel_stack_type)); + "test", static_cast(channel_stack_type), + channel_args); grpc_transport_vtable fake_transport_vtable; memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable)); fake_transport_vtable.name = transport_name; grpc_transport fake_transport = {&fake_transport_vtable}; - grpc_core::ChannelArgs channel_args = - grpc_core::ChannelArgs::FromC(init_args); - builder.SetTarget("foo.test.google.fr").SetChannelArgs(channel_args); + builder.SetTarget("foo.test.google.fr"); if (transport_name != nullptr) { builder.SetTransport(&fake_transport); } diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index e61fc2c3caf..60ea798d90c 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -38,6 +38,7 @@ #include "absl/types/variant.h" #include "gtest/gtest.h" +#include #include #include #include @@ -47,6 +48,7 @@ #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -237,6 +239,10 @@ class LoadBalancingPolicyTest : public ::testing::Test { absl::string_view GetAuthority() override { return "server.example.com"; } + grpc_event_engine::experimental::EventEngine* GetEventEngine() override { + return grpc_event_engine::experimental::GetDefaultEventEngine().get(); + } + void AddTraceEvent(TraceSeverity, absl::string_view) override {} LoadBalancingPolicyTest* test_; diff --git a/test/core/filters/filter_fuzzer.cc b/test/core/filters/filter_fuzzer.cc index 6b075578199..69a5fd852bf 100644 --- a/test/core/filters/filter_fuzzer.cc +++ b/test/core/filters/filter_fuzzer.cc @@ -43,11 +43,13 @@ #include "src/core/ext/filters/http/server/http_server_filter.h" #include "src/core/lib/channel/call_finalization.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_args_preconditioning.h" #include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channel_stack_builder_impl.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/channel/promise_based_filter.h" +#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/env.h" @@ -247,7 +249,9 @@ RefCountedPtr LoadAuthorizationEngine( template ChannelArgs LoadChannelArgs(const FuzzerChannelArgs& fuzz_args, GlobalObjects* globals) { - ChannelArgs args = ChannelArgs().SetObject(ResourceQuota::Default()); + ChannelArgs args = CoreConfiguration::Get() + .channel_args_preconditioning() + .PreconditionChannelArgs(nullptr); for (const auto& arg : fuzz_args) { if (arg.key() == ResourceQuota::ChannelArgName()) { if (arg.value_case() == filter_fuzzer::ChannelArg::kResourceQuota) { @@ -675,8 +679,8 @@ DEFINE_PROTO_FUZZER(const filter_fuzzer::Msg& msg) { grpc_core::ChannelStackBuilderImpl builder( msg.stack_name().c_str(), - static_cast(msg.channel_stack_type())); - builder.SetChannelArgs(channel_args); + static_cast(msg.channel_stack_type()), + channel_args); builder.AppendFilter(filter); const bool is_client = grpc_channel_stack_type_is_client(builder.channel_stack_type()); diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc index 4fdca07ed04..016380bb755 100644 --- a/test/core/util/test_lb_policies.cc +++ b/test/core/util/test_lb_policies.cc @@ -26,6 +26,7 @@ #include "absl/strings/string_view.h" #include "absl/types/variant.h" +#include #include #include @@ -156,6 +157,10 @@ class TestPickArgsLb : public ForwardingLoadBalancingPolicy { return parent_->channel_control_helper()->GetAuthority(); } + grpc_event_engine::experimental::EventEngine* GetEventEngine() override { + return parent_->channel_control_helper()->GetEventEngine(); + } + void AddTraceEvent(TraceSeverity severity, absl::string_view message) override { parent_->channel_control_helper()->AddTraceEvent(severity, message); @@ -274,6 +279,10 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy return parent_->channel_control_helper()->GetAuthority(); } + grpc_event_engine::experimental::EventEngine* GetEventEngine() override { + return parent_->channel_control_helper()->GetEventEngine(); + } + void AddTraceEvent(TraceSeverity severity, absl::string_view message) override { parent_->channel_control_helper()->AddTraceEvent(severity, message); @@ -385,6 +394,10 @@ class AddressTestLoadBalancingPolicy : public ForwardingLoadBalancingPolicy { return parent_->channel_control_helper()->GetAuthority(); } + grpc_event_engine::experimental::EventEngine* GetEventEngine() override { + return parent_->channel_control_helper()->GetEventEngine(); + } + void AddTraceEvent(TraceSeverity severity, absl::string_view message) override { parent_->channel_control_helper()->AddTraceEvent(severity, message); @@ -500,6 +513,10 @@ class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy { return parent_->channel_control_helper()->GetAuthority(); } + grpc_event_engine::experimental::EventEngine* GetEventEngine() override { + return parent_->channel_control_helper()->GetEventEngine(); + } + void AddTraceEvent(TraceSeverity severity, absl::string_view message) override { parent_->channel_control_helper()->AddTraceEvent(severity, message); @@ -617,6 +634,10 @@ class OobBackendMetricTestLoadBalancingPolicy return parent_->channel_control_helper()->GetAuthority(); } + grpc_event_engine::experimental::EventEngine* GetEventEngine() override { + return parent_->channel_control_helper()->GetEventEngine(); + } + void AddTraceEvent(TraceSeverity severity, absl::string_view message) override { parent_->channel_control_helper()->AddTraceEvent(severity, message); diff --git a/test/core/xds/xds_channel_stack_modifier_test.cc b/test/core/xds/xds_channel_stack_modifier_test.cc index 9f9bfc5d4cd..ca026012e87 100644 --- a/test/core/xds/xds_channel_stack_modifier_test.cc +++ b/test/core/xds/xds_channel_stack_modifier_test.cc @@ -90,8 +90,8 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertion) { grpc_arg arg = channel_stack_modifier->MakeChannelArg(); // Create a phony ChannelStackBuilder object grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1); - ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL); - builder.SetChannelArgs(ChannelArgs::FromC(args)); + ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL, + ChannelArgs::FromC(args)); grpc_channel_args_destroy(args); grpc_transport_vtable fake_transport_vtable; memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable)); @@ -128,8 +128,8 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertionAfterCensus) { grpc_arg arg = channel_stack_modifier->MakeChannelArg(); // Create a phony ChannelStackBuilder object grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1); - ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL); - builder.SetChannelArgs(ChannelArgs::FromC(args)); + ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL, + ChannelArgs::FromC(args)); grpc_channel_args_destroy(args); grpc_transport_vtable fake_transport_vtable; memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable)); diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index 3b4b57d74b4..a518f852dd4 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -501,7 +501,9 @@ static void BM_IsolatedFilter(benchmark::State& state) { FakeClientChannelFactory fake_client_channel_factory; grpc_core::ChannelArgs channel_args = - grpc_core::ChannelArgs() + grpc_core::CoreConfiguration::Get() + .channel_args_preconditioning() + .PreconditionChannelArgs(nullptr) .SetObject(&fake_client_channel_factory) .Set(GRPC_ARG_SERVER_URI, "localhost"); if (fixture.flags & REQUIRES_TRANSPORT) { @@ -696,12 +698,12 @@ class IsolatedCallFixture { // the grpc_shutdown() run by grpc_channel_destroy(). So we need to // call grpc_init() manually here to balance things out. grpc_init(); - auto args = grpc_core::CoreConfiguration::Get() - .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr); - grpc_core::ChannelStackBuilderImpl builder("phony", GRPC_CLIENT_CHANNEL); + grpc_core::ChannelStackBuilderImpl builder( + "phony", GRPC_CLIENT_CHANNEL, + grpc_core::CoreConfiguration::Get() + .channel_args_preconditioning() + .PreconditionChannelArgs(nullptr)); builder.SetTarget("phony_target"); - builder.SetChannelArgs(args); builder.AppendFilter(&isolated_call_filter::isolated_call_filter); { grpc_core::ExecCtx exec_ctx; diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h index df7444dd5d4..ef75c955a46 100644 --- a/test/cpp/microbenchmarks/fullstack_fixtures.h +++ b/test/cpp/microbenchmarks/fullstack_fixtures.h @@ -30,6 +30,7 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/endpoint_pair.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -189,12 +190,18 @@ class EndpointPairFixture : public BaseFixture { /* create channel */ { - ChannelArguments args; - args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority"); - fixture_configuration.ApplyCommonChannelArguments(&args); - - grpc_core::ChannelArgs c_args = - grpc_core::ChannelArgs::FromC(args.c_channel_args()); + grpc_core::ChannelArgs c_args; + { + ChannelArguments args; + args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority"); + fixture_configuration.ApplyCommonChannelArguments(&args); + // precondition + grpc_channel_args tmp_args; + args.SetChannelArgs(&tmp_args); + c_args = grpc_core::CoreConfiguration::Get() + .channel_args_preconditioning() + .PreconditionChannelArgs(&tmp_args); + } client_transport_ = grpc_create_chttp2_transport(c_args, endpoints.client, true); GPR_ASSERT(client_transport_);