From e28b70a53388ed86a2f81da8f8b63411e38d72d5 Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Fri, 1 Jul 2022 09:15:57 -0700 Subject: [PATCH] Add std::shared_ptr support to ChannelArgs, and precondition ChannelArgs with a default EventEngine (#30128) * [WIP] Precondition ChannelArgs with a default EventEngine This is a step towards using ChannelArgs as the primary means of accessing EventEngine instances in gRPC-core. If not explicitly provided by the application, a default EventEngine will populated into ChannelArgs during preconditioning. This is not a final state, we may want to enable ref-counting here instead of using raw pointers. And a refactoring is in order to enable GetObject instead of the more verbose GetPointer(GRPC_ARG_EVENT_ENGINE). * Refactor ChannelArgs::GetObject to support non-conforming classes This allows us to not expose `ChannelArgName` in the public interface. * Add std::shared_ptr to ChannelArgs; Add EventEngine specialization * subchannel fix; cleanup * replace GetSharedPtr with overloads of GetObjectRef * Automated change: Fix sanity tests * fix the fixer * fix raw pointer retrieval from stored shared_ptr * Make GetObjectRef work (not general to shared_ptr) * enable shared_ptr ChannelArg support for shared_from_this * use new EventEngines for tests (not the default global) * Automated change: Fix sanity tests Co-authored-by: drfloob --- BUILD | 7 ++ include/grpc/event_engine/event_engine.h | 1 + include/grpc/impl/codegen/grpc_types.h | 2 + .../ext/filters/client_channel/subchannel.cc | 10 +- .../ext/filters/client_channel/subchannel.h | 5 + src/core/lib/channel/channel_args.h | 119 ++++++++++++++++-- src/core/lib/event_engine/event_engine.cc | 25 ++++ .../lib/event_engine/event_engine_factory.h | 6 + .../plugin_registry/grpc_plugin_registry.cc | 2 + test/core/channel/channel_args_test.cc | 78 ++++++++++++ 10 files changed, 242 insertions(+), 13 deletions(-) diff --git a/BUILD b/BUILD index 042f10b4d87..58acaa0658c 100644 --- a/BUILD +++ b/BUILD @@ -419,6 +419,7 @@ grpc_cc_library( "channel_stack_type", "config", "default_event_engine_factory_hdrs", + "event_engine_base", "gpr_base", "grpc_authorization_base", "grpc_base", @@ -478,6 +479,7 @@ grpc_cc_library( "channel_stack_type", "config", "default_event_engine_factory_hdrs", + "event_engine_base", "gpr_base", "grpc_authorization_base", "grpc_base", @@ -2251,6 +2253,7 @@ grpc_cc_library( "src/core/lib/event_engine/event_engine_factory.h", ], deps = [ + "config", "event_engine_base_hdrs", "gpr_base", ], @@ -2407,6 +2410,9 @@ grpc_cc_library( "src/core/lib/event_engine/event_engine.cc", ], deps = [ + "channel_args", + "channel_args_preconditioning", + "config", "default_event_engine_factory", "default_event_engine_factory_hdrs", "event_engine_base_hdrs", @@ -3067,6 +3073,7 @@ grpc_cc_library( "avl", "channel_stack_type", "dual_ref_counted", + "event_engine_base_hdrs", "gpr_base", "gpr_platform", "grpc_codegen", diff --git a/include/grpc/event_engine/event_engine.h b/include/grpc/event_engine/event_engine.h index 3e58f997a22..da069d21732 100644 --- a/include/grpc/event_engine/event_engine.h +++ b/include/grpc/event_engine/event_engine.h @@ -26,6 +26,7 @@ #include #include #include +#include // TODO(vigneshbabu): Define the Endpoint::Write metrics collection system namespace grpc_event_engine { diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index eb1f39055b3..c1b514a62ff 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -455,6 +455,8 @@ typedef struct { * channel arg. Int valued, milliseconds. Defaults to 10 minutes.*/ #define GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS \ "grpc.experimental.server_config_change_drain_grace_time_ms" +/** EventEngine pointer */ +#define GRPC_ARG_EVENT_ENGINE "grpc.event_engine" /** \} */ /** Result of a grpc call. If the caller satisfies the prerequisites of a diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index c679aee8950..5cc75bead76 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -51,7 +51,6 @@ #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/event_engine/event_engine_factory.h" #include "src/core/lib/gpr/alloc.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -84,7 +83,8 @@ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))) namespace grpc_core { -using ::grpc_event_engine::experimental::GetDefaultEventEngine; + +using ::grpc_event_engine::experimental::EventEngine; TraceFlag grpc_trace_subchannel(false, "subchannel"); DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount"); @@ -676,6 +676,8 @@ Subchannel::Subchannel(SubchannelKey key, } else { args_ = grpc_channel_args_copy(args); } + // Initialize EventEngine + event_engine_ = ChannelArgs::FromC(args_).GetObjectRef(); // Initialize channelz. const bool channelz_enabled = grpc_channel_args_find_bool( args_, GRPC_ARG_ENABLE_CHANNELZ, GRPC_ENABLE_CHANNELZ_DEFAULT); @@ -803,7 +805,7 @@ void Subchannel::ResetBackoff() { MutexLock lock(&mu_); backoff_.Reset(); if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && - GetDefaultEventEngine()->Cancel(retry_timer_handle_)) { + event_engine_->Cancel(retry_timer_handle_)) { OnRetryTimerLocked(); } else if (state_ == GRPC_CHANNEL_CONNECTING) { next_attempt_time_ = ExecCtx::Get()->Now(); @@ -949,7 +951,7 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { time_until_next_attempt.millis()); SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error)); - retry_timer_handle_ = GetDefaultEventEngine()->RunAfter( + retry_timer_handle_ = event_engine_->RunAfter( time_until_next_attempt, [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable { { diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 59783f59c05..28a29e6bf3c 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -23,6 +23,7 @@ #include #include +#include #include #include "absl/base/thread_annotations.h" @@ -419,6 +420,10 @@ class Subchannel : public DualRefCounted { // Data producer map. std::map data_producer_map_ ABSL_GUARDED_BY(mu_); + + // Engine used in this Subchannel + std::shared_ptr event_engine_ + ABSL_GUARDED_BY(mu_); }; } // namespace grpc_core diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 47f317da505..08af5490bc4 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -24,6 +24,7 @@ #include #include // IWYU pragma: keep +#include #include #include #include @@ -33,6 +34,7 @@ #include "absl/types/optional.h" #include "absl/types/variant.h" +#include #include #include "src/core/lib/avl/avl.h" @@ -52,7 +54,7 @@ namespace grpc_core { // ChannelArgs to automatically derive a vtable from a T*. // To participate as a pointer, instances should expose the function: // // Gets the vtable for this type -// static const grpc_channel_arg_vtable* VTable(); +// static const grpc_arg_pointer_vtable* VTable(); // // Performs any mutations required for channel args to own a pointer // // Only needed if ChannelArgs::Set is to be called with a raw pointer. // static void* TakeUnownedPointer(T* p); @@ -86,6 +88,32 @@ struct ChannelArgTypeTraits< }; }; +// Specialization for shared_ptr +// Incurs an allocation because shared_ptr.release is not a thing. +template +struct is_shared_ptr : std::false_type {}; +template +struct is_shared_ptr> : std::true_type {}; +template +struct ChannelArgTypeTraits::value, void>> { + static void* TakeUnownedPointer(T* p) { return p; } + static const grpc_arg_pointer_vtable* VTable() { + static const grpc_arg_pointer_vtable tbl = { + // copy + [](void* p) -> void* { return new T(*static_cast(p)); }, + // destroy + [](void* p) { delete static_cast(p); }, + // compare + [](void* p1, void* p2) { + return QsortCompare(static_cast(p1)->get(), + static_cast(p2)->get()); + }, + }; + return &tbl; + }; +}; + // If a type declares some member 'struct RawPointerChannelArgTag {}' then // we automatically generate a vtable for it that does not do any ownership // management and compares the type by pointer identity. @@ -108,6 +136,54 @@ struct ChannelArgTypeTraits +struct WrapInSharedPtr + : std::integral_constant< + bool, std::is_base_of, T>::value> {}; +template <> +struct WrapInSharedPtr + : std::true_type {}; +template +struct GetObjectImpl; +// std::shared_ptr implementation +template +struct GetObjectImpl::value, void>> { + using Result = T*; + using ReffedResult = std::shared_ptr; + using StoredType = std::shared_ptr*; + static Result Get(StoredType p) { return p->get(); }; + static ReffedResult GetReffed(StoredType p) { return ReffedResult(*p); }; +}; +// RefCountedPtr +template +struct GetObjectImpl::value, void>> { + using Result = T*; + using ReffedResult = RefCountedPtr; + using StoredType = Result; + static Result Get(StoredType p) { return p; }; + static ReffedResult GetReffed(StoredType p) { + if (p == nullptr) return nullptr; + return p->Ref(); + }; +}; + +// Provide the canonical name for a type's channel arg key +template +struct ChannelArgNameTraits { + static absl::string_view ChannelArgName() { return T::ChannelArgName(); } +}; +template +struct ChannelArgNameTraits> { + static absl::string_view ChannelArgName() { return T::ChannelArgName(); } +}; + +// Specialization for the EventEngine +template <> +struct ChannelArgNameTraits { + static absl::string_view ChannelArgName() { return GRPC_ARG_EVENT_ENGINE; } +}; + class ChannelArgs { public: class Pointer { @@ -202,6 +278,20 @@ class ChannelArgs { absl::remove_cvref_t>::VTable())); } template + GRPC_MUST_USE_RESULT absl::enable_if_t< + std::is_same< + const grpc_arg_pointer_vtable*, + decltype(ChannelArgTypeTraits>::VTable())>::value, + ChannelArgs> + Set(absl::string_view name, std::shared_ptr value) const { + auto* store_value = new std::shared_ptr(value); + return Set( + name, + Pointer(ChannelArgTypeTraits>::TakeUnownedPointer( + store_value), + ChannelArgTypeTraits>::VTable())); + } + template GRPC_MUST_USE_RESULT ChannelArgs SetIfUnset(absl::string_view name, T value) { if (Contains(name)) return *this; return Set(name, std::move(value)); @@ -209,12 +299,19 @@ class ChannelArgs { GRPC_MUST_USE_RESULT ChannelArgs Remove(absl::string_view name) const; bool Contains(absl::string_view name) const { return Get(name) != nullptr; } + template + bool ContainsObject() const { + return Get(ChannelArgNameTraits::ChannelArgName()) != nullptr; + } + absl::optional GetInt(absl::string_view name) const; absl::optional GetString(absl::string_view name) const; void* GetVoidPointer(absl::string_view name) const; template - T* GetPointer(absl::string_view name) const { - return static_cast(GetVoidPointer(name)); + typename GetObjectImpl::StoredType GetPointer( + absl::string_view name) const { + return static_cast::StoredType>( + GetVoidPointer(name)); } absl::optional GetDurationFromIntMillis( absl::string_view name) const; @@ -234,14 +331,18 @@ class ChannelArgs { return Set(T::ChannelArgName(), std::move(p)); } template - T* GetObject() { - return GetPointer(T::ChannelArgName()); + GRPC_MUST_USE_RESULT ChannelArgs SetObject(std::shared_ptr p) const { + return Set(ChannelArgNameTraits::ChannelArgName(), std::move(p)); } template - RefCountedPtr GetObjectRef() { - auto* p = GetObject(); - if (p == nullptr) return nullptr; - return p->Ref(); + typename GetObjectImpl::Result GetObject() { + return GetObjectImpl::Get( + GetPointer(ChannelArgNameTraits::ChannelArgName())); + } + template + typename GetObjectImpl::ReffedResult GetObjectRef() { + return GetObjectImpl::GetReffed( + GetPointer(ChannelArgNameTraits::ChannelArgName())); } bool operator<(const ChannelArgs& other) const { return args_ < other.args_; } diff --git a/src/core/lib/event_engine/event_engine.cc b/src/core/lib/event_engine/event_engine.cc index 60c51d9a2b0..b15d5bfec51 100644 --- a/src/core/lib/event_engine/event_engine.cc +++ b/src/core/lib/event_engine/event_engine.cc @@ -20,6 +20,9 @@ #include +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_args_preconditioning.h" +#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/event_engine/event_engine_factory.h" namespace grpc_event_engine { @@ -63,5 +66,27 @@ void ResetDefaultEventEngine() { delete g_event_engine.exchange(nullptr, std::memory_order_acq_rel); } +namespace { +grpc_core::ChannelArgs EnsureEventEngineInChannelArgs( + grpc_core::ChannelArgs args) { + if (args.ContainsObject()) return args; + // TODO(hork): Consider deleting GetDefaultEventEngine(), use the factory + // directly when ChannelArgs aren't available. That would eliminate the no-op + // deleter below. + // Store a shared_ptr with a no-op deleter. The default is expected to live + // indefinitely. + return args.SetObject( + std::shared_ptr(GetDefaultEventEngine(), [](auto) {})); +} +} // namespace + } // namespace experimental } // namespace grpc_event_engine + +namespace grpc_core { +void RegisterEventEngine(CoreConfiguration::Builder* builder) { + builder->channel_args_preconditioning()->RegisterStage( + grpc_event_engine::experimental::EnsureEventEngineInChannelArgs); +} + +} // namespace grpc_core diff --git a/src/core/lib/event_engine/event_engine_factory.h b/src/core/lib/event_engine/event_engine_factory.h index 38c104dc5d8..0871dcd7de9 100644 --- a/src/core/lib/event_engine/event_engine_factory.h +++ b/src/core/lib/event_engine/event_engine_factory.h @@ -20,6 +20,8 @@ #include +#include "src/core/lib/config/core_configuration.h" + namespace grpc_event_engine { namespace experimental { @@ -35,6 +37,10 @@ std::unique_ptr DefaultEventEngineFactory(); /// Reset the default event engine void ResetDefaultEventEngine(); +/// On ingress, ensure that an EventEngine exists in channel args via +/// preconditioning. +void RegisterEventEngine(grpc_core::CoreConfiguration::Builder* builder); + } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index 8c46ddb4c3d..38ca7af3eb1 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -105,8 +105,10 @@ extern void RegisterFakeResolver(CoreConfiguration::Builder* builder); #ifdef GPR_SUPPORT_BINDER_TRANSPORT extern void RegisterBinderResolver(CoreConfiguration::Builder* builder); #endif +extern void RegisterEventEngine(CoreConfiguration::Builder* builder); void BuildCoreConfiguration(CoreConfiguration::Builder* builder) { + RegisterEventEngine(builder); // The order of the handshaker registration is crucial here. // We want TCP connect handshaker to be registered last so that it is added to // the start of the handshaker list. diff --git a/test/core/channel/channel_args_test.cc b/test/core/channel/channel_args_test.cc index 2aa9741dd66..70412596594 100644 --- a/test/core/channel/channel_args_test.cc +++ b/test/core/channel/channel_args_test.cc @@ -28,6 +28,7 @@ #include #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/event_engine/event_engine_factory.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -37,6 +38,9 @@ namespace grpc_core { +using ::grpc_event_engine::experimental::CreateEventEngine; +using ::grpc_event_engine::experimental::EventEngine; + TEST(ChannelArgsTest, Noop) { ChannelArgs(); } TEST(ChannelArgsTest, SetGetRemove) { @@ -86,6 +90,80 @@ TEST(ChannelArgsTest, StoreRefCountedPtr) { EXPECT_EQ(a.GetPointer("test")->n, 123); } +TEST(ChannelArgsTest, StoreSharedPtrEventEngine) { + auto p = std::shared_ptr(CreateEventEngine()); + ChannelArgs a; + a = a.SetObject(p); + Mutex mu; + CondVar cv; + bool triggered = false; + MutexLock lock(&mu); + a.GetObjectRef()->Run([&mu, &triggered, &cv] { + MutexLock lock(&mu); + triggered = true; + cv.Signal(); + }); + cv.WaitWithTimeout(&mu, absl::Seconds(1)); + ASSERT_TRUE(triggered); +} + +TEST(ChannelArgsTest, GetNonOwningEventEngine) { + auto p = std::shared_ptr(CreateEventEngine()); + ChannelArgs a; + a = a.SetObject(p); + ASSERT_FALSE(p.unique()); + EventEngine* engine = a.GetObject(); + (void)engine; + // p and the channel args + ASSERT_EQ(p.use_count(), 2); +} + +TEST(ChannelArgsTest, StoreAndRetrieveSharedPtr) { + struct Test : public std::enable_shared_from_this { + explicit Test(int n) : n(n) {} + int n; + static int ChannelArgsCompare(const Test* a, const Test* b) { + return a->n - b->n; + } + static absl::string_view ChannelArgName() { return "grpc.test"; } + }; + std::shared_ptr copied_obj; + { + ChannelArgs a; + auto p = std::make_shared(42); + EXPECT_TRUE(p.unique()); + a = a.SetObject(p); + EXPECT_FALSE(p.unique()); + copied_obj = a.GetObjectRef(); + EXPECT_EQ(copied_obj->n, 42); + // Refs: p, copied_obj, and ChannelArgs + EXPECT_EQ(3, copied_obj.use_count()); + } + // The p and ChannelArgs are deleted. + EXPECT_TRUE(copied_obj.unique()); + EXPECT_EQ(copied_obj->n, 42); +} + +TEST(ChannelArgsTest, RetrieveRawPointerFromStoredSharedPtr) { + struct Test : public std::enable_shared_from_this { + explicit Test(int n) : n(n) {} + int n; + static int ChannelArgsCompare(const Test* a, const Test* b) { + return a->n - b->n; + } + static absl::string_view ChannelArgName() { return "grpc.test"; } + }; + ChannelArgs a; + auto p = std::make_shared(42); + EXPECT_TRUE(p.unique()); + a = a.SetObject(p); + EXPECT_FALSE(p.unique()); + Test* testp = a.GetObject(); + EXPECT_EQ(testp->n, 42); + // Refs: p and ChannelArgs + EXPECT_EQ(2, p.use_count()); +} + TEST(ChannelArgsTest, ObjectApi) { struct MyFancyObject : public RefCounted { explicit MyFancyObject(int n) : n(n) {}