diff --git a/BUILD b/BUILD index dae2cea68c8..92868f19341 100644 --- a/BUILD +++ b/BUILD @@ -419,7 +419,6 @@ grpc_cc_library( "channel_stack_type", "config", "default_event_engine_factory_hdrs", - "event_engine_base", "gpr_base", "grpc_authorization_base", "grpc_base", @@ -479,7 +478,6 @@ grpc_cc_library( "channel_stack_type", "config", "default_event_engine_factory_hdrs", - "event_engine_base", "gpr_base", "grpc_authorization_base", "grpc_base", @@ -2255,7 +2253,6 @@ grpc_cc_library( "src/core/lib/event_engine/event_engine_factory.h", ], deps = [ - "config", "event_engine_base_hdrs", "gpr_base", ], @@ -2412,9 +2409,6 @@ grpc_cc_library( "src/core/lib/event_engine/event_engine.cc", ], deps = [ - "channel_args", - "channel_args_preconditioning", - "config", "default_event_engine_factory", "default_event_engine_factory_hdrs", "event_engine_base_hdrs", @@ -3076,7 +3070,6 @@ grpc_cc_library( "avl", "channel_stack_type", "dual_ref_counted", - "event_engine_base_hdrs", "gpr_base", "gpr_platform", "grpc_codegen", diff --git a/include/grpc/event_engine/event_engine.h b/include/grpc/event_engine/event_engine.h index da069d21732..3e58f997a22 100644 --- a/include/grpc/event_engine/event_engine.h +++ b/include/grpc/event_engine/event_engine.h @@ -26,7 +26,6 @@ #include #include #include -#include // TODO(vigneshbabu): Define the Endpoint::Write metrics collection system namespace grpc_event_engine { diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index c1b514a62ff..eb1f39055b3 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -455,8 +455,6 @@ typedef struct { * channel arg. Int valued, milliseconds. Defaults to 10 minutes.*/ #define GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS \ "grpc.experimental.server_config_change_drain_grace_time_ms" -/** EventEngine pointer */ -#define GRPC_ARG_EVENT_ENGINE "grpc.event_engine" /** \} */ /** Result of a grpc call. If the caller satisfies the prerequisites of a diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 754aac38322..f357f9f121d 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -50,6 +50,7 @@ #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/event_engine/event_engine_factory.h" #include "src/core/lib/gpr/alloc.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -82,8 +83,7 @@ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))) namespace grpc_core { - -using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::GetDefaultEventEngine; TraceFlag grpc_trace_subchannel(false, "subchannel"); DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount"); @@ -675,8 +675,6 @@ Subchannel::Subchannel(SubchannelKey key, } else { args_ = grpc_channel_args_copy(args); } - // Initialize EventEngine - event_engine_ = ChannelArgs::FromC(args_).GetObjectRef(); // Initialize channelz. const bool channelz_enabled = grpc_channel_args_find_bool( args_, GRPC_ARG_ENABLE_CHANNELZ, GRPC_ENABLE_CHANNELZ_DEFAULT); @@ -804,7 +802,7 @@ void Subchannel::ResetBackoff() { MutexLock lock(&mu_); backoff_.Reset(); if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && - event_engine_->Cancel(retry_timer_handle_)) { + GetDefaultEventEngine()->Cancel(retry_timer_handle_)) { OnRetryTimerLocked(); } else if (state_ == GRPC_CHANNEL_CONNECTING) { next_attempt_time_ = ExecCtx::Get()->Now(); @@ -950,7 +948,7 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { time_until_next_attempt.millis()); SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error)); - retry_timer_handle_ = event_engine_->RunAfter( + retry_timer_handle_ = GetDefaultEventEngine()->RunAfter( time_until_next_attempt, [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable { { diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 28a29e6bf3c..59783f59c05 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -23,7 +23,6 @@ #include #include -#include #include #include "absl/base/thread_annotations.h" @@ -420,10 +419,6 @@ class Subchannel : public DualRefCounted { // Data producer map. std::map data_producer_map_ ABSL_GUARDED_BY(mu_); - - // Engine used in this Subchannel - std::shared_ptr event_engine_ - ABSL_GUARDED_BY(mu_); }; } // namespace grpc_core diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 08af5490bc4..47f317da505 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -24,7 +24,6 @@ #include #include // IWYU pragma: keep -#include #include #include #include @@ -34,7 +33,6 @@ #include "absl/types/optional.h" #include "absl/types/variant.h" -#include #include #include "src/core/lib/avl/avl.h" @@ -54,7 +52,7 @@ namespace grpc_core { // ChannelArgs to automatically derive a vtable from a T*. // To participate as a pointer, instances should expose the function: // // Gets the vtable for this type -// static const grpc_arg_pointer_vtable* VTable(); +// static const grpc_channel_arg_vtable* VTable(); // // Performs any mutations required for channel args to own a pointer // // Only needed if ChannelArgs::Set is to be called with a raw pointer. // static void* TakeUnownedPointer(T* p); @@ -88,32 +86,6 @@ struct ChannelArgTypeTraits< }; }; -// Specialization for shared_ptr -// Incurs an allocation because shared_ptr.release is not a thing. -template -struct is_shared_ptr : std::false_type {}; -template -struct is_shared_ptr> : std::true_type {}; -template -struct ChannelArgTypeTraits::value, void>> { - static void* TakeUnownedPointer(T* p) { return p; } - static const grpc_arg_pointer_vtable* VTable() { - static const grpc_arg_pointer_vtable tbl = { - // copy - [](void* p) -> void* { return new T(*static_cast(p)); }, - // destroy - [](void* p) { delete static_cast(p); }, - // compare - [](void* p1, void* p2) { - return QsortCompare(static_cast(p1)->get(), - static_cast(p2)->get()); - }, - }; - return &tbl; - }; -}; - // If a type declares some member 'struct RawPointerChannelArgTag {}' then // we automatically generate a vtable for it that does not do any ownership // management and compares the type by pointer identity. @@ -136,54 +108,6 @@ struct ChannelArgTypeTraits -struct WrapInSharedPtr - : std::integral_constant< - bool, std::is_base_of, T>::value> {}; -template <> -struct WrapInSharedPtr - : std::true_type {}; -template -struct GetObjectImpl; -// std::shared_ptr implementation -template -struct GetObjectImpl::value, void>> { - using Result = T*; - using ReffedResult = std::shared_ptr; - using StoredType = std::shared_ptr*; - static Result Get(StoredType p) { return p->get(); }; - static ReffedResult GetReffed(StoredType p) { return ReffedResult(*p); }; -}; -// RefCountedPtr -template -struct GetObjectImpl::value, void>> { - using Result = T*; - using ReffedResult = RefCountedPtr; - using StoredType = Result; - static Result Get(StoredType p) { return p; }; - static ReffedResult GetReffed(StoredType p) { - if (p == nullptr) return nullptr; - return p->Ref(); - }; -}; - -// Provide the canonical name for a type's channel arg key -template -struct ChannelArgNameTraits { - static absl::string_view ChannelArgName() { return T::ChannelArgName(); } -}; -template -struct ChannelArgNameTraits> { - static absl::string_view ChannelArgName() { return T::ChannelArgName(); } -}; - -// Specialization for the EventEngine -template <> -struct ChannelArgNameTraits { - static absl::string_view ChannelArgName() { return GRPC_ARG_EVENT_ENGINE; } -}; - class ChannelArgs { public: class Pointer { @@ -278,20 +202,6 @@ class ChannelArgs { absl::remove_cvref_t>::VTable())); } template - GRPC_MUST_USE_RESULT absl::enable_if_t< - std::is_same< - const grpc_arg_pointer_vtable*, - decltype(ChannelArgTypeTraits>::VTable())>::value, - ChannelArgs> - Set(absl::string_view name, std::shared_ptr value) const { - auto* store_value = new std::shared_ptr(value); - return Set( - name, - Pointer(ChannelArgTypeTraits>::TakeUnownedPointer( - store_value), - ChannelArgTypeTraits>::VTable())); - } - template GRPC_MUST_USE_RESULT ChannelArgs SetIfUnset(absl::string_view name, T value) { if (Contains(name)) return *this; return Set(name, std::move(value)); @@ -299,19 +209,12 @@ class ChannelArgs { GRPC_MUST_USE_RESULT ChannelArgs Remove(absl::string_view name) const; bool Contains(absl::string_view name) const { return Get(name) != nullptr; } - template - bool ContainsObject() const { - return Get(ChannelArgNameTraits::ChannelArgName()) != nullptr; - } - absl::optional GetInt(absl::string_view name) const; absl::optional GetString(absl::string_view name) const; void* GetVoidPointer(absl::string_view name) const; template - typename GetObjectImpl::StoredType GetPointer( - absl::string_view name) const { - return static_cast::StoredType>( - GetVoidPointer(name)); + T* GetPointer(absl::string_view name) const { + return static_cast(GetVoidPointer(name)); } absl::optional GetDurationFromIntMillis( absl::string_view name) const; @@ -331,18 +234,14 @@ class ChannelArgs { return Set(T::ChannelArgName(), std::move(p)); } template - GRPC_MUST_USE_RESULT ChannelArgs SetObject(std::shared_ptr p) const { - return Set(ChannelArgNameTraits::ChannelArgName(), std::move(p)); - } - template - typename GetObjectImpl::Result GetObject() { - return GetObjectImpl::Get( - GetPointer(ChannelArgNameTraits::ChannelArgName())); + T* GetObject() { + return GetPointer(T::ChannelArgName()); } template - typename GetObjectImpl::ReffedResult GetObjectRef() { - return GetObjectImpl::GetReffed( - GetPointer(ChannelArgNameTraits::ChannelArgName())); + RefCountedPtr GetObjectRef() { + auto* p = GetObject(); + if (p == nullptr) return nullptr; + return p->Ref(); } bool operator<(const ChannelArgs& other) const { return args_ < other.args_; } diff --git a/src/core/lib/event_engine/event_engine.cc b/src/core/lib/event_engine/event_engine.cc index b15d5bfec51..60c51d9a2b0 100644 --- a/src/core/lib/event_engine/event_engine.cc +++ b/src/core/lib/event_engine/event_engine.cc @@ -20,9 +20,6 @@ #include -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/channel_args_preconditioning.h" -#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/event_engine/event_engine_factory.h" namespace grpc_event_engine { @@ -66,27 +63,5 @@ void ResetDefaultEventEngine() { delete g_event_engine.exchange(nullptr, std::memory_order_acq_rel); } -namespace { -grpc_core::ChannelArgs EnsureEventEngineInChannelArgs( - grpc_core::ChannelArgs args) { - if (args.ContainsObject()) return args; - // TODO(hork): Consider deleting GetDefaultEventEngine(), use the factory - // directly when ChannelArgs aren't available. That would eliminate the no-op - // deleter below. - // Store a shared_ptr with a no-op deleter. The default is expected to live - // indefinitely. - return args.SetObject( - std::shared_ptr(GetDefaultEventEngine(), [](auto) {})); -} -} // namespace - } // namespace experimental } // namespace grpc_event_engine - -namespace grpc_core { -void RegisterEventEngine(CoreConfiguration::Builder* builder) { - builder->channel_args_preconditioning()->RegisterStage( - grpc_event_engine::experimental::EnsureEventEngineInChannelArgs); -} - -} // namespace grpc_core diff --git a/src/core/lib/event_engine/event_engine_factory.h b/src/core/lib/event_engine/event_engine_factory.h index 0871dcd7de9..38c104dc5d8 100644 --- a/src/core/lib/event_engine/event_engine_factory.h +++ b/src/core/lib/event_engine/event_engine_factory.h @@ -20,8 +20,6 @@ #include -#include "src/core/lib/config/core_configuration.h" - namespace grpc_event_engine { namespace experimental { @@ -37,10 +35,6 @@ std::unique_ptr DefaultEventEngineFactory(); /// Reset the default event engine void ResetDefaultEventEngine(); -/// On ingress, ensure that an EventEngine exists in channel args via -/// preconditioning. -void RegisterEventEngine(grpc_core::CoreConfiguration::Builder* builder); - } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index 38ca7af3eb1..8c46ddb4c3d 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -105,10 +105,8 @@ extern void RegisterFakeResolver(CoreConfiguration::Builder* builder); #ifdef GPR_SUPPORT_BINDER_TRANSPORT extern void RegisterBinderResolver(CoreConfiguration::Builder* builder); #endif -extern void RegisterEventEngine(CoreConfiguration::Builder* builder); void BuildCoreConfiguration(CoreConfiguration::Builder* builder) { - RegisterEventEngine(builder); // The order of the handshaker registration is crucial here. // We want TCP connect handshaker to be registered last so that it is added to // the start of the handshaker list. diff --git a/test/core/channel/channel_args_test.cc b/test/core/channel/channel_args_test.cc index 70412596594..2aa9741dd66 100644 --- a/test/core/channel/channel_args_test.cc +++ b/test/core/channel/channel_args_test.cc @@ -28,7 +28,6 @@ #include #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/event_engine/event_engine_factory.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -38,9 +37,6 @@ namespace grpc_core { -using ::grpc_event_engine::experimental::CreateEventEngine; -using ::grpc_event_engine::experimental::EventEngine; - TEST(ChannelArgsTest, Noop) { ChannelArgs(); } TEST(ChannelArgsTest, SetGetRemove) { @@ -90,80 +86,6 @@ TEST(ChannelArgsTest, StoreRefCountedPtr) { EXPECT_EQ(a.GetPointer("test")->n, 123); } -TEST(ChannelArgsTest, StoreSharedPtrEventEngine) { - auto p = std::shared_ptr(CreateEventEngine()); - ChannelArgs a; - a = a.SetObject(p); - Mutex mu; - CondVar cv; - bool triggered = false; - MutexLock lock(&mu); - a.GetObjectRef()->Run([&mu, &triggered, &cv] { - MutexLock lock(&mu); - triggered = true; - cv.Signal(); - }); - cv.WaitWithTimeout(&mu, absl::Seconds(1)); - ASSERT_TRUE(triggered); -} - -TEST(ChannelArgsTest, GetNonOwningEventEngine) { - auto p = std::shared_ptr(CreateEventEngine()); - ChannelArgs a; - a = a.SetObject(p); - ASSERT_FALSE(p.unique()); - EventEngine* engine = a.GetObject(); - (void)engine; - // p and the channel args - ASSERT_EQ(p.use_count(), 2); -} - -TEST(ChannelArgsTest, StoreAndRetrieveSharedPtr) { - struct Test : public std::enable_shared_from_this { - explicit Test(int n) : n(n) {} - int n; - static int ChannelArgsCompare(const Test* a, const Test* b) { - return a->n - b->n; - } - static absl::string_view ChannelArgName() { return "grpc.test"; } - }; - std::shared_ptr copied_obj; - { - ChannelArgs a; - auto p = std::make_shared(42); - EXPECT_TRUE(p.unique()); - a = a.SetObject(p); - EXPECT_FALSE(p.unique()); - copied_obj = a.GetObjectRef(); - EXPECT_EQ(copied_obj->n, 42); - // Refs: p, copied_obj, and ChannelArgs - EXPECT_EQ(3, copied_obj.use_count()); - } - // The p and ChannelArgs are deleted. - EXPECT_TRUE(copied_obj.unique()); - EXPECT_EQ(copied_obj->n, 42); -} - -TEST(ChannelArgsTest, RetrieveRawPointerFromStoredSharedPtr) { - struct Test : public std::enable_shared_from_this { - explicit Test(int n) : n(n) {} - int n; - static int ChannelArgsCompare(const Test* a, const Test* b) { - return a->n - b->n; - } - static absl::string_view ChannelArgName() { return "grpc.test"; } - }; - ChannelArgs a; - auto p = std::make_shared(42); - EXPECT_TRUE(p.unique()); - a = a.SetObject(p); - EXPECT_FALSE(p.unique()); - Test* testp = a.GetObject(); - EXPECT_EQ(testp->n, 42); - // Refs: p and ChannelArgs - EXPECT_EQ(2, p.use_count()); -} - TEST(ChannelArgsTest, ObjectApi) { struct MyFancyObject : public RefCounted { explicit MyFancyObject(int n) : n(n) {}