Add std::shared_ptr support to ChannelArgs, and precondition ChannelArgs with a default EventEngine (#30128)

* [WIP] Precondition ChannelArgs with a default EventEngine

This is a step towards using ChannelArgs as the primary means of
accessing EventEngine instances in gRPC-core. If not explicitly provided
by the application, a default EventEngine will populated into
ChannelArgs during preconditioning.

This is not a final state, we may want to enable ref-counting here
instead of using raw pointers. And a refactoring is in order to enable
GetObject instead of the more verbose
GetPointer<EventEngine>(GRPC_ARG_EVENT_ENGINE).

* Refactor ChannelArgs::GetObject to support non-conforming classes

This allows us to not expose `ChannelArgName` in the public interface.

* Add std::shared_ptr to ChannelArgs; Add EventEngine specialization

* subchannel fix; cleanup

* replace GetSharedPtr with overloads of GetObjectRef

* Automated change: Fix sanity tests

* fix the fixer

* fix raw pointer retrieval from stored shared_ptr

* Make GetObjectRef<EventEngine> work (not general to shared_ptr)

* enable shared_ptr ChannelArg support for shared_from_this

* use new EventEngines for tests (not the default global)

* Automated change: Fix sanity tests

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
pull/30170/head
AJ Heller 3 years ago committed by GitHub
parent 74e1023f0a
commit e28b70a533
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      BUILD
  2. 1
      include/grpc/event_engine/event_engine.h
  3. 2
      include/grpc/impl/codegen/grpc_types.h
  4. 10
      src/core/ext/filters/client_channel/subchannel.cc
  5. 5
      src/core/ext/filters/client_channel/subchannel.h
  6. 119
      src/core/lib/channel/channel_args.h
  7. 25
      src/core/lib/event_engine/event_engine.cc
  8. 6
      src/core/lib/event_engine/event_engine_factory.h
  9. 2
      src/core/plugin_registry/grpc_plugin_registry.cc
  10. 78
      test/core/channel/channel_args_test.cc

@ -419,6 +419,7 @@ grpc_cc_library(
"channel_stack_type",
"config",
"default_event_engine_factory_hdrs",
"event_engine_base",
"gpr_base",
"grpc_authorization_base",
"grpc_base",
@ -478,6 +479,7 @@ grpc_cc_library(
"channel_stack_type",
"config",
"default_event_engine_factory_hdrs",
"event_engine_base",
"gpr_base",
"grpc_authorization_base",
"grpc_base",
@ -2251,6 +2253,7 @@ grpc_cc_library(
"src/core/lib/event_engine/event_engine_factory.h",
],
deps = [
"config",
"event_engine_base_hdrs",
"gpr_base",
],
@ -2407,6 +2410,9 @@ grpc_cc_library(
"src/core/lib/event_engine/event_engine.cc",
],
deps = [
"channel_args",
"channel_args_preconditioning",
"config",
"default_event_engine_factory",
"default_event_engine_factory_hdrs",
"event_engine_base_hdrs",
@ -3067,6 +3073,7 @@ grpc_cc_library(
"avl",
"channel_stack_type",
"dual_ref_counted",
"event_engine_base_hdrs",
"gpr_base",
"gpr_platform",
"grpc_codegen",

@ -26,6 +26,7 @@
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/port.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/impl/codegen/grpc_types.h>
// TODO(vigneshbabu): Define the Endpoint::Write metrics collection system
namespace grpc_event_engine {

@ -455,6 +455,8 @@ typedef struct {
* channel arg. Int valued, milliseconds. Defaults to 10 minutes.*/
#define GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS \
"grpc.experimental.server_config_change_drain_grace_time_ms"
/** EventEngine pointer */
#define GRPC_ARG_EVENT_ENGINE "grpc.event_engine"
/** \} */
/** Result of a grpc call. If the caller satisfies the prerequisites of a

@ -51,7 +51,6 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -84,7 +83,8 @@
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)))
namespace grpc_core {
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
using ::grpc_event_engine::experimental::EventEngine;
TraceFlag grpc_trace_subchannel(false, "subchannel");
DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
@ -676,6 +676,8 @@ Subchannel::Subchannel(SubchannelKey key,
} else {
args_ = grpc_channel_args_copy(args);
}
// Initialize EventEngine
event_engine_ = ChannelArgs::FromC(args_).GetObjectRef<EventEngine>();
// Initialize channelz.
const bool channelz_enabled = grpc_channel_args_find_bool(
args_, GRPC_ARG_ENABLE_CHANNELZ, GRPC_ENABLE_CHANNELZ_DEFAULT);
@ -803,7 +805,7 @@ void Subchannel::ResetBackoff() {
MutexLock lock(&mu_);
backoff_.Reset();
if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
GetDefaultEventEngine()->Cancel(retry_timer_handle_)) {
event_engine_->Cancel(retry_timer_handle_)) {
OnRetryTimerLocked();
} else if (state_ == GRPC_CHANNEL_CONNECTING) {
next_attempt_time_ = ExecCtx::Get()->Now();
@ -949,7 +951,7 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
time_until_next_attempt.millis());
SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_to_absl_status(error));
retry_timer_handle_ = GetDefaultEventEngine()->RunAfter(
retry_timer_handle_ = event_engine_->RunAfter(
time_until_next_attempt,
[self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable {
{

@ -23,6 +23,7 @@
#include <deque>
#include <map>
#include <memory>
#include <string>
#include "absl/base/thread_annotations.h"
@ -419,6 +420,10 @@ class Subchannel : public DualRefCounted<Subchannel> {
// Data producer map.
std::map<UniqueTypeName, DataProducerInterface*> data_producer_map_
ABSL_GUARDED_BY(mu_);
// Engine used in this Subchannel
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_
ABSL_GUARDED_BY(mu_);
};
} // namespace grpc_core

@ -24,6 +24,7 @@
#include <stddef.h>
#include <algorithm> // IWYU pragma: keep
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
@ -33,6 +34,7 @@
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/avl/avl.h"
@ -52,7 +54,7 @@ namespace grpc_core {
// ChannelArgs to automatically derive a vtable from a T*.
// To participate as a pointer, instances should expose the function:
// // Gets the vtable for this type
// static const grpc_channel_arg_vtable* VTable();
// static const grpc_arg_pointer_vtable* VTable();
// // Performs any mutations required for channel args to own a pointer
// // Only needed if ChannelArgs::Set is to be called with a raw pointer.
// static void* TakeUnownedPointer(T* p);
@ -86,6 +88,32 @@ struct ChannelArgTypeTraits<
};
};
// Specialization for shared_ptr
// Incurs an allocation because shared_ptr.release is not a thing.
template <typename T>
struct is_shared_ptr : std::false_type {};
template <typename T>
struct is_shared_ptr<std::shared_ptr<T>> : std::true_type {};
template <typename T>
struct ChannelArgTypeTraits<T,
absl::enable_if_t<is_shared_ptr<T>::value, void>> {
static void* TakeUnownedPointer(T* p) { return p; }
static const grpc_arg_pointer_vtable* VTable() {
static const grpc_arg_pointer_vtable tbl = {
// copy
[](void* p) -> void* { return new T(*static_cast<T*>(p)); },
// destroy
[](void* p) { delete static_cast<T*>(p); },
// compare
[](void* p1, void* p2) {
return QsortCompare(static_cast<const T*>(p1)->get(),
static_cast<const T*>(p2)->get());
},
};
return &tbl;
};
};
// If a type declares some member 'struct RawPointerChannelArgTag {}' then
// we automatically generate a vtable for it that does not do any ownership
// management and compares the type by pointer identity.
@ -108,6 +136,54 @@ struct ChannelArgTypeTraits<T,
};
};
// GetObject support for shared_ptr and RefCountedPtr
template <typename T>
struct WrapInSharedPtr
: std::integral_constant<
bool, std::is_base_of<std::enable_shared_from_this<T>, T>::value> {};
template <>
struct WrapInSharedPtr<grpc_event_engine::experimental::EventEngine>
: std::true_type {};
template <typename T, typename Ignored = void /* for SFINAE */>
struct GetObjectImpl;
// std::shared_ptr implementation
template <typename T>
struct GetObjectImpl<T, absl::enable_if_t<WrapInSharedPtr<T>::value, void>> {
using Result = T*;
using ReffedResult = std::shared_ptr<T>;
using StoredType = std::shared_ptr<T>*;
static Result Get(StoredType p) { return p->get(); };
static ReffedResult GetReffed(StoredType p) { return ReffedResult(*p); };
};
// RefCountedPtr
template <typename T>
struct GetObjectImpl<T, absl::enable_if_t<!WrapInSharedPtr<T>::value, void>> {
using Result = T*;
using ReffedResult = RefCountedPtr<T>;
using StoredType = Result;
static Result Get(StoredType p) { return p; };
static ReffedResult GetReffed(StoredType p) {
if (p == nullptr) return nullptr;
return p->Ref();
};
};
// Provide the canonical name for a type's channel arg key
template <typename T>
struct ChannelArgNameTraits {
static absl::string_view ChannelArgName() { return T::ChannelArgName(); }
};
template <typename T>
struct ChannelArgNameTraits<std::shared_ptr<T>> {
static absl::string_view ChannelArgName() { return T::ChannelArgName(); }
};
// Specialization for the EventEngine
template <>
struct ChannelArgNameTraits<grpc_event_engine::experimental::EventEngine> {
static absl::string_view ChannelArgName() { return GRPC_ARG_EVENT_ENGINE; }
};
class ChannelArgs {
public:
class Pointer {
@ -202,6 +278,20 @@ class ChannelArgs {
absl::remove_cvref_t<decltype(*store_value)>>::VTable()));
}
template <typename T>
GRPC_MUST_USE_RESULT absl::enable_if_t<
std::is_same<
const grpc_arg_pointer_vtable*,
decltype(ChannelArgTypeTraits<std::shared_ptr<T>>::VTable())>::value,
ChannelArgs>
Set(absl::string_view name, std::shared_ptr<T> value) const {
auto* store_value = new std::shared_ptr<T>(value);
return Set(
name,
Pointer(ChannelArgTypeTraits<std::shared_ptr<T>>::TakeUnownedPointer(
store_value),
ChannelArgTypeTraits<std::shared_ptr<T>>::VTable()));
}
template <typename T>
GRPC_MUST_USE_RESULT ChannelArgs SetIfUnset(absl::string_view name, T value) {
if (Contains(name)) return *this;
return Set(name, std::move(value));
@ -209,12 +299,19 @@ class ChannelArgs {
GRPC_MUST_USE_RESULT ChannelArgs Remove(absl::string_view name) const;
bool Contains(absl::string_view name) const { return Get(name) != nullptr; }
template <typename T>
bool ContainsObject() const {
return Get(ChannelArgNameTraits<T>::ChannelArgName()) != nullptr;
}
absl::optional<int> GetInt(absl::string_view name) const;
absl::optional<absl::string_view> GetString(absl::string_view name) const;
void* GetVoidPointer(absl::string_view name) const;
template <typename T>
T* GetPointer(absl::string_view name) const {
return static_cast<T*>(GetVoidPointer(name));
typename GetObjectImpl<T>::StoredType GetPointer(
absl::string_view name) const {
return static_cast<typename GetObjectImpl<T>::StoredType>(
GetVoidPointer(name));
}
absl::optional<Duration> GetDurationFromIntMillis(
absl::string_view name) const;
@ -234,14 +331,18 @@ class ChannelArgs {
return Set(T::ChannelArgName(), std::move(p));
}
template <typename T>
T* GetObject() {
return GetPointer<T>(T::ChannelArgName());
GRPC_MUST_USE_RESULT ChannelArgs SetObject(std::shared_ptr<T> p) const {
return Set(ChannelArgNameTraits<T>::ChannelArgName(), std::move(p));
}
template <typename T>
RefCountedPtr<T> GetObjectRef() {
auto* p = GetObject<T>();
if (p == nullptr) return nullptr;
return p->Ref();
typename GetObjectImpl<T>::Result GetObject() {
return GetObjectImpl<T>::Get(
GetPointer<T>(ChannelArgNameTraits<T>::ChannelArgName()));
}
template <typename T>
typename GetObjectImpl<T>::ReffedResult GetObjectRef() {
return GetObjectImpl<T>::GetReffed(
GetPointer<T>(ChannelArgNameTraits<T>::ChannelArgName()));
}
bool operator<(const ChannelArgs& other) const { return args_ < other.args_; }

@ -20,6 +20,9 @@
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/event_engine_factory.h"
namespace grpc_event_engine {
@ -63,5 +66,27 @@ void ResetDefaultEventEngine() {
delete g_event_engine.exchange(nullptr, std::memory_order_acq_rel);
}
namespace {
grpc_core::ChannelArgs EnsureEventEngineInChannelArgs(
grpc_core::ChannelArgs args) {
if (args.ContainsObject<EventEngine>()) return args;
// TODO(hork): Consider deleting GetDefaultEventEngine(), use the factory
// directly when ChannelArgs aren't available. That would eliminate the no-op
// deleter below.
// Store a shared_ptr with a no-op deleter. The default is expected to live
// indefinitely.
return args.SetObject<EventEngine>(
std::shared_ptr<EventEngine>(GetDefaultEventEngine(), [](auto) {}));
}
} // namespace
} // namespace experimental
} // namespace grpc_event_engine
namespace grpc_core {
void RegisterEventEngine(CoreConfiguration::Builder* builder) {
builder->channel_args_preconditioning()->RegisterStage(
grpc_event_engine::experimental::EnsureEventEngineInChannelArgs);
}
} // namespace grpc_core

@ -20,6 +20,8 @@
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/config/core_configuration.h"
namespace grpc_event_engine {
namespace experimental {
@ -35,6 +37,10 @@ std::unique_ptr<EventEngine> DefaultEventEngineFactory();
/// Reset the default event engine
void ResetDefaultEventEngine();
/// On ingress, ensure that an EventEngine exists in channel args via
/// preconditioning.
void RegisterEventEngine(grpc_core::CoreConfiguration::Builder* builder);
} // namespace experimental
} // namespace grpc_event_engine

@ -105,8 +105,10 @@ extern void RegisterFakeResolver(CoreConfiguration::Builder* builder);
#ifdef GPR_SUPPORT_BINDER_TRANSPORT
extern void RegisterBinderResolver(CoreConfiguration::Builder* builder);
#endif
extern void RegisterEventEngine(CoreConfiguration::Builder* builder);
void BuildCoreConfiguration(CoreConfiguration::Builder* builder) {
RegisterEventEngine(builder);
// The order of the handshaker registration is crucial here.
// We want TCP connect handshaker to be registered last so that it is added to
// the start of the handshaker list.

@ -28,6 +28,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -37,6 +38,9 @@
namespace grpc_core {
using ::grpc_event_engine::experimental::CreateEventEngine;
using ::grpc_event_engine::experimental::EventEngine;
TEST(ChannelArgsTest, Noop) { ChannelArgs(); }
TEST(ChannelArgsTest, SetGetRemove) {
@ -86,6 +90,80 @@ TEST(ChannelArgsTest, StoreRefCountedPtr) {
EXPECT_EQ(a.GetPointer<Test>("test")->n, 123);
}
TEST(ChannelArgsTest, StoreSharedPtrEventEngine) {
auto p = std::shared_ptr<EventEngine>(CreateEventEngine());
ChannelArgs a;
a = a.SetObject(p);
Mutex mu;
CondVar cv;
bool triggered = false;
MutexLock lock(&mu);
a.GetObjectRef<EventEngine>()->Run([&mu, &triggered, &cv] {
MutexLock lock(&mu);
triggered = true;
cv.Signal();
});
cv.WaitWithTimeout(&mu, absl::Seconds(1));
ASSERT_TRUE(triggered);
}
TEST(ChannelArgsTest, GetNonOwningEventEngine) {
auto p = std::shared_ptr<EventEngine>(CreateEventEngine());
ChannelArgs a;
a = a.SetObject(p);
ASSERT_FALSE(p.unique());
EventEngine* engine = a.GetObject<EventEngine>();
(void)engine;
// p and the channel args
ASSERT_EQ(p.use_count(), 2);
}
TEST(ChannelArgsTest, StoreAndRetrieveSharedPtr) {
struct Test : public std::enable_shared_from_this<Test> {
explicit Test(int n) : n(n) {}
int n;
static int ChannelArgsCompare(const Test* a, const Test* b) {
return a->n - b->n;
}
static absl::string_view ChannelArgName() { return "grpc.test"; }
};
std::shared_ptr<Test> copied_obj;
{
ChannelArgs a;
auto p = std::make_shared<Test>(42);
EXPECT_TRUE(p.unique());
a = a.SetObject(p);
EXPECT_FALSE(p.unique());
copied_obj = a.GetObjectRef<Test>();
EXPECT_EQ(copied_obj->n, 42);
// Refs: p, copied_obj, and ChannelArgs
EXPECT_EQ(3, copied_obj.use_count());
}
// The p and ChannelArgs are deleted.
EXPECT_TRUE(copied_obj.unique());
EXPECT_EQ(copied_obj->n, 42);
}
TEST(ChannelArgsTest, RetrieveRawPointerFromStoredSharedPtr) {
struct Test : public std::enable_shared_from_this<Test> {
explicit Test(int n) : n(n) {}
int n;
static int ChannelArgsCompare(const Test* a, const Test* b) {
return a->n - b->n;
}
static absl::string_view ChannelArgName() { return "grpc.test"; }
};
ChannelArgs a;
auto p = std::make_shared<Test>(42);
EXPECT_TRUE(p.unique());
a = a.SetObject(p);
EXPECT_FALSE(p.unique());
Test* testp = a.GetObject<Test>();
EXPECT_EQ(testp->n, 42);
// Refs: p and ChannelArgs
EXPECT_EQ(2, p.use_count());
}
TEST(ChannelArgsTest, ObjectApi) {
struct MyFancyObject : public RefCounted<MyFancyObject> {
explicit MyFancyObject(int n) : n(n) {}

Loading…
Cancel
Save