Revert "Reland: Make GetDefaultEventEngine return a shared_ptr (#30563)" (#30573)

This reverts commit ee7c0a8e4c.
pull/30320/merge
AJ Heller 2 years ago committed by GitHub
parent 009dadbb74
commit d025c1732f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      BUILD
  2. 36
      CMakeLists.txt
  3. 9
      build_autogenerated.yaml
  4. 18
      include/grpc/event_engine/event_engine.h
  5. 38
      src/core/ext/filters/channel_idle/channel_idle_filter.cc
  6. 15
      src/core/ext/filters/channel_idle/channel_idle_filter.h
  7. 9
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  8. 15
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  9. 8
      src/core/ext/filters/client_channel/subchannel.cc
  10. 2
      src/core/ext/filters/client_channel/subchannel.h
  11. 16
      src/core/ext/xds/xds_client.cc
  12. 7
      src/core/ext/xds/xds_client.h
  13. 4
      src/core/lib/channel/promise_based_filter.cc
  14. 13
      src/core/lib/channel/promise_based_filter.h
  15. 39
      src/core/lib/event_engine/default_event_engine.cc
  16. 11
      src/core/lib/event_engine/default_event_engine.h
  17. 9
      src/core/lib/iomgr/resolve_address_posix.cc
  18. 4
      src/core/lib/iomgr/resolve_address_posix.h
  19. 9
      src/core/lib/iomgr/resolve_address_windows.cc
  20. 4
      src/core/lib/iomgr/resolve_address_windows.h
  21. 4
      src/core/lib/promise/activity.h
  22. 17
      src/core/lib/promise/sleep.cc
  23. 3
      src/core/lib/promise/sleep.h
  24. 12
      src/cpp/server/orca/orca_service.cc
  25. 4
      src/php/bin/run_tests.sh
  26. 13
      src/php/tests/MemoryLeakTest/ignore_leaks.supp
  27. 1
      test/core/channel/channel_args_test.cc
  28. 12
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  29. 37
      test/core/end2end/fuzzers/api_fuzzer.cc
  30. 9
      test/core/end2end/goaway_server_test.cc
  31. 13
      test/core/event_engine/BUILD
  32. 129
      test/core/event_engine/default_engine_methods_test.cc
  33. 79
      test/core/event_engine/factory_test.cc
  34. 1
      test/core/event_engine/fuzzing_event_engine/BUILD
  35. 40
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
  36. 8
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
  37. 27
      test/core/event_engine/posix/lock_free_event_test.cc
  38. 4
      test/core/event_engine/smoke_test.cc
  39. 17
      test/core/filters/filter_fuzzer.cc
  40. 33
      test/core/promise/sleep_test.cc
  41. 11
      test/core/transport/bdp_estimator_test.cc
  42. 24
      tools/run_tests/generated/tests.json

@ -1268,7 +1268,6 @@ grpc_cc_library(
external_deps = ["absl/status"], external_deps = ["absl/status"],
deps = [ deps = [
"activity", "activity",
"context",
"default_event_engine", "default_event_engine",
"event_engine_base_hdrs", "event_engine_base_hdrs",
"exec_ctx", "exec_ctx",
@ -2767,13 +2766,9 @@ grpc_cc_library(
], ],
external_deps = ["absl/functional:any_invocable"], external_deps = ["absl/functional:any_invocable"],
deps = [ deps = [
"context",
"default_event_engine_factory", "default_event_engine_factory",
"event_engine_base_hdrs", "event_engine_base_hdrs",
"event_engine_trace",
"gpr", "gpr",
"grpc_trace",
"no_destruct",
], ],
) )
@ -3717,7 +3712,6 @@ grpc_cc_library(
"closure", "closure",
"config", "config",
"debug_location", "debug_location",
"default_event_engine",
"exec_ctx", "exec_ctx",
"exec_ctx_wakeup_scheduler", "exec_ctx_wakeup_scheduler",
"gpr", "gpr",

36
CMakeLists.txt generated

@ -921,7 +921,6 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx crl_ssl_transport_security_test) add_dependencies(buildtests_cxx crl_ssl_transport_security_test)
endif() endif()
add_dependencies(buildtests_cxx default_engine_methods_test)
add_dependencies(buildtests_cxx delegating_channel_test) add_dependencies(buildtests_cxx delegating_channel_test)
add_dependencies(buildtests_cxx destroy_grpclb_channel_with_active_connect_stress_test) add_dependencies(buildtests_cxx destroy_grpclb_channel_with_active_connect_stress_test)
add_dependencies(buildtests_cxx dns_resolver_cooldown_test) add_dependencies(buildtests_cxx dns_resolver_cooldown_test)
@ -8693,41 +8692,6 @@ endif()
endif() endif()
if(gRPC_BUILD_TESTS) if(gRPC_BUILD_TESTS)
add_executable(default_engine_methods_test
test/core/event_engine/default_engine_methods_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(default_engine_methods_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(default_engine_methods_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(delegating_channel_test add_executable(delegating_channel_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc

@ -5378,15 +5378,6 @@ targets:
- linux - linux
- posix - posix
- mac - mac
- name: default_engine_methods_test
gtest: true
build: test
language: c++
headers: []
src:
- test/core/event_engine/default_engine_methods_test.cc
deps:
- grpc_test_util
- name: delegating_channel_test - name: delegating_channel_test
gtest: true gtest: true
build: test build: test

@ -72,7 +72,7 @@ namespace experimental {
/// server->Wait(); /// server->Wait();
/// ///
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
class EventEngine : public std::enable_shared_from_this<EventEngine> { class EventEngine {
public: public:
/// A duration between two events. /// A duration between two events.
/// ///
@ -427,24 +427,16 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
/// Replace gRPC's default EventEngine factory. /// Replace gRPC's default EventEngine factory.
/// ///
/// Applications may call \a SetEventEngineFactory time to replace the default /// Applications may call \a SetDefaultEventEngineFactory at any time to replace
/// factory used within gRPC. EventEngines will be created when necessary, when /// the default factory used within gRPC. EventEngines will be created when
/// they are otherwise not provided by the application. /// necessary, when they are otherwise not provided by the application.
/// ///
/// To be certain that none of the gRPC-provided built-in EventEngines are /// To be certain that none of the gRPC-provided built-in EventEngines are
/// created, applications must set a custom EventEngine factory method *before* /// created, applications must set a custom EventEngine factory method *before*
/// grpc is initialized. /// grpc is initialized.
void SetEventEngineFactory( void SetDefaultEventEngineFactory(
absl::AnyInvocable<std::unique_ptr<EventEngine>()> factory); absl::AnyInvocable<std::unique_ptr<EventEngine>()> factory);
/// Revert to using gRPC's default EventEngine factory.
///
/// Applications that have called \a SetEventEngineFactory can unregister their
/// custom factory, reverting to use gRPC's built-in default EventEngines. This
/// has no effect on any EventEngines that were already created using the custom
/// factory.
void RevertToDefaultEventEngineFactory();
/// Create an EventEngine using the default factory. /// Create an EventEngine using the default factory.
std::unique_ptr<EventEngine> CreateEventEngine(); std::unique_ptr<EventEngine> CreateEventEngine();

@ -26,7 +26,6 @@
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/grpc_types.h> #include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
@ -35,7 +34,6 @@
#include "src/core/lib/channel/promise_based_filter.h" #include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/closure.h"
@ -54,9 +52,6 @@
namespace grpc_core { namespace grpc_core {
namespace { namespace {
using ::grpc_event_engine::experimental::EventEngine;
// TODO(ctiller): The idle filter was disabled in client channel by default // TODO(ctiller): The idle filter was disabled in client channel by default
// due to b/143502997. Now the bug is fixed enable the filter by default. // due to b/143502997. Now the bug is fixed enable the filter by default.
const auto kDefaultIdleTimeout = Duration::Infinity(); const auto kDefaultIdleTimeout = Duration::Infinity();
@ -124,19 +119,15 @@ struct MaxAgeFilter::Config {
absl::StatusOr<ClientIdleFilter> ClientIdleFilter::Create( absl::StatusOr<ClientIdleFilter> ClientIdleFilter::Create(
const ChannelArgs& args, ChannelFilter::Args filter_args) { const ChannelArgs& args, ChannelFilter::Args filter_args) {
// TODO(hork): pull EventEngine from args ClientIdleFilter filter(filter_args.channel_stack(),
ClientIdleFilter filter( GetClientIdleTimeout(args));
filter_args.channel_stack(), GetClientIdleTimeout(args),
grpc_event_engine::experimental::GetDefaultEventEngine());
return absl::StatusOr<ClientIdleFilter>(std::move(filter)); return absl::StatusOr<ClientIdleFilter>(std::move(filter));
} }
absl::StatusOr<MaxAgeFilter> MaxAgeFilter::Create( absl::StatusOr<MaxAgeFilter> MaxAgeFilter::Create(
const ChannelArgs& args, ChannelFilter::Args filter_args) { const ChannelArgs& args, ChannelFilter::Args filter_args) {
// TODO(hork): pull EventEngine from args
MaxAgeFilter filter(filter_args.channel_stack(), MaxAgeFilter filter(filter_args.channel_stack(),
Config::FromChannelArgs(args), Config::FromChannelArgs(args));
grpc_event_engine::experimental::GetDefaultEventEngine());
return absl::StatusOr<MaxAgeFilter>(std::move(filter)); return absl::StatusOr<MaxAgeFilter>(std::move(filter));
} }
@ -203,14 +194,12 @@ void MaxAgeFilter::PostInit() {
[this] { [this] {
return Sleep(ExecCtx::Get()->Now() + max_connection_age_grace_); return Sleep(ExecCtx::Get()->Now() + max_connection_age_grace_);
}), }),
ExecCtxWakeupScheduler(), ExecCtxWakeupScheduler(), [channel_stack, this](absl::Status status) {
[channel_stack, this](absl::Status status) {
// OnDone -- close the connection if the promise completed // OnDone -- close the connection if the promise completed
// successfully. // successfully.
// (if it did not, it was cancelled) // (if it did not, it was cancelled)
if (status.ok()) CloseChannel(); if (status.ok()) CloseChannel();
}, }));
engine_.get()));
} }
} }
@ -266,12 +255,10 @@ void ChannelIdleFilter::StartIdleTimer() {
} }
}); });
}); });
activity_.Set(MakeActivity( activity_.Set(MakeActivity(std::move(promise), ExecCtxWakeupScheduler{},
std::move(promise), ExecCtxWakeupScheduler{},
[channel_stack, this](absl::Status status) { [channel_stack, this](absl::Status status) {
if (status.ok()) CloseChannel(); if (status.ok()) CloseChannel();
}, }));
engine_.get()));
} }
void ChannelIdleFilter::CloseChannel() { void ChannelIdleFilter::CloseChannel() {
@ -313,13 +300,10 @@ void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) {
}); });
} }
MaxAgeFilter::MaxAgeFilter( MaxAgeFilter::MaxAgeFilter(grpc_channel_stack* channel_stack,
grpc_channel_stack* channel_stack, const Config& max_age_config, const Config& max_age_config)
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine) : ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle),
: ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle,
engine),
max_connection_age_(max_age_config.max_connection_age), max_connection_age_(max_age_config.max_connection_age),
max_connection_age_grace_(max_age_config.max_connection_age_grace), max_connection_age_grace_(max_age_config.max_connection_age_grace) {}
engine_(engine) {}
} // namespace grpc_core } // namespace grpc_core

@ -22,7 +22,6 @@
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h> #include <grpc/impl/codegen/connectivity_state.h>
#include "src/core/ext/filters/channel_idle/idle_filter_state.h" #include "src/core/ext/filters/channel_idle/idle_filter_state.h"
@ -59,12 +58,10 @@ class ChannelIdleFilter : public ChannelFilter {
using SingleSetActivityPtr = using SingleSetActivityPtr =
SingleSetPtr<Activity, typename ActivityPtr::deleter_type>; SingleSetPtr<Activity, typename ActivityPtr::deleter_type>;
ChannelIdleFilter( ChannelIdleFilter(grpc_channel_stack* channel_stack,
grpc_channel_stack* channel_stack, Duration client_idle_timeout, Duration client_idle_timeout)
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine)
: channel_stack_(channel_stack), : channel_stack_(channel_stack),
client_idle_timeout_(client_idle_timeout), client_idle_timeout_(client_idle_timeout) {}
engine_(engine) {}
grpc_channel_stack* channel_stack() { return channel_stack_; }; grpc_channel_stack* channel_stack() { return channel_stack_; };
@ -90,7 +87,6 @@ class ChannelIdleFilter : public ChannelFilter {
std::make_shared<IdleFilterState>(false)}; std::make_shared<IdleFilterState>(false)};
SingleSetActivityPtr activity_; SingleSetActivityPtr activity_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
}; };
class ClientIdleFilter final : public ChannelIdleFilter { class ClientIdleFilter final : public ChannelIdleFilter {
@ -131,16 +127,13 @@ class MaxAgeFilter final : public ChannelIdleFilter {
MaxAgeFilter* filter_; MaxAgeFilter* filter_;
}; };
MaxAgeFilter( MaxAgeFilter(grpc_channel_stack* channel_stack, const Config& max_age_config);
grpc_channel_stack* channel_stack, const Config& max_age_config,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine);
void Shutdown() override; void Shutdown() override;
SingleSetActivityPtr max_age_activity_; SingleSetActivityPtr max_age_activity_;
Duration max_connection_age_; Duration max_connection_age_;
Duration max_connection_age_grace_; Duration max_connection_age_grace_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
}; };
} // namespace grpc_core } // namespace grpc_core

@ -158,6 +158,7 @@ const char kGrpcLbAddressAttributeKey[] = "grpclb";
namespace { namespace {
using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
constexpr absl::string_view kGrpclb = "grpclb"; constexpr absl::string_view kGrpclb = "grpclb";
@ -265,7 +266,6 @@ class GrpcLb : public LoadBalancingPolicy {
bool client_load_report_is_due_ = false; bool client_load_report_is_due_ = false;
// The closure used for the completion of sending the load report. // The closure used for the completion of sending the load report.
grpc_closure client_load_report_done_closure_; grpc_closure client_load_report_done_closure_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
}; };
class SubchannelWrapper : public DelegatingSubchannel { class SubchannelWrapper : public DelegatingSubchannel {
@ -849,8 +849,7 @@ GrpcLb::BalancerCallState::BalancerCallState(
: InternallyRefCounted<BalancerCallState>( : InternallyRefCounted<BalancerCallState>(
GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace) ? "BalancerCallState" GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace) ? "BalancerCallState"
: nullptr), : nullptr),
grpclb_policy_(std::move(parent_grpclb_policy)), grpclb_policy_(std::move(parent_grpclb_policy)) {
engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {
GPR_ASSERT(grpclb_policy_ != nullptr); GPR_ASSERT(grpclb_policy_ != nullptr);
GPR_ASSERT(!grpclb_policy()->shutting_down_); GPR_ASSERT(!grpclb_policy()->shutting_down_);
// Init the LB call. Note that the LB call will progress every time there's // Init the LB call. Note that the LB call will progress every time there's
@ -908,7 +907,7 @@ void GrpcLb::BalancerCallState::Orphan() {
// call, then the following cancellation will be a no-op. // call, then the following cancellation will be a no-op.
grpc_call_cancel_internal(lb_call_); grpc_call_cancel_internal(lb_call_);
if (client_load_report_handle_.has_value() && if (client_load_report_handle_.has_value() &&
engine_->Cancel(client_load_report_handle_.value())) { GetDefaultEventEngine()->Cancel(client_load_report_handle_.value())) {
Unref(DEBUG_LOCATION, "client_load_report cancelled"); Unref(DEBUG_LOCATION, "client_load_report cancelled");
} }
// Note that the initial ref is hold by lb_on_balancer_status_received_ // Note that the initial ref is hold by lb_on_balancer_status_received_
@ -994,7 +993,7 @@ void GrpcLb::BalancerCallState::StartQuery() {
void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
client_load_report_handle_ = client_load_report_handle_ =
engine_->RunAfter(client_stats_report_interval_, [this] { GetDefaultEventEngine()->RunAfter(client_stats_report_interval_, [this] {
ApplicationCallbackExecCtx callback_exec_ctx; ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx; ExecCtx exec_ctx;
MaybeSendClientLoadReport(); MaybeSendClientLoadReport();

@ -68,6 +68,7 @@ TraceFlag grpc_lb_weighted_target_trace(false, "weighted_target_lb");
namespace { namespace {
using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
constexpr absl::string_view kWeightedTarget = "weighted_target_experimental"; constexpr absl::string_view kWeightedTarget = "weighted_target_experimental";
@ -196,7 +197,6 @@ class WeightedTargetLb : public LoadBalancingPolicy {
RefCountedPtr<WeightedChild> weighted_child_; RefCountedPtr<WeightedChild> weighted_child_;
absl::optional<EventEngine::TaskHandle> timer_handle_; absl::optional<EventEngine::TaskHandle> timer_handle_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
}; };
// Methods for dealing with the child policy. // Methods for dealing with the child policy.
@ -453,10 +453,9 @@ void WeightedTargetLb::UpdateStateLocked() {
WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer( WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer(
RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child) RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child)
: weighted_child_(std::move(weighted_child)), : weighted_child_(std::move(weighted_child)) {
engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { timer_handle_ = GetDefaultEventEngine()->RunAfter(
timer_handle_ = kChildRetentionInterval, [self = Ref()]() mutable {
engine_->RunAfter(kChildRetentionInterval, [self = Ref()]() mutable {
self->weighted_child_->weighted_target_policy_->work_serializer()->Run( self->weighted_child_->weighted_target_policy_->work_serializer()->Run(
[self = std::move(self)] { self->OnTimerLocked(); }, [self = std::move(self)] { self->OnTimerLocked(); },
DEBUG_LOCATION); DEBUG_LOCATION);
@ -472,7 +471,7 @@ void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() {
weighted_child_->weighted_target_policy_.get(), weighted_child_->weighted_target_policy_.get(),
weighted_child_.get(), weighted_child_->name_.c_str()); weighted_child_.get(), weighted_child_->name_.c_str());
} }
engine_->Cancel(*timer_handle_); GetDefaultEventEngine()->Cancel(*timer_handle_);
} }
Unref(); Unref();
} }
@ -682,7 +681,7 @@ class WeightedTargetLbFactory : public LoadBalancingPolicyFactory {
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override { LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<WeightedTargetLb>(std::move(args)); return MakeOrphanable<WeightedTargetLb>(std::move(args));
} // namespace }
absl::string_view name() const override { return kWeightedTarget; } absl::string_view name() const override { return kWeightedTarget; }
@ -766,7 +765,7 @@ class WeightedTargetLbFactory : public LoadBalancingPolicyFactory {
} }
return child_config; return child_config;
} }
}; // namespace grpc_core };
} // namespace } // namespace

@ -83,6 +83,7 @@
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))) GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)))
namespace grpc_core { namespace grpc_core {
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
TraceFlag grpc_trace_subchannel(false, "subchannel"); TraceFlag grpc_trace_subchannel(false, "subchannel");
DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount"); DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
@ -630,8 +631,7 @@ Subchannel::Subchannel(SubchannelKey key,
args_(args), args_(args),
pollset_set_(grpc_pollset_set_create()), pollset_set_(grpc_pollset_set_create()),
connector_(std::move(connector)), connector_(std::move(connector)),
backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)), backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)) {
engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {
// A grpc_init is added here to ensure that grpc_shutdown does not happen // A grpc_init is added here to ensure that grpc_shutdown does not happen
// until the subchannel is destroyed. Subchannels can persist longer than // until the subchannel is destroyed. Subchannels can persist longer than
// channels because they maybe reused/shared among multiple channels. As a // channels because they maybe reused/shared among multiple channels. As a
@ -766,7 +766,7 @@ void Subchannel::ResetBackoff() {
MutexLock lock(&mu_); MutexLock lock(&mu_);
backoff_.Reset(); backoff_.Reset();
if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
engine_->Cancel(retry_timer_handle_)) { GetDefaultEventEngine()->Cancel(retry_timer_handle_)) {
OnRetryTimerLocked(); OnRetryTimerLocked();
} else if (state_ == GRPC_CHANNEL_CONNECTING) { } else if (state_ == GRPC_CHANNEL_CONNECTING) {
next_attempt_time_ = ExecCtx::Get()->Now(); next_attempt_time_ = ExecCtx::Get()->Now();
@ -909,7 +909,7 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
time_until_next_attempt.millis()); time_until_next_attempt.millis());
SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_to_absl_status(error)); grpc_error_to_absl_status(error));
retry_timer_handle_ = engine_->RunAfter( retry_timer_handle_ = GetDefaultEventEngine()->RunAfter(
time_until_next_attempt, time_until_next_attempt,
[self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable { [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable {
{ {

@ -23,7 +23,6 @@
#include <deque> #include <deque>
#include <map> #include <map>
#include <memory>
#include <string> #include <string>
#include "absl/base/thread_annotations.h" #include "absl/base/thread_annotations.h"
@ -420,7 +419,6 @@ class Subchannel : public DualRefCounted<Subchannel> {
// Data producer map. // Data producer map.
std::map<UniqueTypeName, DataProducerInterface*> data_producer_map_ std::map<UniqueTypeName, DataProducerInterface*> data_producer_map_
ABSL_GUARDED_BY(mu_); ABSL_GUARDED_BY(mu_);
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
}; };
} // namespace grpc_core } // namespace grpc_core

@ -57,6 +57,7 @@
namespace grpc_core { namespace grpc_core {
using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
TraceFlag grpc_xds_client_trace(false, "xds_client"); TraceFlag grpc_xds_client_trace(false, "xds_client");
TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount"); TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
@ -190,7 +191,7 @@ class XdsClient::ChannelState::AdsCallState
if (state.resource != nullptr) return; if (state.resource != nullptr) return;
// Start timer. // Start timer.
ads_calld_ = std::move(ads_calld); ads_calld_ = std::move(ads_calld);
timer_handle_ = ads_calld_->xds_client()->engine()->RunAfter( timer_handle_ = GetDefaultEventEngine()->RunAfter(
ads_calld_->xds_client()->request_timeout_, ads_calld_->xds_client()->request_timeout_,
[self = Ref(DEBUG_LOCATION, "timer")]() { [self = Ref(DEBUG_LOCATION, "timer")]() {
ApplicationCallbackExecCtx callback_exec_ctx; ApplicationCallbackExecCtx callback_exec_ctx;
@ -213,7 +214,7 @@ class XdsClient::ChannelState::AdsCallState
// TODO(roth): Find a way to write a test for this case. // TODO(roth): Find a way to write a test for this case.
timer_start_needed_ = false; timer_start_needed_ = false;
if (timer_handle_.has_value()) { if (timer_handle_.has_value()) {
ads_calld_->xds_client()->engine()->Cancel(*timer_handle_); GetDefaultEventEngine()->Cancel(*timer_handle_);
timer_handle_.reset(); timer_handle_.reset();
} }
} }
@ -564,7 +565,7 @@ void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
shutting_down_ = true; shutting_down_ = true;
calld_.reset(); calld_.reset();
if (timer_handle_.has_value()) { if (timer_handle_.has_value()) {
chand()->xds_client()->engine()->Cancel(*timer_handle_); GetDefaultEventEngine()->Cancel(*timer_handle_);
timer_handle_.reset(); timer_handle_.reset();
} }
this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned"); this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
@ -607,7 +608,7 @@ void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
chand()->xds_client(), chand()->server_.server_uri.c_str(), chand()->xds_client(), chand()->server_.server_uri.c_str(),
timeout.millis()); timeout.millis());
} }
timer_handle_ = chand()->xds_client()->engine()->RunAfter( timer_handle_ = GetDefaultEventEngine()->RunAfter(
timeout, timeout,
[self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() { [self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() {
ApplicationCallbackExecCtx callback_exec_ctx; ApplicationCallbackExecCtx callback_exec_ctx;
@ -1105,7 +1106,7 @@ XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() { void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
if (timer_handle_.has_value() && if (timer_handle_.has_value() &&
xds_client()->engine()->Cancel(*timer_handle_)) { GetDefaultEventEngine()->Cancel(*timer_handle_)) {
timer_handle_.reset(); timer_handle_.reset();
Unref(DEBUG_LOCATION, "Orphan"); Unref(DEBUG_LOCATION, "Orphan");
} }
@ -1113,7 +1114,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
void XdsClient::ChannelState::LrsCallState::Reporter:: void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked() { ScheduleNextReportLocked() {
timer_handle_ = xds_client()->engine()->RunAfter(report_interval_, [this]() { timer_handle_ = GetDefaultEventEngine()->RunAfter(report_interval_, [this]() {
ApplicationCallbackExecCtx callback_exec_ctx; ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx; ExecCtx exec_ctx;
if (OnNextReportTimer()) { if (OnNextReportTimer()) {
@ -1384,8 +1385,7 @@ XdsClient::XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,
transport_factory_(std::move(transport_factory)), transport_factory_(std::move(transport_factory)),
request_timeout_(resource_request_timeout), request_timeout_(resource_request_timeout),
xds_federation_enabled_(XdsFederationEnabled()), xds_federation_enabled_(XdsFederationEnabled()),
api_(this, &grpc_xds_client_trace, bootstrap_->node(), &symtab_), api_(this, &grpc_xds_client_trace, bootstrap_->node(), &symtab_) {
engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this); gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
} }

@ -32,8 +32,6 @@
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "upb/def.hpp" #include "upb/def.hpp"
#include <grpc/event_engine/event_engine.h>
#include "src/core/ext/xds/xds_api.h" #include "src/core/ext/xds/xds_api.h"
#include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_client_stats.h" #include "src/core/ext/xds/xds_client_stats.h"
@ -148,10 +146,6 @@ class XdsClient : public DualRefCounted<XdsClient> {
// implementation. // implementation.
std::string DumpClientConfigBinary(); std::string DumpClientConfigBinary();
grpc_event_engine::experimental::EventEngine* engine() {
return engine_.get();
}
private: private:
struct XdsResourceKey { struct XdsResourceKey {
std::string id; std::string id;
@ -307,7 +301,6 @@ class XdsClient : public DualRefCounted<XdsClient> {
const bool xds_federation_enabled_; const bool xds_federation_enabled_;
XdsApi api_; XdsApi api_;
WorkSerializer work_serializer_; WorkSerializer work_serializer_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
Mutex mu_; Mutex mu_;

@ -27,7 +27,6 @@
#include <grpc/status.h> #include <grpc/status.h>
#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error.h"
#include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice.h"
@ -45,8 +44,7 @@ BaseCallData::BaseCallData(grpc_call_element* elem,
arena_(args->arena), arena_(args->arena),
call_combiner_(args->call_combiner), call_combiner_(args->call_combiner),
deadline_(args->deadline), deadline_(args->deadline),
context_(args->context), context_(args->context) {
event_engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {
if (flags & kFilterExaminesServerInitialMetadata) { if (flags & kFilterExaminesServerInitialMetadata) {
server_initial_metadata_latch_ = arena_->New<Latch<ServerMetadata*>>(); server_initial_metadata_latch_ = arena_->New<Latch<ServerMetadata*>>();
} }

@ -25,14 +25,12 @@
#include <stdlib.h> #include <stdlib.h>
#include <atomic> #include <atomic>
#include <memory>
#include <new> #include <new>
#include <utility> #include <utility>
#include "absl/container/inlined_vector.h" #include "absl/container/inlined_vector.h"
#include "absl/meta/type_traits.h" #include "absl/meta/type_traits.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/grpc_types.h> #include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
@ -41,7 +39,6 @@
#include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h" #include "src/core/lib/channel/context.h"
#include "src/core/lib/event_engine/default_event_engine.h" // IWYU pragma: keep
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/call_combiner.h"
@ -154,9 +151,7 @@ class BaseCallData : public Activity, private Wakeable {
: public promise_detail::Context<Arena>, : public promise_detail::Context<Arena>,
public promise_detail::Context<grpc_call_context_element>, public promise_detail::Context<grpc_call_context_element>,
public promise_detail::Context<grpc_polling_entity>, public promise_detail::Context<grpc_polling_entity>,
public promise_detail::Context<CallFinalization>, public promise_detail::Context<CallFinalization> {
public promise_detail::Context<
grpc_event_engine::experimental::EventEngine> {
public: public:
explicit ScopedContext(BaseCallData* call_data) explicit ScopedContext(BaseCallData* call_data)
: promise_detail::Context<Arena>(call_data->arena_), : promise_detail::Context<Arena>(call_data->arena_),
@ -164,9 +159,8 @@ class BaseCallData : public Activity, private Wakeable {
call_data->context_), call_data->context_),
promise_detail::Context<grpc_polling_entity>( promise_detail::Context<grpc_polling_entity>(
call_data->pollent_.load(std::memory_order_acquire)), call_data->pollent_.load(std::memory_order_acquire)),
promise_detail::Context<CallFinalization>(&call_data->finalization_), promise_detail::Context<CallFinalization>(&call_data->finalization_) {
promise_detail::Context<grpc_event_engine::experimental::EventEngine>( }
call_data->event_engine_.get()) {}
}; };
class Flusher { class Flusher {
@ -273,7 +267,6 @@ class BaseCallData : public Activity, private Wakeable {
grpc_call_context_element* const context_; grpc_call_context_element* const context_;
std::atomic<grpc_polling_entity*> pollent_{nullptr}; std::atomic<grpc_polling_entity*> pollent_{nullptr};
Latch<ServerMetadata*>* server_initial_metadata_latch_ = nullptr; Latch<ServerMetadata*>* server_initial_metadata_latch_ = nullptr;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
}; };
class ClientCallData : public BaseCallData { class ClientCallData : public BaseCallData {

@ -23,11 +23,7 @@
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/default_event_engine_factory.h" #include "src/core/lib/event_engine/default_event_engine_factory.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_event_engine { namespace grpc_event_engine {
namespace experimental { namespace experimental {
@ -35,22 +31,14 @@ namespace experimental {
namespace { namespace {
std::atomic<absl::AnyInvocable<std::unique_ptr<EventEngine>()>*> std::atomic<absl::AnyInvocable<std::unique_ptr<EventEngine>()>*>
g_event_engine_factory{nullptr}; g_event_engine_factory{nullptr};
grpc_core::NoDestruct<grpc_core::Mutex> g_mu; std::atomic<EventEngine*> g_event_engine{nullptr};
grpc_core::NoDestruct<std::weak_ptr<EventEngine>> g_event_engine;
} // namespace } // namespace
void SetEventEngineFactory( void SetDefaultEventEngineFactory(
absl::AnyInvocable<std::unique_ptr<EventEngine>()> factory) { absl::AnyInvocable<std::unique_ptr<EventEngine>()> factory) {
delete g_event_engine_factory.exchange( delete g_event_engine_factory.exchange(
new absl::AnyInvocable<std::unique_ptr<EventEngine>()>( new absl::AnyInvocable<std::unique_ptr<EventEngine>()>(
std::move(factory))); std::move(factory)));
// Forget any previous EventEngines
grpc_core::MutexLock lock(&*g_mu);
g_event_engine->reset();
}
void RevertToDefaultEventEngineFactory() {
delete g_event_engine_factory.exchange(nullptr);
} }
std::unique_ptr<EventEngine> CreateEventEngine() { std::unique_ptr<EventEngine> CreateEventEngine() {
@ -60,22 +48,23 @@ std::unique_ptr<EventEngine> CreateEventEngine() {
return DefaultEventEngineFactory(); return DefaultEventEngineFactory();
} }
std::shared_ptr<EventEngine> GetDefaultEventEngine() { EventEngine* GetDefaultEventEngine() {
grpc_core::MutexLock lock(&*g_mu); EventEngine* engine = g_event_engine.load(std::memory_order_acquire);
if (std::shared_ptr<EventEngine> engine = g_event_engine->lock()) { if (engine == nullptr) {
GRPC_EVENT_ENGINE_TRACE("DefaultEventEngine::%p use_count:%ld", auto* created = CreateEventEngine().release();
engine.get(), engine.use_count()); if (g_event_engine.compare_exchange_strong(engine, created,
return engine; std::memory_order_acq_rel,
std::memory_order_acquire)) {
engine = created;
} else {
delete created;
}
} }
std::shared_ptr<EventEngine> engine{CreateEventEngine()};
GRPC_EVENT_ENGINE_TRACE("Created DefaultEventEngine::%p", engine.get());
*g_event_engine = engine;
return engine; return engine;
} }
void ResetDefaultEventEngine() { void ResetDefaultEventEngine() {
grpc_core::MutexLock lock(&*g_mu); delete g_event_engine.exchange(nullptr, std::memory_order_acq_rel);
g_event_engine->reset();
} }
} // namespace experimental } // namespace experimental

@ -17,17 +17,8 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <memory>
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
#include "src/core/lib/promise/context.h"
namespace grpc_core {
template <>
struct ContextType<grpc_event_engine::experimental::EventEngine> {};
} // namespace grpc_core
namespace grpc_event_engine { namespace grpc_event_engine {
namespace experimental { namespace experimental {
@ -35,7 +26,7 @@ namespace experimental {
/// ///
/// The concept of a global EventEngine may go away in a post-iomgr world. /// The concept of a global EventEngine may go away in a post-iomgr world.
/// Strongly consider whether you could use \a CreateEventEngine instead. /// Strongly consider whether you could use \a CreateEventEngine instead.
std::shared_ptr<EventEngine> GetDefaultEventEngine(); EventEngine* GetDefaultEventEngine();
/// Reset the default event engine /// Reset the default event engine
void ResetDefaultEventEngine(); void ResetDefaultEventEngine();

@ -46,6 +46,8 @@
namespace grpc_core { namespace grpc_core {
namespace { namespace {
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
class NativeDNSRequest { class NativeDNSRequest {
public: public:
NativeDNSRequest( NativeDNSRequest(
@ -78,9 +80,6 @@ class NativeDNSRequest {
} // namespace } // namespace
NativeDNSResolver::NativeDNSResolver()
: engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {}
NativeDNSResolver* NativeDNSResolver::GetOrCreate() { NativeDNSResolver* NativeDNSResolver::GetOrCreate() {
static NativeDNSResolver* instance = new NativeDNSResolver(); static NativeDNSResolver* instance = new NativeDNSResolver();
return instance; return instance;
@ -184,7 +183,7 @@ DNSResolver::TaskHandle NativeDNSResolver::LookupSRV(
absl::string_view /* name */, Duration /* timeout */, absl::string_view /* name */, Duration /* timeout */,
grpc_pollset_set* /* interested_parties */, grpc_pollset_set* /* interested_parties */,
absl::string_view /* name_server */) { absl::string_view /* name_server */) {
engine_->Run([on_resolved] { GetDefaultEventEngine()->Run([on_resolved] {
on_resolved(absl::UnimplementedError( on_resolved(absl::UnimplementedError(
"The Native resolver does not support looking up SRV records")); "The Native resolver does not support looking up SRV records"));
}); });
@ -197,7 +196,7 @@ DNSResolver::TaskHandle NativeDNSResolver::LookupTXT(
grpc_pollset_set* /* interested_parties */, grpc_pollset_set* /* interested_parties */,
absl::string_view /* name_server */) { absl::string_view /* name_server */) {
// Not supported // Not supported
engine_->Run([on_resolved] { GetDefaultEventEngine()->Run([on_resolved] {
on_resolved(absl::UnimplementedError( on_resolved(absl::UnimplementedError(
"The Native resolver does not support looking up TXT records")); "The Native resolver does not support looking up TXT records"));
}); });

@ -57,10 +57,6 @@ class NativeDNSResolver : public DNSResolver {
// NativeDNSResolver does not support cancellation. // NativeDNSResolver does not support cancellation.
bool Cancel(TaskHandle handle) override; bool Cancel(TaskHandle handle) override;
private:
NativeDNSResolver();
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
}; };
} // namespace grpc_core } // namespace grpc_core

@ -49,6 +49,8 @@
namespace grpc_core { namespace grpc_core {
namespace { namespace {
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
class NativeDNSRequest { class NativeDNSRequest {
public: public:
NativeDNSRequest( NativeDNSRequest(
@ -81,9 +83,6 @@ class NativeDNSRequest {
} // namespace } // namespace
NativeDNSResolver::NativeDNSResolver()
: engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {}
NativeDNSResolver* NativeDNSResolver::GetOrCreate() { NativeDNSResolver* NativeDNSResolver::GetOrCreate() {
static NativeDNSResolver* instance = new NativeDNSResolver(); static NativeDNSResolver* instance = new NativeDNSResolver();
return instance; return instance;
@ -168,7 +167,7 @@ DNSResolver::TaskHandle NativeDNSResolver::LookupSRV(
absl::string_view /* name */, Duration /* deadline */, absl::string_view /* name */, Duration /* deadline */,
grpc_pollset_set* /* interested_parties */, grpc_pollset_set* /* interested_parties */,
absl::string_view /* name_server */) { absl::string_view /* name_server */) {
engine_->Run([on_resolved] { GetDefaultEventEngine()->Run([on_resolved] {
on_resolved(absl::UnimplementedError( on_resolved(absl::UnimplementedError(
"The Native resolver does not support looking up SRV records")); "The Native resolver does not support looking up SRV records"));
}); });
@ -181,7 +180,7 @@ DNSResolver::TaskHandle NativeDNSResolver::LookupTXT(
grpc_pollset_set* /* interested_parties */, grpc_pollset_set* /* interested_parties */,
absl::string_view /* name_server */) { absl::string_view /* name_server */) {
// Not supported // Not supported
engine_->Run([on_resolved] { GetDefaultEventEngine()->Run([on_resolved] {
on_resolved(absl::UnimplementedError( on_resolved(absl::UnimplementedError(
"The Native resolver does not support looking up TXT records")); "The Native resolver does not support looking up TXT records"));
}); });

@ -57,10 +57,6 @@ class NativeDNSResolver : public DNSResolver {
// NativeDNSResolver does not support cancellation. // NativeDNSResolver does not support cancellation.
bool Cancel(TaskHandle handle) override; bool Cancel(TaskHandle handle) override;
private:
NativeDNSResolver();
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
}; };
} // namespace grpc_core } // namespace grpc_core

@ -473,8 +473,8 @@ class PromiseActivity final : public FreestandingActivity,
// Notification that we're no longer executing - it's ok to destruct the // Notification that we're no longer executing - it's ok to destruct the
// promise. // promise.
void MarkDone() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { void MarkDone() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
GPR_ASSERT(!absl::exchange(done_, true)); GPR_ASSERT(!done_);
ScopedContext contexts(this); done_ = true;
Destruct(&promise_holder_.promise); Destruct(&promise_holder_.promise);
} }

@ -19,17 +19,15 @@
#include <utility> #include <utility>
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/default_event_engine.h" // IWYU pragma: keep #include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/context.h"
namespace grpc_core { namespace grpc_core {
using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::experimental::GetDefaultEventEngine;
Sleep::Sleep(Timestamp deadline) : deadline_(deadline) {} Sleep::Sleep(Timestamp deadline) : deadline_(deadline) {}
@ -52,12 +50,9 @@ Poll<absl::Status> Sleep::operator()() {
} }
Sleep::ActiveClosure::ActiveClosure(Timestamp deadline) Sleep::ActiveClosure::ActiveClosure(Timestamp deadline)
: waker_(Activity::current()->MakeOwningWaker()) { : waker_(Activity::current()->MakeOwningWaker()),
auto engine = GetContext<EventEngine>(); timer_handle_(GetDefaultEventEngine()->RunAfter(
GPR_ASSERT(engine != nullptr && deadline - ExecCtx::Get()->Now(), this)) {}
"An EventEngine context is required for Promise Sleep");
timer_handle_ = engine->RunAfter(deadline - ExecCtx::Get()->Now(), this);
}
void Sleep::ActiveClosure::Run() { void Sleep::ActiveClosure::Run() {
ApplicationCallbackExecCtx callback_exec_ctx; ApplicationCallbackExecCtx callback_exec_ctx;
@ -74,7 +69,7 @@ void Sleep::ActiveClosure::Cancel() {
// If we cancel correctly then we must own both refs still and can simply // If we cancel correctly then we must own both refs still and can simply
// delete without unreffing twice, otherwise try unreffing since this may be // delete without unreffing twice, otherwise try unreffing since this may be
// the last owned ref. // the last owned ref.
if (GetContext<EventEngine>()->Cancel(timer_handle_) || refs_.Unref()) { if (GetDefaultEventEngine()->Cancel(timer_handle_) || refs_.Unref()) {
delete this; delete this;
} }
} }

@ -66,7 +66,8 @@ class Sleep final {
Waker waker_; Waker waker_;
// One ref dropped by Run(), the other by Cancel(). // One ref dropped by Run(), the other by Cancel().
RefCount refs_{2}; RefCount refs_{2};
grpc_event_engine::experimental::EventEngine::TaskHandle timer_handle_; const grpc_event_engine::experimental::EventEngine::TaskHandle
timer_handle_;
}; };
Timestamp deadline_; Timestamp deadline_;

@ -17,7 +17,6 @@
#include <stddef.h> #include <stddef.h>
#include <map> #include <map>
#include <memory>
#include <string> #include <string>
#include <utility> #include <utility>
@ -55,6 +54,7 @@ namespace grpc {
namespace experimental { namespace experimental {
using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
// //
// OrcaService::Reactor // OrcaService::Reactor
@ -64,9 +64,7 @@ class OrcaService::Reactor : public ServerWriteReactor<ByteBuffer>,
public grpc_core::RefCounted<Reactor> { public grpc_core::RefCounted<Reactor> {
public: public:
explicit Reactor(OrcaService* service, const ByteBuffer* request_buffer) explicit Reactor(OrcaService* service, const ByteBuffer* request_buffer)
: RefCounted("OrcaService::Reactor"), : RefCounted("OrcaService::Reactor"), service_(service) {
service_(service),
engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {
// Get slice from request. // Get slice from request.
Slice slice; Slice slice;
GPR_ASSERT(request_buffer->DumpToSingleSlice(&slice).ok()); GPR_ASSERT(request_buffer->DumpToSingleSlice(&slice).ok());
@ -129,7 +127,7 @@ class OrcaService::Reactor : public ServerWriteReactor<ByteBuffer>,
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc::internal::MutexLock lock(&timer_mu_); grpc::internal::MutexLock lock(&timer_mu_);
if (cancelled_) return false; if (cancelled_) return false;
timer_handle_ = engine_->RunAfter( timer_handle_ = GetDefaultEventEngine()->RunAfter(
report_interval_, report_interval_,
[self = Ref(DEBUG_LOCATION, "Orca Service")] { self->OnTimer(); }); [self = Ref(DEBUG_LOCATION, "Orca Service")] { self->OnTimer(); });
return true; return true;
@ -138,7 +136,8 @@ class OrcaService::Reactor : public ServerWriteReactor<ByteBuffer>,
bool MaybeCancelTimer() { bool MaybeCancelTimer() {
grpc::internal::MutexLock lock(&timer_mu_); grpc::internal::MutexLock lock(&timer_mu_);
cancelled_ = true; cancelled_ = true;
if (timer_handle_.has_value() && engine_->Cancel(*timer_handle_)) { if (timer_handle_.has_value() &&
GetDefaultEventEngine()->Cancel(*timer_handle_)) {
timer_handle_.reset(); timer_handle_.reset();
return true; return true;
} }
@ -162,7 +161,6 @@ class OrcaService::Reactor : public ServerWriteReactor<ByteBuffer>,
grpc_core::Duration report_interval_; grpc_core::Duration report_interval_;
ByteBuffer response_; ByteBuffer response_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
}; };
// //

@ -47,12 +47,8 @@ if [ -x "$(command -v valgrind)" ]; then
# TODO(jtattermusch): reenable the test once https://github.com/grpc/grpc/issues/29098 is fixed. # TODO(jtattermusch): reenable the test once https://github.com/grpc/grpc/issues/29098 is fixed.
if [ "$(uname -m)" != "aarch64" ]; then if [ "$(uname -m)" != "aarch64" ]; then
$(which valgrind) --error-exitcode=10 --leak-check=yes \ $(which valgrind) --error-exitcode=10 --leak-check=yes \
-v \
--num-callers=30 \
--suppressions=../tests/MemoryLeakTest/ignore_leaks.supp \
$VALGRIND_UNDEF_VALUE_ERRORS \ $VALGRIND_UNDEF_VALUE_ERRORS \
$(which php) $extension_dir -d max_execution_time=300 \ $(which php) $extension_dir -d max_execution_time=300 \
../tests/MemoryLeakTest/MemoryLeakTest.php ../tests/MemoryLeakTest/MemoryLeakTest.php
fi fi
fi fi

@ -1,13 +0,0 @@
{
static Posix NativeDNSResolver
Memcheck:Leak
match-leak-kinds: possible
...
fun:pthread_create@@GLIBC_2.2.5
...
fun:_ZN17grpc_event_engine12experimental21GetDefaultEventEngineEv
fun:_ZN9grpc_core17NativeDNSResolverC1Ev
fun:_ZN9grpc_core17NativeDNSResolver11GetOrCreateEv
...
}

@ -28,7 +28,6 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"

@ -18,7 +18,6 @@
#include <cstring> #include <cstring>
#include <functional> #include <functional>
#include <memory>
#include <gtest/gtest.h> #include <gtest/gtest.h>
@ -63,12 +62,12 @@ static struct iomgr_args {
namespace { namespace {
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
grpc_core::DNSResolver* g_default_dns_resolver; grpc_core::DNSResolver* g_default_dns_resolver;
class TestDNSResolver : public grpc_core::DNSResolver { class TestDNSResolver : public grpc_core::DNSResolver {
public: public:
TestDNSResolver()
: engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {}
// Wrapper around default resolve_address in order to count the number of // Wrapper around default resolve_address in order to count the number of
// times we incur in a system-level name resolution. // times we incur in a system-level name resolution.
TaskHandle LookupHostname( TaskHandle LookupHostname(
@ -114,7 +113,7 @@ class TestDNSResolver : public grpc_core::DNSResolver {
absl::string_view /* name */, grpc_core::Duration /* timeout */, absl::string_view /* name */, grpc_core::Duration /* timeout */,
grpc_pollset_set* /* interested_parties */, grpc_pollset_set* /* interested_parties */,
absl::string_view /* name_server */) override { absl::string_view /* name_server */) override {
engine_->Run([on_resolved] { GetDefaultEventEngine()->Run([on_resolved] {
on_resolved(absl::UnimplementedError( on_resolved(absl::UnimplementedError(
"The Testing DNS resolver does not support looking up SRV records")); "The Testing DNS resolver does not support looking up SRV records"));
}); });
@ -127,7 +126,7 @@ class TestDNSResolver : public grpc_core::DNSResolver {
grpc_pollset_set* /* interested_parties */, grpc_pollset_set* /* interested_parties */,
absl::string_view /* name_server */) override { absl::string_view /* name_server */) override {
// Not supported // Not supported
engine_->Run([on_resolved] { GetDefaultEventEngine()->Run([on_resolved] {
on_resolved(absl::UnimplementedError( on_resolved(absl::UnimplementedError(
"The Testing DNS resolver does not support looking up TXT records")); "The Testing DNS resolver does not support looking up TXT records"));
}); });
@ -136,9 +135,6 @@ class TestDNSResolver : public grpc_core::DNSResolver {
// Not cancellable // Not cancellable
bool Cancel(TaskHandle /*handle*/) override { return false; } bool Cancel(TaskHandle /*handle*/) override { return false; }
private:
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
}; };
} // namespace } // namespace

@ -138,7 +138,6 @@ static void finish_resolve(void* arg, grpc_error_handle error) {
namespace { namespace {
using ::grpc_event_engine::experimental::FuzzingEventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine; using ::grpc_event_engine::experimental::GetDefaultEventEngine;
class FuzzerDNSResolver : public grpc_core::DNSResolver { class FuzzerDNSResolver : public grpc_core::DNSResolver {
@ -179,8 +178,8 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver {
}; };
// Gets the singleton instance, possibly creating it first // Gets the singleton instance, possibly creating it first
static FuzzerDNSResolver* GetOrCreate(FuzzingEventEngine* engine) { static FuzzerDNSResolver* GetOrCreate() {
static FuzzerDNSResolver* instance = new FuzzerDNSResolver(engine); static FuzzerDNSResolver* instance = new FuzzerDNSResolver();
return instance; return instance;
} }
@ -207,7 +206,7 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver {
absl::string_view /* name */, grpc_core::Duration /* timeout */, absl::string_view /* name */, grpc_core::Duration /* timeout */,
grpc_pollset_set* /* interested_parties */, grpc_pollset_set* /* interested_parties */,
absl::string_view /* name_server */) override { absl::string_view /* name_server */) override {
engine_->Run([on_resolved] { GetDefaultEventEngine()->Run([on_resolved] {
on_resolved(absl::UnimplementedError( on_resolved(absl::UnimplementedError(
"The Fuzzing DNS resolver does not support looking up SRV records")); "The Fuzzing DNS resolver does not support looking up SRV records"));
}); });
@ -220,7 +219,7 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver {
grpc_pollset_set* /* interested_parties */, grpc_pollset_set* /* interested_parties */,
absl::string_view /* name_server */) override { absl::string_view /* name_server */) override {
// Not supported // Not supported
engine_->Run([on_resolved] { GetDefaultEventEngine()->Run([on_resolved] {
on_resolved(absl::UnimplementedError( on_resolved(absl::UnimplementedError(
"The Fuzing DNS resolver does not support looking up TXT records")); "The Fuzing DNS resolver does not support looking up TXT records"));
}); });
@ -229,10 +228,6 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver {
// FuzzerDNSResolver does not support cancellation. // FuzzerDNSResolver does not support cancellation.
bool Cancel(TaskHandle /*handle*/) override { return false; } bool Cancel(TaskHandle /*handle*/) override { return false; }
private:
explicit FuzzerDNSResolver(FuzzingEventEngine* engine) : engine_(engine) {}
FuzzingEventEngine* engine_;
}; };
} // namespace } // namespace
@ -823,22 +818,21 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
if (squelch && grpc_trace_fuzzer == nullptr) gpr_set_log_function(dont_log); if (squelch && grpc_trace_fuzzer == nullptr) gpr_set_log_function(dont_log);
gpr_free(grpc_trace_fuzzer); gpr_free(grpc_trace_fuzzer);
grpc_set_tcp_client_impl(&fuzz_tcp_client_vtable); grpc_set_tcp_client_impl(&fuzz_tcp_client_vtable);
grpc_event_engine::experimental::SetDefaultEventEngineFactory(
grpc_event_engine::experimental::SetEventEngineFactory(
[actions = msg.event_engine_actions()]() { [actions = msg.event_engine_actions()]() {
return absl::make_unique<FuzzingEventEngine>( return absl::make_unique<
FuzzingEventEngine::Options(), actions); grpc_event_engine::experimental::FuzzingEventEngine>(
grpc_event_engine::experimental::FuzzingEventEngine::Options(),
actions);
}); });
auto engine = grpc_event_engine::experimental::GetDefaultEventEngine();
std::dynamic_pointer_cast<FuzzingEventEngine>(GetDefaultEventEngine());
FuzzingEventEngine::SetGlobalNowImplEngine(engine.get());
grpc_init(); grpc_init();
grpc_timer_manager_set_threading(false); grpc_timer_manager_set_threading(false);
{ {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_core::Executor::SetThreadingAll(false); grpc_core::Executor::SetThreadingAll(false);
} }
grpc_core::SetDNSResolver(FuzzerDNSResolver::GetOrCreate(engine.get())); grpc_core::SetDNSResolver(FuzzerDNSResolver::GetOrCreate());
grpc_dns_lookup_hostname_ares = my_dns_lookup_ares; grpc_dns_lookup_hostname_ares = my_dns_lookup_ares;
grpc_cancel_ares_request = my_cancel_ares_request; grpc_cancel_ares_request = my_cancel_ares_request;
@ -875,10 +869,14 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
while (action_index < msg.actions_size() || g_channel != nullptr || while (action_index < msg.actions_size() || g_channel != nullptr ||
g_server != nullptr || pending_channel_watches > 0 || g_server != nullptr || pending_channel_watches > 0 ||
pending_pings > 0 || ActiveCall() != nullptr) { pending_pings > 0 || ActiveCall() != nullptr) {
engine->Tick(); static_cast<grpc_event_engine::experimental::FuzzingEventEngine*>(
grpc_event_engine::experimental::GetDefaultEventEngine())
->Tick();
if (action_index == msg.actions_size()) { if (action_index == msg.actions_size()) {
engine->FuzzingDone(); static_cast<grpc_event_engine::experimental::FuzzingEventEngine*>(
grpc_event_engine::experimental::GetDefaultEventEngine())
->FuzzingDone();
if (g_channel != nullptr) { if (g_channel != nullptr) {
grpc_channel_destroy(g_channel); grpc_channel_destroy(g_channel);
g_channel = nullptr; g_channel = nullptr;
@ -1225,5 +1223,4 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
grpc_resource_quota_unref(g_resource_quota); grpc_resource_quota_unref(g_resource_quota);
grpc_shutdown_blocking(); grpc_shutdown_blocking();
FuzzingEventEngine::UnsetGlobalNowImplEngine(engine.get());
} }

@ -82,12 +82,12 @@ static void set_resolve_port(int port) {
namespace { namespace {
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
grpc_core::DNSResolver* g_default_dns_resolver; grpc_core::DNSResolver* g_default_dns_resolver;
class TestDNSResolver : public grpc_core::DNSResolver { class TestDNSResolver : public grpc_core::DNSResolver {
public: public:
TestDNSResolver()
: engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {}
TaskHandle LookupHostname( TaskHandle LookupHostname(
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)> std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_resolved, on_resolved,
@ -114,7 +114,7 @@ class TestDNSResolver : public grpc_core::DNSResolver {
absl::string_view /* name */, grpc_core::Duration /* timeout */, absl::string_view /* name */, grpc_core::Duration /* timeout */,
grpc_pollset_set* /* interested_parties */, grpc_pollset_set* /* interested_parties */,
absl::string_view /* name_server */) override { absl::string_view /* name_server */) override {
engine_->Run([on_resolved] { GetDefaultEventEngine()->Run([on_resolved] {
on_resolved(absl::UnimplementedError( on_resolved(absl::UnimplementedError(
"The Testing DNS resolver does not support looking up SRV records")); "The Testing DNS resolver does not support looking up SRV records"));
}); });
@ -127,7 +127,7 @@ class TestDNSResolver : public grpc_core::DNSResolver {
grpc_pollset_set* /* interested_parties */, grpc_pollset_set* /* interested_parties */,
absl::string_view /* name_server */) override { absl::string_view /* name_server */) override {
// Not supported // Not supported
engine_->Run([on_resolved] { GetDefaultEventEngine()->Run([on_resolved] {
on_resolved(absl::UnimplementedError( on_resolved(absl::UnimplementedError(
"The Testing DNS resolver does not support looking up TXT records")); "The Testing DNS resolver does not support looking up TXT records"));
}); });
@ -159,7 +159,6 @@ class TestDNSResolver : public grpc_core::DNSResolver {
std::move(addrs)); std::move(addrs));
} }
} }
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
}; };
} // namespace } // namespace

@ -61,19 +61,6 @@ grpc_cc_test(
], ],
) )
grpc_cc_test(
name = "default_engine_methods_test",
srcs = ["default_engine_methods_test.cc"],
external_deps = ["gtest"],
deps = [
"//:default_event_engine",
"//:event_engine_base_hdrs",
"//:gpr",
"//:grpc",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test( grpc_cc_test(
name = "smoke_test", name = "smoke_test",
srcs = ["smoke_test.cc"], srcs = ["smoke_test.cc"],

@ -1,129 +0,0 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <thread>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/sync.h"
#include "test/core/util/test_config.h"
namespace {
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
class DefaultEngineTest : public testing::Test {
protected:
// Does nothing, fills space that a nullptr could not
class FakeEventEngine : public EventEngine {
public:
FakeEventEngine() = default;
~FakeEventEngine() override = default;
absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback /* on_accept */,
absl::AnyInvocable<void(absl::Status)> /* on_shutdown */,
const grpc_event_engine::experimental::EndpointConfig& /* config */,
std::unique_ptr<
grpc_event_engine::experimental::
MemoryAllocatorFactory> /* memory_allocator_factory */)
override {
return absl::UnimplementedError("test");
};
ConnectionHandle Connect(
OnConnectCallback /* on_connect */, const ResolvedAddress& /* addr */,
const grpc_event_engine::experimental::EndpointConfig& /* args */,
grpc_event_engine::experimental::MemoryAllocator /* memory_allocator */,
Duration /* timeout */) override {
return {-1, -1};
};
bool CancelConnect(ConnectionHandle /* handle */) override {
return false;
};
bool IsWorkerThread() override { return false; };
std::unique_ptr<DNSResolver> GetDNSResolver(
const DNSResolver::ResolverOptions& /* options */) override {
return nullptr;
};
void Run(Closure* /* closure */) override{};
void Run(absl::AnyInvocable<void()> /* closure */) override{};
TaskHandle RunAfter(Duration /* when */, Closure* /* closure */) override {
return {-1, -1};
}
TaskHandle RunAfter(Duration /* when */,
absl::AnyInvocable<void()> /* closure */) override {
return {-1, -1};
}
bool Cancel(TaskHandle /* handle */) override { return false; };
};
};
TEST_F(DefaultEngineTest, SharedPtrGlobalEventEngineLifetimesAreValid) {
int create_count = 0;
grpc_event_engine::experimental::SetEventEngineFactory([&create_count] {
++create_count;
return absl::make_unique<FakeEventEngine>();
});
std::shared_ptr<EventEngine> ee2;
{
std::shared_ptr<EventEngine> ee1 = GetDefaultEventEngine();
ASSERT_EQ(1, create_count);
ee2 = GetDefaultEventEngine();
ASSERT_EQ(1, create_count);
ASSERT_EQ(ee2.use_count(), 2);
}
// Ensure the first shared_ptr did not delete the global
ASSERT_TRUE(ee2.unique());
ASSERT_FALSE(ee2->IsWorkerThread()); // useful for ASAN
// destroy the global engine via the last shared_ptr, and create a new one.
ee2.reset();
ee2 = GetDefaultEventEngine();
ASSERT_EQ(2, create_count);
ASSERT_TRUE(ee2.unique());
grpc_event_engine::experimental::RevertToDefaultEventEngineFactory();
}
TEST_F(DefaultEngineTest, StressTestSharedPtr) {
constexpr int thread_count = 13;
constexpr absl::Duration spin_time = absl::Seconds(3);
std::vector<std::thread> threads;
threads.reserve(thread_count);
for (int i = 0; i < thread_count; i++) {
threads.emplace_back([&spin_time] {
auto timeout = absl::Now() + spin_time;
do {
GetDefaultEventEngine().reset();
} while (timeout > absl::Now());
});
}
for (auto& thd : threads) {
thd.join();
}
}
} // namespace
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
auto result = RUN_ALL_TESTS();
grpc_shutdown();
return result;
}

@ -1,79 +0,0 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <gtest/gtest.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "test/core/event_engine/util/aborting_event_engine.h"
#include "test/core/util/test_config.h"
namespace {
using ::grpc_event_engine::experimental::AbortingEventEngine;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::EventEngineFactoryReset;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
using ::grpc_event_engine::experimental::SetEventEngineFactory;
class EventEngineFactoryTest : public testing::Test {
public:
EventEngineFactoryTest() = default;
~EventEngineFactoryTest() { EventEngineFactoryReset(); }
};
TEST_F(EventEngineFactoryTest, CustomFactoryIsUsed) {
int counter{0};
SetEventEngineFactory([&counter] {
++counter;
return absl::make_unique<AbortingEventEngine>();
});
auto ee1 = GetDefaultEventEngine();
ASSERT_EQ(counter, 1);
auto ee2 = GetDefaultEventEngine();
ASSERT_EQ(counter, 1);
ASSERT_EQ(ee1, ee2);
}
TEST_F(EventEngineFactoryTest, FactoryResetWorks) {
// eliminate a global default if one has been created already.
EventEngineFactoryReset();
int counter{0};
SetEventEngineFactory([&counter]() -> std::unique_ptr<EventEngine> {
// called at most twice;
EXPECT_LE(++counter, 2);
return absl::make_unique<AbortingEventEngine>();
});
auto custom_ee = GetDefaultEventEngine();
ASSERT_EQ(counter, 1);
auto same_ee = GetDefaultEventEngine();
ASSERT_EQ(custom_ee, same_ee);
ASSERT_EQ(counter, 1);
EventEngineFactoryReset();
auto default_ee = GetDefaultEventEngine();
ASSERT_NE(custom_ee, default_ee);
}
} // namespace
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
auto result = RUN_ALL_TESTS();
grpc_shutdown();
return result;
}

@ -27,7 +27,6 @@ grpc_cc_library(
hdrs = ["fuzzing_event_engine.h"], hdrs = ["fuzzing_event_engine.h"],
deps = [ deps = [
":fuzzing_event_engine_proto", ":fuzzing_event_engine_proto",
"//:default_event_engine",
"//:event_engine_base_hdrs", "//:event_engine_base_hdrs",
"//:time", "//:time",
], ],

@ -31,12 +31,16 @@ namespace experimental {
namespace { namespace {
const intptr_t kTaskHandleSalt = 12345; const intptr_t kTaskHandleSalt = 12345;
FuzzingEventEngine* g_fuzzing_event_engine = nullptr; FuzzingEventEngine* g_fuzzing_event_engine = nullptr;
gpr_timespec (*g_orig_gpr_now_impl)(gpr_clock_type clock_type);
} // namespace } // namespace
FuzzingEventEngine::FuzzingEventEngine( FuzzingEventEngine::FuzzingEventEngine(
Options options, const fuzzing_event_engine::Actions& actions) Options options, const fuzzing_event_engine::Actions& actions)
: final_tick_length_(options.final_tick_length) { : final_tick_length_(options.final_tick_length) {
GPR_ASSERT(g_fuzzing_event_engine == nullptr);
g_fuzzing_event_engine = this;
gpr_now_impl = GlobalNowImpl;
tick_increments_.clear(); tick_increments_.clear();
task_delays_.clear(); task_delays_.clear();
tasks_by_id_.clear(); tasks_by_id_.clear();
@ -53,8 +57,7 @@ FuzzingEventEngine::FuzzingEventEngine(
grpc_core::TestOnlySetProcessEpoch(NowAsTimespec(GPR_CLOCK_MONOTONIC)); grpc_core::TestOnlySetProcessEpoch(NowAsTimespec(GPR_CLOCK_MONOTONIC));
auto update_delay = [](std::map<intptr_t, Duration>* map, auto update_delay = [](std::map<intptr_t, Duration>* map,
const fuzzing_event_engine::Delay& delay, fuzzing_event_engine::Delay delay, Duration max) {
Duration max) {
auto& value = (*map)[delay.id()]; auto& value = (*map)[delay.id()];
if (delay.delay_us() > static_cast<uint64_t>(max.count() / GPR_NS_PER_US)) { if (delay.delay_us() > static_cast<uint64_t>(max.count() / GPR_NS_PER_US)) {
value = max; value = max;
@ -81,6 +84,11 @@ void FuzzingEventEngine::FuzzingDone() {
tick_increments_.clear(); tick_increments_.clear();
} }
FuzzingEventEngine::~FuzzingEventEngine() {
GPR_ASSERT(g_fuzzing_event_engine == this);
g_fuzzing_event_engine = nullptr;
}
gpr_timespec FuzzingEventEngine::NowAsTimespec(gpr_clock_type clock_type) { gpr_timespec FuzzingEventEngine::NowAsTimespec(gpr_clock_type clock_type) {
// TODO(ctiller): add a facility to track realtime and monotonic clocks // TODO(ctiller): add a facility to track realtime and monotonic clocks
// separately to simulate divergence. // separately to simulate divergence.
@ -90,6 +98,12 @@ gpr_timespec FuzzingEventEngine::NowAsTimespec(gpr_clock_type clock_type) {
return {secs.count(), static_cast<int32_t>((d - secs).count()), clock_type}; return {secs.count(), static_cast<int32_t>((d - secs).count()), clock_type};
} }
gpr_timespec FuzzingEventEngine::GlobalNowImpl(gpr_clock_type clock_type) {
GPR_ASSERT(g_fuzzing_event_engine != nullptr);
grpc_core::MutexLock lock(&g_fuzzing_event_engine->mu_);
return g_fuzzing_event_engine->NowAsTimespec(clock_type);
}
void FuzzingEventEngine::Tick() { void FuzzingEventEngine::Tick() {
std::vector<absl::AnyInvocable<void()>> to_run; std::vector<absl::AnyInvocable<void()>> to_run;
{ {
@ -193,25 +207,5 @@ bool FuzzingEventEngine::Cancel(TaskHandle handle) {
return true; return true;
} }
gpr_timespec FuzzingEventEngine::GlobalNowImpl(gpr_clock_type clock_type) {
GPR_ASSERT(g_fuzzing_event_engine != nullptr);
grpc_core::MutexLock lock(&g_fuzzing_event_engine->mu_);
return g_fuzzing_event_engine->NowAsTimespec(clock_type);
}
void FuzzingEventEngine::SetGlobalNowImplEngine(FuzzingEventEngine* engine) {
GPR_ASSERT(g_fuzzing_event_engine == nullptr);
g_fuzzing_event_engine = engine;
g_orig_gpr_now_impl = gpr_now_impl;
gpr_now_impl = GlobalNowImpl;
}
void FuzzingEventEngine::UnsetGlobalNowImplEngine(FuzzingEventEngine* engine) {
GPR_ASSERT(g_fuzzing_event_engine == engine);
g_fuzzing_event_engine = nullptr;
gpr_now_impl = g_orig_gpr_now_impl;
g_orig_gpr_now_impl = nullptr;
}
} // namespace experimental } // namespace experimental
} // namespace grpc_event_engine } // namespace grpc_event_engine

@ -40,7 +40,7 @@ class FuzzingEventEngine : public EventEngine {
}; };
explicit FuzzingEventEngine(Options options, explicit FuzzingEventEngine(Options options,
const fuzzing_event_engine::Actions& actions); const fuzzing_event_engine::Actions& actions);
~FuzzingEventEngine() override = default; ~FuzzingEventEngine() override;
void FuzzingDone(); void FuzzingDone();
void Tick(); void Tick();
@ -76,11 +76,6 @@ class FuzzingEventEngine : public EventEngine {
Time Now() ABSL_LOCKS_EXCLUDED(mu_); Time Now() ABSL_LOCKS_EXCLUDED(mu_);
static void SetGlobalNowImplEngine(FuzzingEventEngine* engine)
ABSL_LOCKS_EXCLUDED(mu_);
static void UnsetGlobalNowImplEngine(FuzzingEventEngine* engine)
ABSL_LOCKS_EXCLUDED(mu_);
private: private:
struct Task { struct Task {
Task(intptr_t id, absl::AnyInvocable<void()> closure) Task(intptr_t id, absl::AnyInvocable<void()> closure)
@ -93,6 +88,7 @@ class FuzzingEventEngine : public EventEngine {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
static gpr_timespec GlobalNowImpl(gpr_clock_type clock_type) static gpr_timespec GlobalNowImpl(gpr_clock_type clock_type)
ABSL_LOCKS_EXCLUDED(mu_); ABSL_LOCKS_EXCLUDED(mu_);
const Duration final_tick_length_; const Duration final_tick_length_;
grpc_core::Mutex mu_; grpc_core::Mutex mu_;

@ -26,21 +26,20 @@
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::posix_engine::Scheduler; using ::grpc_event_engine::posix_engine::Scheduler;
namespace { namespace {
class TestScheduler : public Scheduler { class TestScheduler : public Scheduler {
public: public:
explicit TestScheduler(std::shared_ptr<EventEngine> engine) explicit TestScheduler(grpc_event_engine::experimental::EventEngine* engine)
: engine_(std::move(engine)) {} : engine_(engine) {}
void Run( void Run(
grpc_event_engine::experimental::EventEngine::Closure* closure) override { grpc_event_engine::experimental::EventEngine::Closure* closure) override {
engine_->Run(closure); engine_->Run(closure);
} }
private: private:
std::shared_ptr<EventEngine> engine_; grpc_event_engine::experimental::EventEngine* engine_;
}; };
TestScheduler* g_scheduler; TestScheduler* g_scheduler;
@ -57,8 +56,8 @@ TEST(LockFreeEventTest, BasicTest) {
event.InitEvent(); event.InitEvent();
grpc_core::MutexLock lock(&mu); grpc_core::MutexLock lock(&mu);
// Set NotifyOn first and then SetReady // Set NotifyOn first and then SetReady
event.NotifyOn(IomgrEngineClosure::TestOnlyToClosure( event.NotifyOn(
[&mu, &cv](const absl::Status& status) { IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) {
grpc_core::MutexLock lock(&mu); grpc_core::MutexLock lock(&mu);
EXPECT_TRUE(status.ok()); EXPECT_TRUE(status.ok());
cv.Signal(); cv.Signal();
@ -68,8 +67,8 @@ TEST(LockFreeEventTest, BasicTest) {
// SetReady first first and then call NotifyOn // SetReady first first and then call NotifyOn
event.SetReady(); event.SetReady();
event.NotifyOn(IomgrEngineClosure::TestOnlyToClosure( event.NotifyOn(
[&mu, &cv](const absl::Status& status) { IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) {
grpc_core::MutexLock lock(&mu); grpc_core::MutexLock lock(&mu);
EXPECT_TRUE(status.ok()); EXPECT_TRUE(status.ok());
cv.Signal(); cv.Signal();
@ -77,8 +76,8 @@ TEST(LockFreeEventTest, BasicTest) {
EXPECT_FALSE(cv.WaitWithTimeout(&mu, absl::Seconds(10))); EXPECT_FALSE(cv.WaitWithTimeout(&mu, absl::Seconds(10)));
// Set NotifyOn and then call SetShutdown // Set NotifyOn and then call SetShutdown
event.NotifyOn(IomgrEngineClosure::TestOnlyToClosure( event.NotifyOn(
[&mu, &cv](const absl::Status& status) { IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) {
grpc_core::MutexLock lock(&mu); grpc_core::MutexLock lock(&mu);
EXPECT_FALSE(status.ok()); EXPECT_FALSE(status.ok());
EXPECT_EQ(status, absl::CancelledError("Shutdown")); EXPECT_EQ(status, absl::CancelledError("Shutdown"));
@ -112,7 +111,7 @@ TEST(LockFreeEventTest, MultiThreadedTest) {
active++; active++;
if (thread_id == 0) { if (thread_id == 0) {
event.NotifyOn(IomgrEngineClosure::TestOnlyToClosure( event.NotifyOn(IomgrEngineClosure::TestOnlyToClosure(
[&mu, &cv, &signalled](const absl::Status& status) { [&mu, &cv, &signalled](absl::Status status) {
grpc_core::MutexLock lock(&mu); grpc_core::MutexLock lock(&mu);
EXPECT_TRUE(status.ok()); EXPECT_TRUE(status.ok());
signalled = true; signalled = true;
@ -146,7 +145,9 @@ TEST(LockFreeEventTest, MultiThreadedTest) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
g_scheduler = new TestScheduler( grpc_event_engine::experimental::EventEngine* engine =
grpc_event_engine::experimental::GetDefaultEventEngine()); grpc_event_engine::experimental::GetDefaultEventEngine();
EXPECT_NE(engine, nullptr);
g_scheduler = new TestScheduler(engine);
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
} }

@ -27,13 +27,13 @@ using ::testing::MockFunction;
class EventEngineSmokeTest : public testing::Test {}; class EventEngineSmokeTest : public testing::Test {};
TEST_F(EventEngineSmokeTest, SetEventEngineFactoryLinks) { TEST_F(EventEngineSmokeTest, SetDefaultEventEngineFactoryLinks) {
// See https://github.com/grpc/grpc/pull/28707 // See https://github.com/grpc/grpc/pull/28707
testing::MockFunction< testing::MockFunction<
std::unique_ptr<grpc_event_engine::experimental::EventEngine>()> std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>
factory; factory;
EXPECT_CALL(factory, Call()).Times(1); EXPECT_CALL(factory, Call()).Times(1);
grpc_event_engine::experimental::SetEventEngineFactory( grpc_event_engine::experimental::SetDefaultEventEngineFactory(
factory.AsStdFunction()); factory.AsStdFunction());
EXPECT_EQ(nullptr, grpc_event_engine::experimental::CreateEventEngine()); EXPECT_EQ(nullptr, grpc_event_engine::experimental::CreateEventEngine());
} }

@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include <atomic>
#include <map> #include <map>
#include "absl/memory/memory.h" #include "absl/memory/memory.h"
@ -37,15 +36,14 @@ bool squelch = true;
static void dont_log(gpr_log_func_args* /*args*/) {} static void dont_log(gpr_log_func_args* /*args*/) {}
static grpc_core::Mutex g_now_mu; static gpr_timespec g_now;
static gpr_timespec g_now ABSL_GUARDED_BY(g_now_mu);
extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type); extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
static gpr_timespec now_impl(gpr_clock_type clock_type) { static gpr_timespec now_impl(gpr_clock_type clock_type) {
GPR_ASSERT(clock_type != GPR_TIMESPAN); GPR_ASSERT(clock_type != GPR_TIMESPAN);
grpc_core::MutexLock lock(&g_now_mu); gpr_timespec ts = g_now;
g_now.clock_type = clock_type; ts.clock_type = clock_type;
return g_now; return ts;
} }
namespace grpc_core { namespace grpc_core {
@ -282,13 +280,11 @@ class MainLoop {
switch (action.type_case()) { switch (action.type_case()) {
case filter_fuzzer::Action::TYPE_NOT_SET: case filter_fuzzer::Action::TYPE_NOT_SET:
break; break;
case filter_fuzzer::Action::kAdvanceTimeMicroseconds: { case filter_fuzzer::Action::kAdvanceTimeMicroseconds:
MutexLock lock(&g_now_mu);
g_now = gpr_time_add( g_now = gpr_time_add(
g_now, gpr_time_from_micros(action.advance_time_microseconds(), g_now, gpr_time_from_micros(action.advance_time_microseconds(),
GPR_TIMESPAN)); GPR_TIMESPAN));
break; break;
}
case filter_fuzzer::Action::kCancel: case filter_fuzzer::Action::kCancel:
calls_.erase(action.call()); calls_.erase(action.call());
break; break;
@ -592,11 +588,8 @@ DEFINE_PROTO_FUZZER(const filter_fuzzer::Msg& msg) {
char* grpc_trace_fuzzer = gpr_getenv("GRPC_TRACE_FUZZER"); char* grpc_trace_fuzzer = gpr_getenv("GRPC_TRACE_FUZZER");
if (squelch && grpc_trace_fuzzer == nullptr) gpr_set_log_function(dont_log); if (squelch && grpc_trace_fuzzer == nullptr) gpr_set_log_function(dont_log);
gpr_free(grpc_trace_fuzzer); gpr_free(grpc_trace_fuzzer);
{
grpc_core::MutexLock lock(&g_now_mu);
g_now = {1, 0, GPR_CLOCK_MONOTONIC}; g_now = {1, 0, GPR_CLOCK_MONOTONIC};
grpc_core::TestOnlySetProcessEpoch(g_now); grpc_core::TestOnlySetProcessEpoch(g_now);
}
gpr_now_impl = now_impl; gpr_now_impl = now_impl;
grpc_init(); grpc_init();
grpc_timer_manager_set_threading(false); grpc_timer_manager_set_threading(false);

@ -25,7 +25,6 @@
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h" #include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
#include "src/core/lib/promise/race.h" #include "src/core/lib/promise/race.h"
@ -38,15 +37,12 @@ TEST(Sleep, Zzzz) {
ExecCtx exec_ctx; ExecCtx exec_ctx;
absl::Notification done; absl::Notification done;
Timestamp done_time = ExecCtx::Get()->Now() + Duration::Seconds(1); Timestamp done_time = ExecCtx::Get()->Now() + Duration::Seconds(1);
auto engine = grpc_event_engine::experimental::GetDefaultEventEngine();
// Sleep for one second then set done to true. // Sleep for one second then set done to true.
auto activity = MakeActivity( auto activity = MakeActivity(Sleep(done_time), InlineWakeupScheduler(),
Sleep(done_time), InlineWakeupScheduler(),
[&done](absl::Status r) { [&done](absl::Status r) {
EXPECT_EQ(r, absl::OkStatus()); EXPECT_EQ(r, absl::OkStatus());
done.Notify(); done.Notify();
}, });
engine.get());
done.WaitForNotification(); done.WaitForNotification();
exec_ctx.InvalidateNow(); exec_ctx.InvalidateNow();
EXPECT_GE(ExecCtx::Get()->Now(), done_time); EXPECT_GE(ExecCtx::Get()->Now(), done_time);
@ -56,15 +52,12 @@ TEST(Sleep, AlreadyDone) {
ExecCtx exec_ctx; ExecCtx exec_ctx;
absl::Notification done; absl::Notification done;
Timestamp done_time = ExecCtx::Get()->Now() - Duration::Seconds(1); Timestamp done_time = ExecCtx::Get()->Now() - Duration::Seconds(1);
auto engine = grpc_event_engine::experimental::GetDefaultEventEngine();
// Sleep for no time at all then set done to true. // Sleep for no time at all then set done to true.
auto activity = MakeActivity( auto activity = MakeActivity(Sleep(done_time), InlineWakeupScheduler(),
Sleep(done_time), InlineWakeupScheduler(),
[&done](absl::Status r) { [&done](absl::Status r) {
EXPECT_EQ(r, absl::OkStatus()); EXPECT_EQ(r, absl::OkStatus());
done.Notify(); done.Notify();
}, });
engine.get());
done.WaitForNotification(); done.WaitForNotification();
} }
@ -72,16 +65,13 @@ TEST(Sleep, Cancel) {
ExecCtx exec_ctx; ExecCtx exec_ctx;
absl::Notification done; absl::Notification done;
Timestamp done_time = ExecCtx::Get()->Now() + Duration::Seconds(1); Timestamp done_time = ExecCtx::Get()->Now() + Duration::Seconds(1);
auto engine = grpc_event_engine::experimental::GetDefaultEventEngine();
// Sleep for one second but race it to complete immediately // Sleep for one second but race it to complete immediately
auto activity = MakeActivity( auto activity = MakeActivity(
Race(Sleep(done_time), [] { return absl::CancelledError(); }), Race(Sleep(done_time), [] { return absl::CancelledError(); }),
InlineWakeupScheduler(), InlineWakeupScheduler(), [&done](absl::Status r) {
[&done](absl::Status r) {
EXPECT_EQ(r, absl::CancelledError()); EXPECT_EQ(r, absl::CancelledError());
done.Notify(); done.Notify();
}, });
engine.get());
done.WaitForNotification(); done.WaitForNotification();
exec_ctx.InvalidateNow(); exec_ctx.InvalidateNow();
EXPECT_LT(ExecCtx::Get()->Now(), done_time); EXPECT_LT(ExecCtx::Get()->Now(), done_time);
@ -94,14 +84,11 @@ TEST(Sleep, MoveSemantics) {
Timestamp done_time = ExecCtx::Get()->Now() + Duration::Milliseconds(111); Timestamp done_time = ExecCtx::Get()->Now() + Duration::Milliseconds(111);
Sleep donor(done_time); Sleep donor(done_time);
Sleep sleeper = std::move(donor); Sleep sleeper = std::move(donor);
auto engine = grpc_event_engine::experimental::GetDefaultEventEngine(); auto activity = MakeActivity(std::move(sleeper), InlineWakeupScheduler(),
auto activity = MakeActivity(
std::move(sleeper), InlineWakeupScheduler(),
[&done](absl::Status r) { [&done](absl::Status r) {
EXPECT_EQ(r, absl::OkStatus()); EXPECT_EQ(r, absl::OkStatus());
done.Notify(); done.Notify();
}, });
engine.get());
done.WaitForNotification(); done.WaitForNotification();
exec_ctx.InvalidateNow(); exec_ctx.InvalidateNow();
EXPECT_GE(ExecCtx::Get()->Now(), done_time); EXPECT_GE(ExecCtx::Get()->Now(), done_time);
@ -113,14 +100,12 @@ TEST(Sleep, StressTest) {
ExecCtx exec_ctx; ExecCtx exec_ctx;
std::vector<std::shared_ptr<absl::Notification>> notifications; std::vector<std::shared_ptr<absl::Notification>> notifications;
std::vector<ActivityPtr> activities; std::vector<ActivityPtr> activities;
auto engine = grpc_event_engine::experimental::GetDefaultEventEngine();
gpr_log(GPR_INFO, "Starting %d sleeps for 1sec", kNumActivities); gpr_log(GPR_INFO, "Starting %d sleeps for 1sec", kNumActivities);
for (int i = 0; i < kNumActivities; i++) { for (int i = 0; i < kNumActivities; i++) {
auto notification = std::make_shared<absl::Notification>(); auto notification = std::make_shared<absl::Notification>();
auto activity = MakeActivity( auto activity = MakeActivity(
Sleep(exec_ctx.Now() + Duration::Seconds(1)), ExecCtxWakeupScheduler(), Sleep(exec_ctx.Now() + Duration::Seconds(1)), ExecCtxWakeupScheduler(),
[notification](absl::Status /*r*/) { notification->Notify(); }, [notification](absl::Status /*r*/) { notification->Notify(); });
engine.get());
notifications.push_back(std::move(notification)); notifications.push_back(std::move(notification));
activities.push_back(std::move(activity)); activities.push_back(std::move(activity));
} }

@ -38,17 +38,22 @@ extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
namespace grpc_core { namespace grpc_core {
namespace testing { namespace testing {
namespace { namespace {
std::atomic<int> g_clock{123}; int g_clock = 123;
Mutex mu_;
gpr_timespec fake_gpr_now(gpr_clock_type clock_type) { gpr_timespec fake_gpr_now(gpr_clock_type clock_type) {
MutexLock lock(&mu_);
gpr_timespec ts; gpr_timespec ts;
ts.tv_sec = g_clock.load(); ts.tv_sec = g_clock;
ts.tv_nsec = 0; ts.tv_nsec = 0;
ts.clock_type = clock_type; ts.clock_type = clock_type;
return ts; return ts;
} }
void inc_time(void) { g_clock.fetch_add(30); } void inc_time(void) {
MutexLock lock(&mu_);
g_clock += 30;
}
} // namespace } // namespace
TEST(BdpEstimatorTest, NoOp) { BdpEstimator est("test"); } TEST(BdpEstimatorTest, NoOp) { BdpEstimator est("test"); }

@ -2437,30 +2437,6 @@
], ],
"uses_polling": true "uses_polling": true
}, },
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "default_engine_methods_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{ {
"args": [], "args": [],
"benchmark": false, "benchmark": false,

Loading…
Cancel
Save