diff --git a/BUILD b/BUILD index 7c7758141be..f39cbe81610 100644 --- a/BUILD +++ b/BUILD @@ -1268,6 +1268,7 @@ grpc_cc_library( external_deps = ["absl/status"], deps = [ "activity", + "context", "default_event_engine", "event_engine_base_hdrs", "exec_ctx", @@ -2731,9 +2732,13 @@ grpc_cc_library( ], external_deps = ["absl/functional:any_invocable"], deps = [ + "context", "default_event_engine_factory", "event_engine_base_hdrs", + "event_engine_trace", "gpr", + "grpc_trace", + "no_destruct", ], ) @@ -3678,6 +3683,7 @@ grpc_cc_library( "closure", "config", "debug_location", + "default_event_engine", "exec_ctx", "exec_ctx_wakeup_scheduler", "gpr", diff --git a/CMakeLists.txt b/CMakeLists.txt index e5794dea330..ad3ea34cdd3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -921,6 +921,7 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx crl_ssl_transport_security_test) endif() + add_dependencies(buildtests_cxx default_engine_methods_test) add_dependencies(buildtests_cxx delegating_channel_test) add_dependencies(buildtests_cxx destroy_grpclb_channel_with_active_connect_stress_test) add_dependencies(buildtests_cxx dns_resolver_cooldown_test) @@ -8689,6 +8690,41 @@ endif() endif() if(gRPC_BUILD_TESTS) +add_executable(default_engine_methods_test + test/core/event_engine/default_engine_methods_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(default_engine_methods_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(default_engine_methods_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util +) + + +endif() +if(gRPC_BUILD_TESTS) + add_executable(delegating_channel_test ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 9e5ab247570..27260339f51 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -5378,6 +5378,15 @@ targets: - linux - posix - mac +- name: default_engine_methods_test + gtest: true + build: test + language: c++ + headers: [] + src: + - test/core/event_engine/default_engine_methods_test.cc + deps: + - grpc_test_util - name: delegating_channel_test gtest: true build: test diff --git a/include/grpc/event_engine/event_engine.h b/include/grpc/event_engine/event_engine.h index 52edb5ad8e9..304f2cdfa71 100644 --- a/include/grpc/event_engine/event_engine.h +++ b/include/grpc/event_engine/event_engine.h @@ -72,7 +72,7 @@ namespace experimental { /// server->Wait(); /// //////////////////////////////////////////////////////////////////////////////// -class EventEngine { +class EventEngine : public std::enable_shared_from_this { public: /// A duration between two events. /// @@ -427,16 +427,24 @@ class EventEngine { /// Replace gRPC's default EventEngine factory. /// -/// Applications may call \a SetDefaultEventEngineFactory at any time to replace -/// the default factory used within gRPC. EventEngines will be created when -/// necessary, when they are otherwise not provided by the application. +/// Applications may call \a SetEventEngineFactory time to replace the default +/// factory used within gRPC. EventEngines will be created when necessary, when +/// they are otherwise not provided by the application. /// /// To be certain that none of the gRPC-provided built-in EventEngines are /// created, applications must set a custom EventEngine factory method *before* /// grpc is initialized. -void SetDefaultEventEngineFactory( +void SetEventEngineFactory( absl::AnyInvocable()> factory); +/// Revert to using gRPC's default EventEngine factory. +/// +/// Applications that have called \a SetEventEngineFactory can unregister their +/// custom factory, reverting to use gRPC's built-in default EventEngines. This +/// has no effect on any EventEngines that were already created using the custom +/// factory. +void RevertToDefaultEventEngineFactory(); + /// Create an EventEngine using the default factory. std::unique_ptr CreateEventEngine(); diff --git a/src/core/ext/filters/channel_idle/channel_idle_filter.cc b/src/core/ext/filters/channel_idle/channel_idle_filter.cc index c2218ed5c46..6a94b0e6e63 100644 --- a/src/core/ext/filters/channel_idle/channel_idle_filter.cc +++ b/src/core/ext/filters/channel_idle/channel_idle_filter.cc @@ -26,6 +26,7 @@ #include "absl/types/optional.h" +#include #include #include @@ -34,6 +35,7 @@ #include "src/core/lib/channel/promise_based_filter.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/iomgr/closure.h" @@ -52,6 +54,9 @@ namespace grpc_core { namespace { + +using ::grpc_event_engine::experimental::EventEngine; + // TODO(ctiller): The idle filter was disabled in client channel by default // due to b/143502997. Now the bug is fixed enable the filter by default. const auto kDefaultIdleTimeout = Duration::Infinity(); @@ -119,15 +124,19 @@ struct MaxAgeFilter::Config { absl::StatusOr ClientIdleFilter::Create( const ChannelArgs& args, ChannelFilter::Args filter_args) { - ClientIdleFilter filter(filter_args.channel_stack(), - GetClientIdleTimeout(args)); + // TODO(hork): pull EventEngine from args + ClientIdleFilter filter( + filter_args.channel_stack(), GetClientIdleTimeout(args), + grpc_event_engine::experimental::GetDefaultEventEngine()); return absl::StatusOr(std::move(filter)); } absl::StatusOr MaxAgeFilter::Create( const ChannelArgs& args, ChannelFilter::Args filter_args) { + // TODO(hork): pull EventEngine from args MaxAgeFilter filter(filter_args.channel_stack(), - Config::FromChannelArgs(args)); + Config::FromChannelArgs(args), + grpc_event_engine::experimental::GetDefaultEventEngine()); return absl::StatusOr(std::move(filter)); } @@ -194,12 +203,14 @@ void MaxAgeFilter::PostInit() { [this] { return Sleep(ExecCtx::Get()->Now() + max_connection_age_grace_); }), - ExecCtxWakeupScheduler(), [channel_stack, this](absl::Status status) { + ExecCtxWakeupScheduler(), + [channel_stack, this](absl::Status status) { // OnDone -- close the connection if the promise completed // successfully. // (if it did not, it was cancelled) if (status.ok()) CloseChannel(); - })); + }, + engine_.get())); } } @@ -255,10 +266,12 @@ void ChannelIdleFilter::StartIdleTimer() { } }); }); - activity_.Set(MakeActivity(std::move(promise), ExecCtxWakeupScheduler{}, - [channel_stack, this](absl::Status status) { - if (status.ok()) CloseChannel(); - })); + activity_.Set(MakeActivity( + std::move(promise), ExecCtxWakeupScheduler{}, + [channel_stack, this](absl::Status status) { + if (status.ok()) CloseChannel(); + }, + engine_.get())); } void ChannelIdleFilter::CloseChannel() { @@ -300,10 +313,13 @@ void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) { }); } -MaxAgeFilter::MaxAgeFilter(grpc_channel_stack* channel_stack, - const Config& max_age_config) - : ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle), +MaxAgeFilter::MaxAgeFilter( + grpc_channel_stack* channel_stack, const Config& max_age_config, + std::shared_ptr engine) + : ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle, + engine), max_connection_age_(max_age_config.max_connection_age), - max_connection_age_grace_(max_age_config.max_connection_age_grace) {} + max_connection_age_grace_(max_age_config.max_connection_age_grace), + engine_(engine) {} } // namespace grpc_core diff --git a/src/core/ext/filters/channel_idle/channel_idle_filter.h b/src/core/ext/filters/channel_idle/channel_idle_filter.h index d926d09f71b..bc49a2a0174 100644 --- a/src/core/ext/filters/channel_idle/channel_idle_filter.h +++ b/src/core/ext/filters/channel_idle/channel_idle_filter.h @@ -22,6 +22,7 @@ #include "absl/status/status.h" #include "absl/status/statusor.h" +#include #include #include "src/core/ext/filters/channel_idle/idle_filter_state.h" @@ -58,10 +59,12 @@ class ChannelIdleFilter : public ChannelFilter { using SingleSetActivityPtr = SingleSetPtr; - ChannelIdleFilter(grpc_channel_stack* channel_stack, - Duration client_idle_timeout) + ChannelIdleFilter( + grpc_channel_stack* channel_stack, Duration client_idle_timeout, + std::shared_ptr engine) : channel_stack_(channel_stack), - client_idle_timeout_(client_idle_timeout) {} + client_idle_timeout_(client_idle_timeout), + engine_(engine) {} grpc_channel_stack* channel_stack() { return channel_stack_; }; @@ -87,6 +90,7 @@ class ChannelIdleFilter : public ChannelFilter { std::make_shared(false)}; SingleSetActivityPtr activity_; + std::shared_ptr engine_; }; class ClientIdleFilter final : public ChannelIdleFilter { @@ -127,13 +131,16 @@ class MaxAgeFilter final : public ChannelIdleFilter { MaxAgeFilter* filter_; }; - MaxAgeFilter(grpc_channel_stack* channel_stack, const Config& max_age_config); + MaxAgeFilter( + grpc_channel_stack* channel_stack, const Config& max_age_config, + std::shared_ptr engine); void Shutdown() override; SingleSetActivityPtr max_age_activity_; Duration max_connection_age_; Duration max_connection_age_grace_; + std::shared_ptr engine_; }; } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 4069cc66d47..e89dd779e98 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -158,7 +158,6 @@ const char kGrpcLbAddressAttributeKey[] = "grpclb"; namespace { using ::grpc_event_engine::experimental::EventEngine; -using ::grpc_event_engine::experimental::GetDefaultEventEngine; constexpr absl::string_view kGrpclb = "grpclb"; @@ -266,6 +265,7 @@ class GrpcLb : public LoadBalancingPolicy { bool client_load_report_is_due_ = false; // The closure used for the completion of sending the load report. grpc_closure client_load_report_done_closure_; + std::shared_ptr engine_; }; class SubchannelWrapper : public DelegatingSubchannel { @@ -849,7 +849,8 @@ GrpcLb::BalancerCallState::BalancerCallState( : InternallyRefCounted( GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace) ? "BalancerCallState" : nullptr), - grpclb_policy_(std::move(parent_grpclb_policy)) { + grpclb_policy_(std::move(parent_grpclb_policy)), + engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { GPR_ASSERT(grpclb_policy_ != nullptr); GPR_ASSERT(!grpclb_policy()->shutting_down_); // Init the LB call. Note that the LB call will progress every time there's @@ -907,7 +908,7 @@ void GrpcLb::BalancerCallState::Orphan() { // call, then the following cancellation will be a no-op. grpc_call_cancel_internal(lb_call_); if (client_load_report_handle_.has_value() && - GetDefaultEventEngine()->Cancel(client_load_report_handle_.value())) { + engine_->Cancel(client_load_report_handle_.value())) { Unref(DEBUG_LOCATION, "client_load_report cancelled"); } // Note that the initial ref is hold by lb_on_balancer_status_received_ @@ -993,7 +994,7 @@ void GrpcLb::BalancerCallState::StartQuery() { void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { client_load_report_handle_ = - GetDefaultEventEngine()->RunAfter(client_stats_report_interval_, [this] { + engine_->RunAfter(client_stats_report_interval_, [this] { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; MaybeSendClientLoadReport(); diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc index 98e119bece8..03037b98b4f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc @@ -68,7 +68,6 @@ TraceFlag grpc_lb_weighted_target_trace(false, "weighted_target_lb"); namespace { using ::grpc_event_engine::experimental::EventEngine; -using ::grpc_event_engine::experimental::GetDefaultEventEngine; constexpr absl::string_view kWeightedTarget = "weighted_target_experimental"; @@ -197,6 +196,7 @@ class WeightedTargetLb : public LoadBalancingPolicy { RefCountedPtr weighted_child_; absl::optional timer_handle_; + std::shared_ptr engine_; }; // Methods for dealing with the child policy. @@ -453,9 +453,10 @@ void WeightedTargetLb::UpdateStateLocked() { WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer( RefCountedPtr weighted_child) - : weighted_child_(std::move(weighted_child)) { - timer_handle_ = GetDefaultEventEngine()->RunAfter( - kChildRetentionInterval, [self = Ref()]() mutable { + : weighted_child_(std::move(weighted_child)), + engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { + timer_handle_ = + engine_->RunAfter(kChildRetentionInterval, [self = Ref()]() mutable { self->weighted_child_->weighted_target_policy_->work_serializer()->Run( [self = std::move(self)] { self->OnTimerLocked(); }, DEBUG_LOCATION); @@ -471,7 +472,7 @@ void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() { weighted_child_->weighted_target_policy_.get(), weighted_child_.get(), weighted_child_->name_.c_str()); } - GetDefaultEventEngine()->Cancel(*timer_handle_); + engine_->Cancel(*timer_handle_); } Unref(); } @@ -681,7 +682,7 @@ class WeightedTargetLbFactory : public LoadBalancingPolicyFactory { OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { return MakeOrphanable(std::move(args)); - } + } // namespace absl::string_view name() const override { return kWeightedTarget; } @@ -765,7 +766,7 @@ class WeightedTargetLbFactory : public LoadBalancingPolicyFactory { } return child_config; } -}; +}; // namespace grpc_core } // namespace diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 05ce63dabb4..48ea65c669f 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -83,7 +83,6 @@ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))) namespace grpc_core { -using ::grpc_event_engine::experimental::GetDefaultEventEngine; TraceFlag grpc_trace_subchannel(false, "subchannel"); DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount"); @@ -631,7 +630,8 @@ Subchannel::Subchannel(SubchannelKey key, args_(args), pollset_set_(grpc_pollset_set_create()), connector_(std::move(connector)), - backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)) { + backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)), + engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { // A grpc_init is added here to ensure that grpc_shutdown does not happen // until the subchannel is destroyed. Subchannels can persist longer than // channels because they maybe reused/shared among multiple channels. As a @@ -766,7 +766,7 @@ void Subchannel::ResetBackoff() { MutexLock lock(&mu_); backoff_.Reset(); if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && - GetDefaultEventEngine()->Cancel(retry_timer_handle_)) { + engine_->Cancel(retry_timer_handle_)) { OnRetryTimerLocked(); } else if (state_ == GRPC_CHANNEL_CONNECTING) { next_attempt_time_ = ExecCtx::Get()->Now(); @@ -909,7 +909,7 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { time_until_next_attempt.millis()); SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error)); - retry_timer_handle_ = GetDefaultEventEngine()->RunAfter( + retry_timer_handle_ = engine_->RunAfter( time_until_next_attempt, [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable { { diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 99dbda1dbef..1e4450173d0 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -23,6 +23,7 @@ #include #include +#include #include #include "absl/base/thread_annotations.h" @@ -419,6 +420,7 @@ class Subchannel : public DualRefCounted { // Data producer map. std::map data_producer_map_ ABSL_GUARDED_BY(mu_); + std::shared_ptr engine_; }; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 9d26eca3f2e..2d39997cc62 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -57,7 +57,6 @@ namespace grpc_core { using ::grpc_event_engine::experimental::EventEngine; -using ::grpc_event_engine::experimental::GetDefaultEventEngine; TraceFlag grpc_xds_client_trace(false, "xds_client"); TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount"); @@ -191,7 +190,7 @@ class XdsClient::ChannelState::AdsCallState if (state.resource != nullptr) return; // Start timer. ads_calld_ = std::move(ads_calld); - timer_handle_ = GetDefaultEventEngine()->RunAfter( + timer_handle_ = ads_calld_->xds_client()->engine()->RunAfter( ads_calld_->xds_client()->request_timeout_, [self = Ref(DEBUG_LOCATION, "timer")]() { ApplicationCallbackExecCtx callback_exec_ctx; @@ -214,7 +213,7 @@ class XdsClient::ChannelState::AdsCallState // TODO(roth): Find a way to write a test for this case. timer_start_needed_ = false; if (timer_handle_.has_value()) { - GetDefaultEventEngine()->Cancel(*timer_handle_); + ads_calld_->xds_client()->engine()->Cancel(*timer_handle_); timer_handle_.reset(); } } @@ -565,7 +564,7 @@ void XdsClient::ChannelState::RetryableCall::Orphan() { shutting_down_ = true; calld_.reset(); if (timer_handle_.has_value()) { - GetDefaultEventEngine()->Cancel(*timer_handle_); + chand()->xds_client()->engine()->Cancel(*timer_handle_); timer_handle_.reset(); } this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned"); @@ -608,7 +607,7 @@ void XdsClient::ChannelState::RetryableCall::StartRetryTimerLocked() { chand()->xds_client(), chand()->server_.server_uri.c_str(), timeout.millis()); } - timer_handle_ = GetDefaultEventEngine()->RunAfter( + timer_handle_ = chand()->xds_client()->engine()->RunAfter( timeout, [self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() { ApplicationCallbackExecCtx callback_exec_ctx; @@ -1106,7 +1105,7 @@ XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest( void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() { if (timer_handle_.has_value() && - GetDefaultEventEngine()->Cancel(*timer_handle_)) { + xds_client()->engine()->Cancel(*timer_handle_)) { timer_handle_.reset(); Unref(DEBUG_LOCATION, "Orphan"); } @@ -1114,7 +1113,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() { void XdsClient::ChannelState::LrsCallState::Reporter:: ScheduleNextReportLocked() { - timer_handle_ = GetDefaultEventEngine()->RunAfter(report_interval_, [this]() { + timer_handle_ = xds_client()->engine()->RunAfter(report_interval_, [this]() { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; if (OnNextReportTimer()) { @@ -1385,7 +1384,8 @@ XdsClient::XdsClient(std::unique_ptr bootstrap, transport_factory_(std::move(transport_factory)), request_timeout_(resource_request_timeout), xds_federation_enabled_(XdsFederationEnabled()), - api_(this, &grpc_xds_client_trace, bootstrap_->node(), &symtab_) { + api_(this, &grpc_xds_client_trace, bootstrap_->node(), &symtab_), + engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this); } diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index f51eddbc561..a6f3debdd0a 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -32,6 +32,8 @@ #include "absl/strings/string_view.h" #include "upb/def.hpp" +#include + #include "src/core/ext/xds/xds_api.h" #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_client_stats.h" @@ -146,6 +148,10 @@ class XdsClient : public DualRefCounted { // implementation. std::string DumpClientConfigBinary(); + grpc_event_engine::experimental::EventEngine* engine() { + return engine_.get(); + } + private: struct XdsResourceKey { std::string id; @@ -301,6 +307,7 @@ class XdsClient : public DualRefCounted { const bool xds_federation_enabled_; XdsApi api_; WorkSerializer work_serializer_; + std::shared_ptr engine_; Mutex mu_; diff --git a/src/core/lib/channel/promise_based_filter.cc b/src/core/lib/channel/promise_based_filter.cc index 61c1a1a256b..6b903b65de8 100644 --- a/src/core/lib/channel/promise_based_filter.cc +++ b/src/core/lib/channel/promise_based_filter.cc @@ -27,6 +27,7 @@ #include #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/slice/slice.h" @@ -44,7 +45,8 @@ BaseCallData::BaseCallData(grpc_call_element* elem, arena_(args->arena), call_combiner_(args->call_combiner), deadline_(args->deadline), - context_(args->context) { + context_(args->context), + event_engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { if (flags & kFilterExaminesServerInitialMetadata) { server_initial_metadata_latch_ = arena_->New>(); } diff --git a/src/core/lib/channel/promise_based_filter.h b/src/core/lib/channel/promise_based_filter.h index e7e816a54b3..ee15f71ca30 100644 --- a/src/core/lib/channel/promise_based_filter.h +++ b/src/core/lib/channel/promise_based_filter.h @@ -25,12 +25,14 @@ #include #include +#include #include #include #include "absl/container/inlined_vector.h" #include "absl/meta/type_traits.h" +#include #include #include @@ -39,6 +41,7 @@ #include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/context.h" +#include "src/core/lib/event_engine/default_event_engine.h" // IWYU pragma: keep #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/call_combiner.h" @@ -151,7 +154,9 @@ class BaseCallData : public Activity, private Wakeable { : public promise_detail::Context, public promise_detail::Context, public promise_detail::Context, - public promise_detail::Context { + public promise_detail::Context, + public promise_detail::Context< + grpc_event_engine::experimental::EventEngine> { public: explicit ScopedContext(BaseCallData* call_data) : promise_detail::Context(call_data->arena_), @@ -159,8 +164,9 @@ class BaseCallData : public Activity, private Wakeable { call_data->context_), promise_detail::Context( call_data->pollent_.load(std::memory_order_acquire)), - promise_detail::Context(&call_data->finalization_) { - } + promise_detail::Context(&call_data->finalization_), + promise_detail::Context( + call_data->event_engine_.get()) {} }; class Flusher { @@ -267,6 +273,7 @@ class BaseCallData : public Activity, private Wakeable { grpc_call_context_element* const context_; std::atomic pollent_{nullptr}; Latch* server_initial_metadata_latch_ = nullptr; + std::shared_ptr event_engine_; }; class ClientCallData : public BaseCallData { diff --git a/src/core/lib/event_engine/default_event_engine.cc b/src/core/lib/event_engine/default_event_engine.cc index 70c4eb17175..a69a255fa6f 100644 --- a/src/core/lib/event_engine/default_event_engine.cc +++ b/src/core/lib/event_engine/default_event_engine.cc @@ -23,7 +23,11 @@ #include +#include "src/core/lib/debug/trace.h" #include "src/core/lib/event_engine/default_event_engine_factory.h" +#include "src/core/lib/event_engine/trace.h" +#include "src/core/lib/gprpp/no_destruct.h" +#include "src/core/lib/gprpp/sync.h" namespace grpc_event_engine { namespace experimental { @@ -31,14 +35,22 @@ namespace experimental { namespace { std::atomic()>*> g_event_engine_factory{nullptr}; -std::atomic g_event_engine{nullptr}; +grpc_core::NoDestruct g_mu; +grpc_core::NoDestruct> g_event_engine; } // namespace -void SetDefaultEventEngineFactory( +void SetEventEngineFactory( absl::AnyInvocable()> factory) { delete g_event_engine_factory.exchange( new absl::AnyInvocable()>( std::move(factory))); + // Forget any previous EventEngines + grpc_core::MutexLock lock(&*g_mu); + g_event_engine->reset(); +} + +void RevertToDefaultEventEngineFactory() { + delete g_event_engine_factory.exchange(nullptr); } std::unique_ptr CreateEventEngine() { @@ -48,23 +60,22 @@ std::unique_ptr CreateEventEngine() { return DefaultEventEngineFactory(); } -EventEngine* GetDefaultEventEngine() { - EventEngine* engine = g_event_engine.load(std::memory_order_acquire); - if (engine == nullptr) { - auto* created = CreateEventEngine().release(); - if (g_event_engine.compare_exchange_strong(engine, created, - std::memory_order_acq_rel, - std::memory_order_acquire)) { - engine = created; - } else { - delete created; - } +std::shared_ptr GetDefaultEventEngine() { + grpc_core::MutexLock lock(&*g_mu); + if (std::shared_ptr engine = g_event_engine->lock()) { + GRPC_EVENT_ENGINE_TRACE("DefaultEventEngine::%p use_count:%ld", + engine.get(), engine.use_count()); + return engine; } + std::shared_ptr engine{CreateEventEngine()}; + GRPC_EVENT_ENGINE_TRACE("Created DefaultEventEngine::%p", engine.get()); + *g_event_engine = engine; return engine; } void ResetDefaultEventEngine() { - delete g_event_engine.exchange(nullptr, std::memory_order_acq_rel); + grpc_core::MutexLock lock(&*g_mu); + g_event_engine->reset(); } } // namespace experimental diff --git a/src/core/lib/event_engine/default_event_engine.h b/src/core/lib/event_engine/default_event_engine.h index 1a182bd73f3..88181ad423e 100644 --- a/src/core/lib/event_engine/default_event_engine.h +++ b/src/core/lib/event_engine/default_event_engine.h @@ -17,8 +17,17 @@ #include +#include + #include +#include "src/core/lib/promise/context.h" + +namespace grpc_core { +template <> +struct ContextType {}; +} // namespace grpc_core + namespace grpc_event_engine { namespace experimental { @@ -26,7 +35,7 @@ namespace experimental { /// /// The concept of a global EventEngine may go away in a post-iomgr world. /// Strongly consider whether you could use \a CreateEventEngine instead. -EventEngine* GetDefaultEventEngine(); +std::shared_ptr GetDefaultEventEngine(); /// Reset the default event engine void ResetDefaultEventEngine(); diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index 3c96ac0f4d7..0319820b86e 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -46,8 +46,6 @@ namespace grpc_core { namespace { -using ::grpc_event_engine::experimental::GetDefaultEventEngine; - class NativeDNSRequest { public: NativeDNSRequest( @@ -80,6 +78,9 @@ class NativeDNSRequest { } // namespace +NativeDNSResolver::NativeDNSResolver() + : engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {} + NativeDNSResolver* NativeDNSResolver::GetOrCreate() { static NativeDNSResolver* instance = new NativeDNSResolver(); return instance; @@ -183,7 +184,7 @@ DNSResolver::TaskHandle NativeDNSResolver::LookupSRV( absl::string_view /* name */, Duration /* timeout */, grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) { - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { on_resolved(absl::UnimplementedError( "The Native resolver does not support looking up SRV records")); }); @@ -196,7 +197,7 @@ DNSResolver::TaskHandle NativeDNSResolver::LookupTXT( grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) { // Not supported - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { on_resolved(absl::UnimplementedError( "The Native resolver does not support looking up TXT records")); }); diff --git a/src/core/lib/iomgr/resolve_address_posix.h b/src/core/lib/iomgr/resolve_address_posix.h index a07163d144e..3e14ed29b3c 100644 --- a/src/core/lib/iomgr/resolve_address_posix.h +++ b/src/core/lib/iomgr/resolve_address_posix.h @@ -57,6 +57,10 @@ class NativeDNSResolver : public DNSResolver { // NativeDNSResolver does not support cancellation. bool Cancel(TaskHandle handle) override; + + private: + NativeDNSResolver(); + std::shared_ptr engine_; }; } // namespace grpc_core diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc index b6a6cef4091..ae057a93d23 100644 --- a/src/core/lib/iomgr/resolve_address_windows.cc +++ b/src/core/lib/iomgr/resolve_address_windows.cc @@ -49,8 +49,6 @@ namespace grpc_core { namespace { -using ::grpc_event_engine::experimental::GetDefaultEventEngine; - class NativeDNSRequest { public: NativeDNSRequest( @@ -83,6 +81,9 @@ class NativeDNSRequest { } // namespace +NativeDNSResolver::NativeDNSResolver() + : engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {} + NativeDNSResolver* NativeDNSResolver::GetOrCreate() { static NativeDNSResolver* instance = new NativeDNSResolver(); return instance; @@ -167,7 +168,7 @@ DNSResolver::TaskHandle NativeDNSResolver::LookupSRV( absl::string_view /* name */, Duration /* deadline */, grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) { - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { on_resolved(absl::UnimplementedError( "The Native resolver does not support looking up SRV records")); }); @@ -180,7 +181,7 @@ DNSResolver::TaskHandle NativeDNSResolver::LookupTXT( grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) { // Not supported - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { on_resolved(absl::UnimplementedError( "The Native resolver does not support looking up TXT records")); }); diff --git a/src/core/lib/iomgr/resolve_address_windows.h b/src/core/lib/iomgr/resolve_address_windows.h index 901d47331d3..23ce068dcc1 100644 --- a/src/core/lib/iomgr/resolve_address_windows.h +++ b/src/core/lib/iomgr/resolve_address_windows.h @@ -57,6 +57,10 @@ class NativeDNSResolver : public DNSResolver { // NativeDNSResolver does not support cancellation. bool Cancel(TaskHandle handle) override; + + private: + NativeDNSResolver(); + std::shared_ptr engine_; }; } // namespace grpc_core diff --git a/src/core/lib/promise/activity.h b/src/core/lib/promise/activity.h index 3bdc4d4a59d..7bbf024e6d4 100644 --- a/src/core/lib/promise/activity.h +++ b/src/core/lib/promise/activity.h @@ -430,8 +430,8 @@ class PromiseActivity final : public FreestandingActivity, // Notification that we're no longer executing - it's ok to destruct the // promise. void MarkDone() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { - GPR_ASSERT(!done_); - done_ = true; + GPR_ASSERT(!absl::exchange(done_, true)); + ScopedContext contexts(this); Destruct(&promise_holder_.promise); } diff --git a/src/core/lib/promise/sleep.cc b/src/core/lib/promise/sleep.cc index 8ff910283df..9e66ba0411f 100644 --- a/src/core/lib/promise/sleep.cc +++ b/src/core/lib/promise/sleep.cc @@ -19,15 +19,17 @@ #include #include +#include -#include "src/core/lib/event_engine/default_event_engine.h" +#include "src/core/lib/event_engine/default_event_engine.h" // IWYU pragma: keep #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/context.h" namespace grpc_core { -using ::grpc_event_engine::experimental::GetDefaultEventEngine; +using ::grpc_event_engine::experimental::EventEngine; Sleep::Sleep(Timestamp deadline) : deadline_(deadline) {} @@ -50,9 +52,12 @@ Poll Sleep::operator()() { } Sleep::ActiveClosure::ActiveClosure(Timestamp deadline) - : waker_(Activity::current()->MakeOwningWaker()), - timer_handle_(GetDefaultEventEngine()->RunAfter( - deadline - ExecCtx::Get()->Now(), this)) {} + : waker_(Activity::current()->MakeOwningWaker()) { + auto engine = GetContext(); + GPR_ASSERT(engine != nullptr && + "An EventEngine context is required for Promise Sleep"); + timer_handle_ = engine->RunAfter(deadline - ExecCtx::Get()->Now(), this); +} void Sleep::ActiveClosure::Run() { ApplicationCallbackExecCtx callback_exec_ctx; @@ -69,7 +74,7 @@ void Sleep::ActiveClosure::Cancel() { // If we cancel correctly then we must own both refs still and can simply // delete without unreffing twice, otherwise try unreffing since this may be // the last owned ref. - if (GetDefaultEventEngine()->Cancel(timer_handle_) || refs_.Unref()) { + if (GetContext()->Cancel(timer_handle_) || refs_.Unref()) { delete this; } } diff --git a/src/core/lib/promise/sleep.h b/src/core/lib/promise/sleep.h index f791eff48c3..c86e82f6f0f 100644 --- a/src/core/lib/promise/sleep.h +++ b/src/core/lib/promise/sleep.h @@ -66,8 +66,7 @@ class Sleep final { Waker waker_; // One ref dropped by Run(), the other by Cancel(). RefCount refs_{2}; - const grpc_event_engine::experimental::EventEngine::TaskHandle - timer_handle_; + grpc_event_engine::experimental::EventEngine::TaskHandle timer_handle_; }; Timestamp deadline_; diff --git a/src/cpp/server/orca/orca_service.cc b/src/cpp/server/orca/orca_service.cc index 21c25413fbb..9bb70db1723 100644 --- a/src/cpp/server/orca/orca_service.cc +++ b/src/cpp/server/orca/orca_service.cc @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -54,7 +55,6 @@ namespace grpc { namespace experimental { using ::grpc_event_engine::experimental::EventEngine; -using ::grpc_event_engine::experimental::GetDefaultEventEngine; // // OrcaService::Reactor @@ -64,7 +64,9 @@ class OrcaService::Reactor : public ServerWriteReactor, public grpc_core::RefCounted { public: explicit Reactor(OrcaService* service, const ByteBuffer* request_buffer) - : RefCounted("OrcaService::Reactor"), service_(service) { + : RefCounted("OrcaService::Reactor"), + service_(service), + engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { // Get slice from request. Slice slice; GPR_ASSERT(request_buffer->DumpToSingleSlice(&slice).ok()); @@ -127,7 +129,7 @@ class OrcaService::Reactor : public ServerWriteReactor, grpc_core::ExecCtx exec_ctx; grpc::internal::MutexLock lock(&timer_mu_); if (cancelled_) return false; - timer_handle_ = GetDefaultEventEngine()->RunAfter( + timer_handle_ = engine_->RunAfter( report_interval_, [self = Ref(DEBUG_LOCATION, "Orca Service")] { self->OnTimer(); }); return true; @@ -136,8 +138,7 @@ class OrcaService::Reactor : public ServerWriteReactor, bool MaybeCancelTimer() { grpc::internal::MutexLock lock(&timer_mu_); cancelled_ = true; - if (timer_handle_.has_value() && - GetDefaultEventEngine()->Cancel(*timer_handle_)) { + if (timer_handle_.has_value() && engine_->Cancel(*timer_handle_)) { timer_handle_.reset(); return true; } @@ -161,6 +162,7 @@ class OrcaService::Reactor : public ServerWriteReactor, grpc_core::Duration report_interval_; ByteBuffer response_; + std::shared_ptr engine_; }; // diff --git a/src/php/bin/run_tests.sh b/src/php/bin/run_tests.sh index 6dd66b9fcb1..6b204bc7b61 100755 --- a/src/php/bin/run_tests.sh +++ b/src/php/bin/run_tests.sh @@ -47,8 +47,12 @@ if [ -x "$(command -v valgrind)" ]; then # TODO(jtattermusch): reenable the test once https://github.com/grpc/grpc/issues/29098 is fixed. if [ "$(uname -m)" != "aarch64" ]; then $(which valgrind) --error-exitcode=10 --leak-check=yes \ + -v \ + --num-callers=30 \ + --suppressions=../tests/MemoryLeakTest/ignore_leaks.supp \ $VALGRIND_UNDEF_VALUE_ERRORS \ $(which php) $extension_dir -d max_execution_time=300 \ ../tests/MemoryLeakTest/MemoryLeakTest.php fi fi + diff --git a/src/php/tests/MemoryLeakTest/ignore_leaks.supp b/src/php/tests/MemoryLeakTest/ignore_leaks.supp new file mode 100644 index 00000000000..dccd088d905 --- /dev/null +++ b/src/php/tests/MemoryLeakTest/ignore_leaks.supp @@ -0,0 +1,13 @@ +{ + static Posix NativeDNSResolver + Memcheck:Leak + match-leak-kinds: possible + ... + fun:pthread_create@@GLIBC_2.2.5 + ... + fun:_ZN17grpc_event_engine12experimental21GetDefaultEventEngineEv + fun:_ZN9grpc_core17NativeDNSResolverC1Ev + fun:_ZN9grpc_core17NativeDNSResolver11GetOrCreateEv + ... +} + diff --git a/test/core/channel/channel_args_test.cc b/test/core/channel/channel_args_test.cc index ee5559ae0a9..12cf8d5f085 100644 --- a/test/core/channel/channel_args_test.cc +++ b/test/core/channel/channel_args_test.cc @@ -28,6 +28,7 @@ #include #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc index 63cb1f8a3c6..45e370c31db 100644 --- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc @@ -18,6 +18,7 @@ #include #include +#include #include @@ -62,12 +63,12 @@ static struct iomgr_args { namespace { -using ::grpc_event_engine::experimental::GetDefaultEventEngine; - grpc_core::DNSResolver* g_default_dns_resolver; class TestDNSResolver : public grpc_core::DNSResolver { public: + TestDNSResolver() + : engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {} // Wrapper around default resolve_address in order to count the number of // times we incur in a system-level name resolution. TaskHandle LookupHostname( @@ -113,7 +114,7 @@ class TestDNSResolver : public grpc_core::DNSResolver { absl::string_view /* name */, grpc_core::Duration /* timeout */, grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) override { - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { on_resolved(absl::UnimplementedError( "The Testing DNS resolver does not support looking up SRV records")); }); @@ -126,7 +127,7 @@ class TestDNSResolver : public grpc_core::DNSResolver { grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) override { // Not supported - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { on_resolved(absl::UnimplementedError( "The Testing DNS resolver does not support looking up TXT records")); }); @@ -135,6 +136,9 @@ class TestDNSResolver : public grpc_core::DNSResolver { // Not cancellable bool Cancel(TaskHandle /*handle*/) override { return false; } + + private: + std::shared_ptr engine_; }; } // namespace diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index a36e3cebfc0..218651002d2 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -138,6 +138,7 @@ static void finish_resolve(void* arg, grpc_error_handle error) { namespace { +using ::grpc_event_engine::experimental::FuzzingEventEngine; using ::grpc_event_engine::experimental::GetDefaultEventEngine; class FuzzerDNSResolver : public grpc_core::DNSResolver { @@ -178,8 +179,8 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver { }; // Gets the singleton instance, possibly creating it first - static FuzzerDNSResolver* GetOrCreate() { - static FuzzerDNSResolver* instance = new FuzzerDNSResolver(); + static FuzzerDNSResolver* GetOrCreate(FuzzingEventEngine* engine) { + static FuzzerDNSResolver* instance = new FuzzerDNSResolver(engine); return instance; } @@ -206,7 +207,7 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver { absl::string_view /* name */, grpc_core::Duration /* timeout */, grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) override { - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { on_resolved(absl::UnimplementedError( "The Fuzzing DNS resolver does not support looking up SRV records")); }); @@ -219,7 +220,7 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver { grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) override { // Not supported - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { on_resolved(absl::UnimplementedError( "The Fuzing DNS resolver does not support looking up TXT records")); }); @@ -228,6 +229,10 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver { // FuzzerDNSResolver does not support cancellation. bool Cancel(TaskHandle /*handle*/) override { return false; } + + private: + explicit FuzzerDNSResolver(FuzzingEventEngine* engine) : engine_(engine) {} + FuzzingEventEngine* engine_; }; } // namespace @@ -818,21 +823,22 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { if (squelch && grpc_trace_fuzzer == nullptr) gpr_set_log_function(dont_log); gpr_free(grpc_trace_fuzzer); grpc_set_tcp_client_impl(&fuzz_tcp_client_vtable); - grpc_event_engine::experimental::SetDefaultEventEngineFactory( + + grpc_event_engine::experimental::SetEventEngineFactory( [actions = msg.event_engine_actions()]() { - return absl::make_unique< - grpc_event_engine::experimental::FuzzingEventEngine>( - grpc_event_engine::experimental::FuzzingEventEngine::Options(), - actions); + return absl::make_unique( + FuzzingEventEngine::Options(), actions); }); - grpc_event_engine::experimental::GetDefaultEventEngine(); + auto engine = + std::dynamic_pointer_cast(GetDefaultEventEngine()); + FuzzingEventEngine::SetGlobalNowImplEngine(engine.get()); grpc_init(); grpc_timer_manager_set_threading(false); { grpc_core::ExecCtx exec_ctx; grpc_core::Executor::SetThreadingAll(false); } - grpc_core::SetDNSResolver(FuzzerDNSResolver::GetOrCreate()); + grpc_core::SetDNSResolver(FuzzerDNSResolver::GetOrCreate(engine.get())); grpc_dns_lookup_hostname_ares = my_dns_lookup_ares; grpc_cancel_ares_request = my_cancel_ares_request; @@ -869,14 +875,10 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { while (action_index < msg.actions_size() || g_channel != nullptr || g_server != nullptr || pending_channel_watches > 0 || pending_pings > 0 || ActiveCall() != nullptr) { - static_cast( - grpc_event_engine::experimental::GetDefaultEventEngine()) - ->Tick(); + engine->Tick(); if (action_index == msg.actions_size()) { - static_cast( - grpc_event_engine::experimental::GetDefaultEventEngine()) - ->FuzzingDone(); + engine->FuzzingDone(); if (g_channel != nullptr) { grpc_channel_destroy(g_channel); g_channel = nullptr; @@ -1223,4 +1225,5 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { grpc_resource_quota_unref(g_resource_quota); grpc_shutdown_blocking(); + FuzzingEventEngine::UnsetGlobalNowImplEngine(engine.get()); } diff --git a/test/core/end2end/goaway_server_test.cc b/test/core/end2end/goaway_server_test.cc index 8e031868f78..99156b9e161 100644 --- a/test/core/end2end/goaway_server_test.cc +++ b/test/core/end2end/goaway_server_test.cc @@ -82,12 +82,12 @@ static void set_resolve_port(int port) { namespace { -using ::grpc_event_engine::experimental::GetDefaultEventEngine; - grpc_core::DNSResolver* g_default_dns_resolver; class TestDNSResolver : public grpc_core::DNSResolver { public: + TestDNSResolver() + : engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {} TaskHandle LookupHostname( std::function>)> on_resolved, @@ -114,7 +114,7 @@ class TestDNSResolver : public grpc_core::DNSResolver { absl::string_view /* name */, grpc_core::Duration /* timeout */, grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) override { - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { on_resolved(absl::UnimplementedError( "The Testing DNS resolver does not support looking up SRV records")); }); @@ -127,7 +127,7 @@ class TestDNSResolver : public grpc_core::DNSResolver { grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) override { // Not supported - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { on_resolved(absl::UnimplementedError( "The Testing DNS resolver does not support looking up TXT records")); }); @@ -159,6 +159,7 @@ class TestDNSResolver : public grpc_core::DNSResolver { std::move(addrs)); } } + std::shared_ptr engine_; }; } // namespace diff --git a/test/core/event_engine/BUILD b/test/core/event_engine/BUILD index dea38abe6bc..d1a01393a66 100644 --- a/test/core/event_engine/BUILD +++ b/test/core/event_engine/BUILD @@ -61,6 +61,19 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "default_engine_methods_test", + srcs = ["default_engine_methods_test.cc"], + external_deps = ["gtest"], + deps = [ + "//:default_event_engine", + "//:event_engine_base_hdrs", + "//:gpr", + "//:grpc", + "//test/core/util:grpc_test_util", + ], +) + grpc_cc_test( name = "smoke_test", srcs = ["smoke_test.cc"], diff --git a/test/core/event_engine/default_engine_methods_test.cc b/test/core/event_engine/default_engine_methods_test.cc new file mode 100644 index 00000000000..94e6fbf8be3 --- /dev/null +++ b/test/core/event_engine/default_engine_methods_test.cc @@ -0,0 +1,129 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include + +#include +#include + +#include +#include + +#include "src/core/lib/event_engine/default_event_engine.h" +#include "src/core/lib/gprpp/sync.h" +#include "test/core/util/test_config.h" + +namespace { + +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::GetDefaultEventEngine; + +class DefaultEngineTest : public testing::Test { + protected: + // Does nothing, fills space that a nullptr could not + class FakeEventEngine : public EventEngine { + public: + FakeEventEngine() = default; + ~FakeEventEngine() override = default; + absl::StatusOr> CreateListener( + Listener::AcceptCallback /* on_accept */, + absl::AnyInvocable /* on_shutdown */, + const grpc_event_engine::experimental::EndpointConfig& /* config */, + std::unique_ptr< + grpc_event_engine::experimental:: + MemoryAllocatorFactory> /* memory_allocator_factory */) + override { + return absl::UnimplementedError("test"); + }; + ConnectionHandle Connect( + OnConnectCallback /* on_connect */, const ResolvedAddress& /* addr */, + const grpc_event_engine::experimental::EndpointConfig& /* args */, + grpc_event_engine::experimental::MemoryAllocator /* memory_allocator */, + Duration /* timeout */) override { + return {-1, -1}; + }; + bool CancelConnect(ConnectionHandle /* handle */) override { + return false; + }; + bool IsWorkerThread() override { return false; }; + std::unique_ptr GetDNSResolver( + const DNSResolver::ResolverOptions& /* options */) override { + return nullptr; + }; + void Run(Closure* /* closure */) override{}; + void Run(absl::AnyInvocable /* closure */) override{}; + TaskHandle RunAfter(Duration /* when */, Closure* /* closure */) override { + return {-1, -1}; + } + TaskHandle RunAfter(Duration /* when */, + absl::AnyInvocable /* closure */) override { + return {-1, -1}; + } + bool Cancel(TaskHandle /* handle */) override { return false; }; + }; +}; + +TEST_F(DefaultEngineTest, SharedPtrGlobalEventEngineLifetimesAreValid) { + int create_count = 0; + grpc_event_engine::experimental::SetEventEngineFactory([&create_count] { + ++create_count; + return absl::make_unique(); + }); + std::shared_ptr ee2; + { + std::shared_ptr ee1 = GetDefaultEventEngine(); + ASSERT_EQ(1, create_count); + ee2 = GetDefaultEventEngine(); + ASSERT_EQ(1, create_count); + ASSERT_EQ(ee2.use_count(), 2); + } + // Ensure the first shared_ptr did not delete the global + ASSERT_TRUE(ee2.unique()); + ASSERT_FALSE(ee2->IsWorkerThread()); // useful for ASAN + // destroy the global engine via the last shared_ptr, and create a new one. + ee2.reset(); + ee2 = GetDefaultEventEngine(); + ASSERT_EQ(2, create_count); + ASSERT_TRUE(ee2.unique()); + grpc_event_engine::experimental::RevertToDefaultEventEngineFactory(); +} + +TEST_F(DefaultEngineTest, StressTestSharedPtr) { + constexpr int thread_count = 13; + constexpr absl::Duration spin_time = absl::Seconds(3); + std::vector threads; + threads.reserve(thread_count); + for (int i = 0; i < thread_count; i++) { + threads.emplace_back([&spin_time] { + auto timeout = absl::Now() + spin_time; + do { + GetDefaultEventEngine().reset(); + } while (timeout > absl::Now()); + }); + } + for (auto& thd : threads) { + thd.join(); + } +} +} // namespace + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(&argc, argv); + grpc_init(); + auto result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/test/core/event_engine/factory_test.cc b/test/core/event_engine/factory_test.cc new file mode 100644 index 00000000000..a4daabe9977 --- /dev/null +++ b/test/core/event_engine/factory_test.cc @@ -0,0 +1,79 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include +#include + +#include "src/core/lib/event_engine/default_event_engine.h" +#include "test/core/event_engine/util/aborting_event_engine.h" +#include "test/core/util/test_config.h" + +namespace { +using ::grpc_event_engine::experimental::AbortingEventEngine; +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::EventEngineFactoryReset; +using ::grpc_event_engine::experimental::GetDefaultEventEngine; +using ::grpc_event_engine::experimental::SetEventEngineFactory; + +class EventEngineFactoryTest : public testing::Test { + public: + EventEngineFactoryTest() = default; + ~EventEngineFactoryTest() { EventEngineFactoryReset(); } +}; + +TEST_F(EventEngineFactoryTest, CustomFactoryIsUsed) { + int counter{0}; + SetEventEngineFactory([&counter] { + ++counter; + return absl::make_unique(); + }); + auto ee1 = GetDefaultEventEngine(); + ASSERT_EQ(counter, 1); + auto ee2 = GetDefaultEventEngine(); + ASSERT_EQ(counter, 1); + ASSERT_EQ(ee1, ee2); +} + +TEST_F(EventEngineFactoryTest, FactoryResetWorks) { + // eliminate a global default if one has been created already. + EventEngineFactoryReset(); + int counter{0}; + SetEventEngineFactory([&counter]() -> std::unique_ptr { + // called at most twice; + EXPECT_LE(++counter, 2); + return absl::make_unique(); + }); + auto custom_ee = GetDefaultEventEngine(); + ASSERT_EQ(counter, 1); + auto same_ee = GetDefaultEventEngine(); + ASSERT_EQ(custom_ee, same_ee); + ASSERT_EQ(counter, 1); + EventEngineFactoryReset(); + auto default_ee = GetDefaultEventEngine(); + ASSERT_NE(custom_ee, default_ee); +} +} // namespace + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(&argc, argv); + grpc_init(); + auto result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/test/core/event_engine/fuzzing_event_engine/BUILD b/test/core/event_engine/fuzzing_event_engine/BUILD index 3766178ff34..ec67a7c2675 100644 --- a/test/core/event_engine/fuzzing_event_engine/BUILD +++ b/test/core/event_engine/fuzzing_event_engine/BUILD @@ -27,6 +27,7 @@ grpc_cc_library( hdrs = ["fuzzing_event_engine.h"], deps = [ ":fuzzing_event_engine_proto", + "//:default_event_engine", "//:event_engine_base_hdrs", "//:time", ], diff --git a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc index 04767976d94..049171dad27 100644 --- a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc +++ b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc @@ -31,16 +31,12 @@ namespace experimental { namespace { const intptr_t kTaskHandleSalt = 12345; FuzzingEventEngine* g_fuzzing_event_engine = nullptr; +gpr_timespec (*g_orig_gpr_now_impl)(gpr_clock_type clock_type); } // namespace FuzzingEventEngine::FuzzingEventEngine( Options options, const fuzzing_event_engine::Actions& actions) : final_tick_length_(options.final_tick_length) { - GPR_ASSERT(g_fuzzing_event_engine == nullptr); - g_fuzzing_event_engine = this; - - gpr_now_impl = GlobalNowImpl; - tick_increments_.clear(); task_delays_.clear(); tasks_by_id_.clear(); @@ -57,7 +53,8 @@ FuzzingEventEngine::FuzzingEventEngine( grpc_core::TestOnlySetProcessEpoch(NowAsTimespec(GPR_CLOCK_MONOTONIC)); auto update_delay = [](std::map* map, - fuzzing_event_engine::Delay delay, Duration max) { + const fuzzing_event_engine::Delay& delay, + Duration max) { auto& value = (*map)[delay.id()]; if (delay.delay_us() > static_cast(max.count() / GPR_NS_PER_US)) { value = max; @@ -84,11 +81,6 @@ void FuzzingEventEngine::FuzzingDone() { tick_increments_.clear(); } -FuzzingEventEngine::~FuzzingEventEngine() { - GPR_ASSERT(g_fuzzing_event_engine == this); - g_fuzzing_event_engine = nullptr; -} - gpr_timespec FuzzingEventEngine::NowAsTimespec(gpr_clock_type clock_type) { // TODO(ctiller): add a facility to track realtime and monotonic clocks // separately to simulate divergence. @@ -98,12 +90,6 @@ gpr_timespec FuzzingEventEngine::NowAsTimespec(gpr_clock_type clock_type) { return {secs.count(), static_cast((d - secs).count()), clock_type}; } -gpr_timespec FuzzingEventEngine::GlobalNowImpl(gpr_clock_type clock_type) { - GPR_ASSERT(g_fuzzing_event_engine != nullptr); - grpc_core::MutexLock lock(&g_fuzzing_event_engine->mu_); - return g_fuzzing_event_engine->NowAsTimespec(clock_type); -} - void FuzzingEventEngine::Tick() { std::vector> to_run; { @@ -207,5 +193,25 @@ bool FuzzingEventEngine::Cancel(TaskHandle handle) { return true; } +gpr_timespec FuzzingEventEngine::GlobalNowImpl(gpr_clock_type clock_type) { + GPR_ASSERT(g_fuzzing_event_engine != nullptr); + grpc_core::MutexLock lock(&g_fuzzing_event_engine->mu_); + return g_fuzzing_event_engine->NowAsTimespec(clock_type); +} + +void FuzzingEventEngine::SetGlobalNowImplEngine(FuzzingEventEngine* engine) { + GPR_ASSERT(g_fuzzing_event_engine == nullptr); + g_fuzzing_event_engine = engine; + g_orig_gpr_now_impl = gpr_now_impl; + gpr_now_impl = GlobalNowImpl; +} + +void FuzzingEventEngine::UnsetGlobalNowImplEngine(FuzzingEventEngine* engine) { + GPR_ASSERT(g_fuzzing_event_engine == engine); + g_fuzzing_event_engine = nullptr; + gpr_now_impl = g_orig_gpr_now_impl; + g_orig_gpr_now_impl = nullptr; +} + } // namespace experimental } // namespace grpc_event_engine diff --git a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h index 97e8295a575..a18bc613e62 100644 --- a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h +++ b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h @@ -40,7 +40,7 @@ class FuzzingEventEngine : public EventEngine { }; explicit FuzzingEventEngine(Options options, const fuzzing_event_engine::Actions& actions); - ~FuzzingEventEngine() override; + ~FuzzingEventEngine() override = default; void FuzzingDone(); void Tick(); @@ -76,6 +76,11 @@ class FuzzingEventEngine : public EventEngine { Time Now() ABSL_LOCKS_EXCLUDED(mu_); + static void SetGlobalNowImplEngine(FuzzingEventEngine* engine) + ABSL_LOCKS_EXCLUDED(mu_); + static void UnsetGlobalNowImplEngine(FuzzingEventEngine* engine) + ABSL_LOCKS_EXCLUDED(mu_); + private: struct Task { Task(intptr_t id, absl::AnyInvocable closure) @@ -88,7 +93,6 @@ class FuzzingEventEngine : public EventEngine { ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); static gpr_timespec GlobalNowImpl(gpr_clock_type clock_type) ABSL_LOCKS_EXCLUDED(mu_); - const Duration final_tick_length_; grpc_core::Mutex mu_; diff --git a/test/core/event_engine/posix/lock_free_event_test.cc b/test/core/event_engine/posix/lock_free_event_test.cc index f142649b6f4..b3cd43a1b2a 100644 --- a/test/core/event_engine/posix/lock_free_event_test.cc +++ b/test/core/event_engine/posix/lock_free_event_test.cc @@ -26,20 +26,21 @@ #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" #include "src/core/lib/gprpp/sync.h" +using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::posix_engine::Scheduler; namespace { class TestScheduler : public Scheduler { public: - explicit TestScheduler(grpc_event_engine::experimental::EventEngine* engine) - : engine_(engine) {} + explicit TestScheduler(std::shared_ptr engine) + : engine_(std::move(engine)) {} void Run( grpc_event_engine::experimental::EventEngine::Closure* closure) override { engine_->Run(closure); } private: - grpc_event_engine::experimental::EventEngine* engine_; + std::shared_ptr engine_; }; TestScheduler* g_scheduler; @@ -56,8 +57,8 @@ TEST(LockFreeEventTest, BasicTest) { event.InitEvent(); grpc_core::MutexLock lock(&mu); // Set NotifyOn first and then SetReady - event.NotifyOn( - IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) { + event.NotifyOn(IomgrEngineClosure::TestOnlyToClosure( + [&mu, &cv](const absl::Status& status) { grpc_core::MutexLock lock(&mu); EXPECT_TRUE(status.ok()); cv.Signal(); @@ -67,8 +68,8 @@ TEST(LockFreeEventTest, BasicTest) { // SetReady first first and then call NotifyOn event.SetReady(); - event.NotifyOn( - IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) { + event.NotifyOn(IomgrEngineClosure::TestOnlyToClosure( + [&mu, &cv](const absl::Status& status) { grpc_core::MutexLock lock(&mu); EXPECT_TRUE(status.ok()); cv.Signal(); @@ -76,8 +77,8 @@ TEST(LockFreeEventTest, BasicTest) { EXPECT_FALSE(cv.WaitWithTimeout(&mu, absl::Seconds(10))); // Set NotifyOn and then call SetShutdown - event.NotifyOn( - IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) { + event.NotifyOn(IomgrEngineClosure::TestOnlyToClosure( + [&mu, &cv](const absl::Status& status) { grpc_core::MutexLock lock(&mu); EXPECT_FALSE(status.ok()); EXPECT_EQ(status, absl::CancelledError("Shutdown")); @@ -111,7 +112,7 @@ TEST(LockFreeEventTest, MultiThreadedTest) { active++; if (thread_id == 0) { event.NotifyOn(IomgrEngineClosure::TestOnlyToClosure( - [&mu, &cv, &signalled](absl::Status status) { + [&mu, &cv, &signalled](const absl::Status& status) { grpc_core::MutexLock lock(&mu); EXPECT_TRUE(status.ok()); signalled = true; @@ -145,9 +146,7 @@ TEST(LockFreeEventTest, MultiThreadedTest) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); - grpc_event_engine::experimental::EventEngine* engine = - grpc_event_engine::experimental::GetDefaultEventEngine(); - EXPECT_NE(engine, nullptr); - g_scheduler = new TestScheduler(engine); + g_scheduler = new TestScheduler( + grpc_event_engine::experimental::GetDefaultEventEngine()); return RUN_ALL_TESTS(); } diff --git a/test/core/event_engine/smoke_test.cc b/test/core/event_engine/smoke_test.cc index a4a99e21a76..3c5d0ce79b4 100644 --- a/test/core/event_engine/smoke_test.cc +++ b/test/core/event_engine/smoke_test.cc @@ -27,13 +27,13 @@ using ::testing::MockFunction; class EventEngineSmokeTest : public testing::Test {}; -TEST_F(EventEngineSmokeTest, SetDefaultEventEngineFactoryLinks) { +TEST_F(EventEngineSmokeTest, SetEventEngineFactoryLinks) { // See https://github.com/grpc/grpc/pull/28707 testing::MockFunction< std::unique_ptr()> factory; EXPECT_CALL(factory, Call()).Times(1); - grpc_event_engine::experimental::SetDefaultEventEngineFactory( + grpc_event_engine::experimental::SetEventEngineFactory( factory.AsStdFunction()); EXPECT_EQ(nullptr, grpc_event_engine::experimental::CreateEventEngine()); } diff --git a/test/core/filters/filter_fuzzer.cc b/test/core/filters/filter_fuzzer.cc index e36b912c92e..2cb5ebe49e5 100644 --- a/test/core/filters/filter_fuzzer.cc +++ b/test/core/filters/filter_fuzzer.cc @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include "absl/memory/memory.h" @@ -36,14 +37,15 @@ bool squelch = true; static void dont_log(gpr_log_func_args* /*args*/) {} -static gpr_timespec g_now; +static grpc_core::Mutex g_now_mu; +static gpr_timespec g_now ABSL_GUARDED_BY(g_now_mu); extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type); static gpr_timespec now_impl(gpr_clock_type clock_type) { GPR_ASSERT(clock_type != GPR_TIMESPAN); - gpr_timespec ts = g_now; - ts.clock_type = clock_type; - return ts; + grpc_core::MutexLock lock(&g_now_mu); + g_now.clock_type = clock_type; + return g_now; } namespace grpc_core { @@ -280,11 +282,13 @@ class MainLoop { switch (action.type_case()) { case filter_fuzzer::Action::TYPE_NOT_SET: break; - case filter_fuzzer::Action::kAdvanceTimeMicroseconds: + case filter_fuzzer::Action::kAdvanceTimeMicroseconds: { + MutexLock lock(&g_now_mu); g_now = gpr_time_add( g_now, gpr_time_from_micros(action.advance_time_microseconds(), GPR_TIMESPAN)); break; + } case filter_fuzzer::Action::kCancel: calls_.erase(action.call()); break; @@ -588,8 +592,11 @@ DEFINE_PROTO_FUZZER(const filter_fuzzer::Msg& msg) { char* grpc_trace_fuzzer = gpr_getenv("GRPC_TRACE_FUZZER"); if (squelch && grpc_trace_fuzzer == nullptr) gpr_set_log_function(dont_log); gpr_free(grpc_trace_fuzzer); - g_now = {1, 0, GPR_CLOCK_MONOTONIC}; - grpc_core::TestOnlySetProcessEpoch(g_now); + { + grpc_core::MutexLock lock(&g_now_mu); + g_now = {1, 0, GPR_CLOCK_MONOTONIC}; + grpc_core::TestOnlySetProcessEpoch(g_now); + } gpr_now_impl = now_impl; grpc_init(); grpc_timer_manager_set_threading(false); diff --git a/test/core/promise/sleep_test.cc b/test/core/promise/sleep_test.cc index 3e97353ce3e..ff475e9d83b 100644 --- a/test/core/promise/sleep_test.cc +++ b/test/core/promise/sleep_test.cc @@ -25,6 +25,7 @@ #include +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h" #include "src/core/lib/promise/race.h" @@ -37,12 +38,15 @@ TEST(Sleep, Zzzz) { ExecCtx exec_ctx; absl::Notification done; Timestamp done_time = ExecCtx::Get()->Now() + Duration::Seconds(1); + auto engine = grpc_event_engine::experimental::GetDefaultEventEngine(); // Sleep for one second then set done to true. - auto activity = MakeActivity(Sleep(done_time), InlineWakeupScheduler(), - [&done](absl::Status r) { - EXPECT_EQ(r, absl::OkStatus()); - done.Notify(); - }); + auto activity = MakeActivity( + Sleep(done_time), InlineWakeupScheduler(), + [&done](absl::Status r) { + EXPECT_EQ(r, absl::OkStatus()); + done.Notify(); + }, + engine.get()); done.WaitForNotification(); exec_ctx.InvalidateNow(); EXPECT_GE(ExecCtx::Get()->Now(), done_time); @@ -52,12 +56,15 @@ TEST(Sleep, AlreadyDone) { ExecCtx exec_ctx; absl::Notification done; Timestamp done_time = ExecCtx::Get()->Now() - Duration::Seconds(1); + auto engine = grpc_event_engine::experimental::GetDefaultEventEngine(); // Sleep for no time at all then set done to true. - auto activity = MakeActivity(Sleep(done_time), InlineWakeupScheduler(), - [&done](absl::Status r) { - EXPECT_EQ(r, absl::OkStatus()); - done.Notify(); - }); + auto activity = MakeActivity( + Sleep(done_time), InlineWakeupScheduler(), + [&done](absl::Status r) { + EXPECT_EQ(r, absl::OkStatus()); + done.Notify(); + }, + engine.get()); done.WaitForNotification(); } @@ -65,13 +72,16 @@ TEST(Sleep, Cancel) { ExecCtx exec_ctx; absl::Notification done; Timestamp done_time = ExecCtx::Get()->Now() + Duration::Seconds(1); + auto engine = grpc_event_engine::experimental::GetDefaultEventEngine(); // Sleep for one second but race it to complete immediately auto activity = MakeActivity( Race(Sleep(done_time), [] { return absl::CancelledError(); }), - InlineWakeupScheduler(), [&done](absl::Status r) { + InlineWakeupScheduler(), + [&done](absl::Status r) { EXPECT_EQ(r, absl::CancelledError()); done.Notify(); - }); + }, + engine.get()); done.WaitForNotification(); exec_ctx.InvalidateNow(); EXPECT_LT(ExecCtx::Get()->Now(), done_time); @@ -84,11 +94,14 @@ TEST(Sleep, MoveSemantics) { Timestamp done_time = ExecCtx::Get()->Now() + Duration::Milliseconds(111); Sleep donor(done_time); Sleep sleeper = std::move(donor); - auto activity = MakeActivity(std::move(sleeper), InlineWakeupScheduler(), - [&done](absl::Status r) { - EXPECT_EQ(r, absl::OkStatus()); - done.Notify(); - }); + auto engine = grpc_event_engine::experimental::GetDefaultEventEngine(); + auto activity = MakeActivity( + std::move(sleeper), InlineWakeupScheduler(), + [&done](absl::Status r) { + EXPECT_EQ(r, absl::OkStatus()); + done.Notify(); + }, + engine.get()); done.WaitForNotification(); exec_ctx.InvalidateNow(); EXPECT_GE(ExecCtx::Get()->Now(), done_time); @@ -100,12 +113,14 @@ TEST(Sleep, StressTest) { ExecCtx exec_ctx; std::vector> notifications; std::vector activities; + auto engine = grpc_event_engine::experimental::GetDefaultEventEngine(); gpr_log(GPR_INFO, "Starting %d sleeps for 1sec", kNumActivities); for (int i = 0; i < kNumActivities; i++) { auto notification = std::make_shared(); auto activity = MakeActivity( Sleep(exec_ctx.Now() + Duration::Seconds(1)), ExecCtxWakeupScheduler(), - [notification](absl::Status /*r*/) { notification->Notify(); }); + [notification](absl::Status /*r*/) { notification->Notify(); }, + engine.get()); notifications.push_back(std::move(notification)); activities.push_back(std::move(activity)); } diff --git a/test/core/transport/bdp_estimator_test.cc b/test/core/transport/bdp_estimator_test.cc index 81ba915dff6..496ae3678c6 100644 --- a/test/core/transport/bdp_estimator_test.cc +++ b/test/core/transport/bdp_estimator_test.cc @@ -38,22 +38,17 @@ extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type); namespace grpc_core { namespace testing { namespace { -int g_clock = 123; -Mutex mu_; +std::atomic g_clock{123}; gpr_timespec fake_gpr_now(gpr_clock_type clock_type) { - MutexLock lock(&mu_); gpr_timespec ts; - ts.tv_sec = g_clock; + ts.tv_sec = g_clock.load(); ts.tv_nsec = 0; ts.clock_type = clock_type; return ts; } -void inc_time(void) { - MutexLock lock(&mu_); - g_clock += 30; -} +void inc_time(void) { g_clock.fetch_add(30); } } // namespace TEST(BdpEstimatorTest, NoOp) { BdpEstimator est("test"); } diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index da1930a52f2..bcda1a666f9 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -2437,6 +2437,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "default_engine_methods_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,