passes //test/cpp/end2end:rls_end2end_test

pull/36709/head
AJ Heller 10 months ago
parent 5a7bc2f957
commit fa162d08be
  1. 8
      src/core/lib/event_engine/trace.h
  2. 10
      src/core/lib/event_engine/windows/win_socket.cc
  3. 150
      src/core/lib/event_engine/windows/windows_engine.cc
  4. 113
      src/core/lib/event_engine/windows/windows_engine.h
  5. 17
      test/core/event_engine/event_engine_test_utils.cc
  6. 8
      test/core/event_engine/event_engine_test_utils.h
  7. 4
      test/cpp/end2end/rls_end2end_test.cc

@ -27,22 +27,22 @@ extern grpc_core::TraceFlag grpc_event_engine_endpoint_trace;
#define GRPC_EVENT_ENGINE_TRACE(format, ...) \
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { \
gpr_log(GPR_DEBUG, "(event_engine) " format, __VA_ARGS__); \
gpr_log(GPR_ERROR, "(event_engine) " format, __VA_ARGS__); \
}
#define GRPC_EVENT_ENGINE_ENDPOINT_TRACE(format, ...) \
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_endpoint_trace)) { \
gpr_log(GPR_DEBUG, "(event_engine endpoint) " format, __VA_ARGS__); \
gpr_log(GPR_ERROR, "(event_engine endpoint) " format, __VA_ARGS__); \
}
#define GRPC_EVENT_ENGINE_POLLER_TRACE(format, ...) \
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_poller_trace)) { \
gpr_log(GPR_DEBUG, "(event_engine poller) " format, __VA_ARGS__); \
gpr_log(GPR_ERROR, "(event_engine poller) " format, __VA_ARGS__); \
}
#define GRPC_EVENT_ENGINE_DNS_TRACE(format, ...) \
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_dns_trace)) { \
gpr_log(GPR_DEBUG, "(event_engine dns) " format, __VA_ARGS__); \
gpr_log(GPR_ERROR, "(event_engine dns) " format, __VA_ARGS__); \
}
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_TRACE_H

@ -69,13 +69,15 @@ void WinSocket::Shutdown() {
sizeof(guid), &DisconnectEx, sizeof(DisconnectEx),
&ioctl_num_bytes, NULL, NULL);
if (status == 0) {
DisconnectEx(socket_, NULL, 0, 0);
} else {
if (status != 0) {
char* utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_INFO, "Unable to retrieve DisconnectEx pointer : %s",
gpr_log(GPR_ERROR, "Unable to retrieve DisconnectEx pointer : %s",
utf8_message);
gpr_free(utf8_message);
} else if (DisconnectEx(socket_, NULL, 0, 0) != 0) {
char* utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "DisconnectEx failed : %s", utf8_message);
gpr_free(utf8_message);
}
closesocket(socket_);
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WinSocket::%p socket closed", this);

@ -80,47 +80,55 @@ WindowsEventEngine::ConnectionState::ConnectionState(
CHECK(socket_ != nullptr);
connection_handle_ = ConnectionHandle{reinterpret_cast<intptr_t>(this),
engine_->aba_token_.fetch_add(1)};
LOG(ERROR) << "DO NOT SUBMIT: ConnectionState owned engine::"
<< engine_.use_count();
}
void WindowsEventEngine::ConnectionState::Start(Duration timeout) {
socket_->NotifyOnWrite(MakeOnConnectedCallback());
timer_handle_ = engine_->RunAfter(timeout, MakeDeadlineTimerCallback());
}
void WindowsEventEngine::ConnectionState::RunUserCallback(
absl::StatusOr<std::unique_ptr<Endpoint>> error) ABSL_LOCKS_EXCLUDED(mu_) {
EventEngine::OnConnectCallback cb;
{
grpc_core::MutexLock lock(&mu_);
cb = std::exchange(on_connect_user_callback_, nullptr);
}
cb(std::move(error));
}
EventEngine::Closure*
WindowsEventEngine::ConnectionState::MakeOnConnectedCallback() {
on_connected_cb_ =
std::make_unique<OnConnectedCallback>(engine_, shared_from_this());
return on_connected_cb_.get();
std::make_unique<OnConnectedCallback>(engine_.get(), shared_from_this());
socket_->NotifyOnWrite(on_connected_cb_.get());
deadline_timer_cb_ = std::make_unique<DeadlineTimerCallback>(
engine_.get(), shared_from_this());
timer_handle_ = engine_->RunAfter(timeout, deadline_timer_cb_.get());
}
EventEngine::Closure*
WindowsEventEngine::ConnectionState::MakeDeadlineTimerCallback() {
deadline_timer_cb_ =
std::make_unique<DeadlineTimerCallback>(engine_, shared_from_this());
return deadline_timer_cb_.get();
EventEngine::OnConnectCallback
WindowsEventEngine::ConnectionState::TakeCallback() {
grpc_core::MutexLock lock(&mu_);
return std::exchange(on_connect_user_callback_, nullptr);
}
std::unique_ptr<WindowsEndpoint>
WindowsEventEngine::ConnectionState::MakeEndpoint(ThreadPool* thread_pool) {
WindowsEventEngine::ConnectionState::FinishConnectingAndMakeEndpoint(
ThreadPool* thread_pool) {
ChannelArgsEndpointConfig cfg;
// DO NOT SUBMIT: both callbacks should have been called or destroyed.
// Need to assert that the deadline timer is cancelled
if (!engine_->Cancel(timer_handle_)) {
LOG(ERROR) << "DO NOT SUBMIT: deadline timer may still run";
} else {
LOG(ERROR) << "DO NOT SUBMIT: deadline timer cancelled, refs are released";
deadline_timer_cb_.reset();
}
return std::make_unique<WindowsEndpoint>(address_, std::move(socket_),
std::move(allocator_), cfg,
thread_pool, engine_);
}
void WindowsEventEngine::ConnectionState::AbortOnConnect() {
LOG(ERROR) << "DO NOT SUBMIT: AbortOnConnect";
on_connected_cb_.reset();
}
void WindowsEventEngine::ConnectionState::AbortDeadlineTimer() {
LOG(ERROR) << "DO NOT SUBMIT: AbortDeadlineTimer";
deadline_timer_cb_.reset();
}
void WindowsEventEngine::ConnectionState::OnConnectedCallback::Run() {
CHECK_NE(connection_state_, nullptr)
LOG(ERROR) << "DO NOT SUBMIT: OnConnectedCallback::Run";
DCHECK_NE(connection_state_, nullptr)
<< "ConnectionState::OnConnectedCallback::" << this
<< " has already run. It should only ever run once.";
bool has_run;
@ -131,16 +139,24 @@ void WindowsEventEngine::ConnectionState::OnConnectedCallback::Run() {
// This could race with the deadline timer. If so, the OnConnectCompleted
// callback should not run, and the refs should be released.
if (has_run) {
LOG(ERROR) << "DO NOT SUBMIT: OnConnectedCallback resetting "
"connection_state_";
connection_state_.reset();
engine_.reset();
return;
}
engine_->OnConnectCompleted(std::move(connection_state_));
// Reset to release the ref.
engine_.reset();
}
// DO NOT SUBMIT: Need to get the deadline timer to release its ref if it's
// never going to be called. bazel test --test_output=streamed
// --config=windows_dbg --runs_per_test=1 --test_env=grpc_verbosity=debug
// --test_env=GRPC_EXPERIMENTS=event_engine_client
// --test_env=GRPC_TRACE=event_engine
// --test_filter='*RlsEnd2endTest.ConnectivityStateTransientFailure*'
// --test_arg=--v=10 //test/cpp/end2end:rls_end2end_test
void WindowsEventEngine::ConnectionState::DeadlineTimerCallback::Run() {
LOG(ERROR) << "DO NOT SUBMIT: DeadlineTimerCallback::Run";
CHECK_NE(connection_state_, nullptr)
<< "ConnectionState::DeadlineTimerCallback::" << this
<< " has already run. It should only ever run once.";
@ -152,13 +168,12 @@ void WindowsEventEngine::ConnectionState::DeadlineTimerCallback::Run() {
// This could race with the deadline timer. If so, the OnConnectCompleted
// callback should not run, and the refs should be released.
if (has_run) {
LOG(ERROR) << "DO NOT SUBMIT: DeadlineTimerCallback resetting "
"connection_state_";
connection_state_.reset();
engine_.reset();
return;
}
engine_->OnDeadlineTimerFired(std::move(connection_state_));
// Reset to release the ref.
engine_.reset();
}
// ---- IOCPWorkClosure ----
@ -358,7 +373,6 @@ bool WindowsEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
void WindowsEventEngine::OnConnectCompleted(
std::shared_ptr<ConnectionState> state) {
absl::StatusOr<std::unique_ptr<WindowsEndpoint>> endpoint;
EventEngine::OnConnectCallback cb;
{
// Connection attempt complete!
grpc_core::MutexLock lock(&state->mu());
@ -376,20 +390,26 @@ void WindowsEventEngine::OnConnectCompleted(
"Not accepting connection since the deadline timer has fired");
return;
}
// Release refs held by the deadline timer.
state->AbortDeadlineTimer();
const auto& overlapped_result = state->socket()->write_info()->result();
if (!overlapped_result.error_status.ok()) {
state->socket()->Shutdown(DEBUG_LOCATION, "ConnectEx failure");
endpoint = overlapped_result.error_status;
LOG(ERROR) << "DO NOT SUBMIT: error on connect: " << endpoint.status();
} else if (overlapped_result.wsa_error != 0) {
state->socket()->Shutdown(DEBUG_LOCATION, "ConnectEx failure");
endpoint = GRPC_WSA_ERROR(overlapped_result.wsa_error, "ConnectEx");
LOG(ERROR) << "DO NOT SUBMIT: error on connect: " << endpoint.status();
} else {
endpoint = state->MakeEndpoint(thread_pool_.get());
endpoint = state->FinishConnectingAndMakeEndpoint(thread_pool_.get());
}
}
// This code should be running in a thread pool thread already, so the
// callback can be run directly.
state->RunUserCallback(std::move(endpoint));
auto cb = state->TakeCallback();
state.reset();
cb(std::move(endpoint));
}
void WindowsEventEngine::OnDeadlineTimerFired(
@ -400,8 +420,9 @@ void WindowsEventEngine::OnDeadlineTimerFired(
cancelled = CancelConnectFromDeadlineTimer(connection_state.get());
}
if (cancelled) {
connection_state->RunUserCallback(
absl::DeadlineExceededError("Connection timed out"));
auto cb = connection_state->TakeCallback();
connection_state.reset();
cb(absl::DeadlineExceededError("Connection timed out"));
return;
}
GRPC_EVENT_ENGINE_TRACE(
@ -509,29 +530,39 @@ EventEngine::ConnectionHandle WindowsEventEngine::Connect(
address.size(), nullptr, 0, nullptr, info->overlapped());
// It wouldn't be unusual to get a success immediately. But we'll still get an
// IOCP notification, so let's ignore it.
if (!success) {
int last_error = WSAGetLastError();
if (last_error != ERROR_IO_PENDING) {
if (!Cancel(connection_state->timer_handle())) {
return EventEngine::ConnectionHandle::kInvalid;
}
connection_state->socket()->Shutdown(DEBUG_LOCATION, "ConnectEx");
{
grpc_core::MutexLock connection_handle_lock(&connection_mu_);
CHECK(known_connection_handles_.erase(
connection_state->connection_handle()) == 1);
}
Run([connection_state = std::move(connection_state),
status = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx")]() mutable {
connection_state->RunUserCallback(status);
});
return EventEngine::ConnectionHandle::kInvalid;
}
if (success) return connection_state->connection_handle();
// Otherwise, we need to handle an error or IO Event.
int last_error = WSAGetLastError();
if (last_error == ERROR_IO_PENDING) {
// Overlapped I/O operation is in progress.
return connection_state->connection_handle();
}
return connection_state->connection_handle();
// Abort the connection.
// The on-connect callback won't run, so we must clean up its state.
connection_state->AbortOnConnect();
{
grpc_core::MutexLock connection_handle_lock(&connection_mu_);
CHECK(known_connection_handles_.erase(
connection_state->connection_handle()) == 1);
}
connection_state->socket()->Shutdown(DEBUG_LOCATION, "ConnectEx");
if (!Cancel(connection_state->timer_handle())) {
// The deadline timer will run, or is running.
return EventEngine::ConnectionHandle::kInvalid;
}
// The deadline timer won't run, so we must clean up its state.
connection_state->AbortDeadlineTimer();
Run([connection_state = std::move(connection_state),
status = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx")]() mutable {
auto cb = connection_state->TakeCallback();
connection_state.reset();
cb(std::move(status));
});
return EventEngine::ConnectionHandle::kInvalid;
}
bool WindowsEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) {
LOG(ERROR) << "DO NOT SUBMIT: CancelConnect for handle " << handle;
if (handle == EventEngine::ConnectionHandle::kInvalid) {
GRPC_EVENT_ENGINE_TRACE("%s",
"Attempted to cancel an invalid connection handle");
@ -540,17 +571,20 @@ bool WindowsEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) {
// Erase the connection handle, which may be unknown
{
grpc_core::MutexLock lock(&connection_mu_);
if (!known_connection_handles_.contains(handle)) {
if (known_connection_handles_.erase(handle) != 1) {
GRPC_EVENT_ENGINE_TRACE(
"Unknown connection handle: %s",
HandleToString<EventEngine::ConnectionHandle>(handle).c_str());
return false;
}
known_connection_handles_.erase(handle);
}
auto* connection_state = reinterpret_cast<ConnectionState*>(handle.keys[0]);
grpc_core::MutexLock state_lock(&connection_state->mu());
// The connection cannot be cancelled if the deadline timer is already firing.
if (!Cancel(connection_state->timer_handle())) return false;
// The deadline timer was cancelled, so we must clean up its state. The
// on-connect callback will run when the
connection_state->AbortDeadlineTimer();
return CancelConnectInternalStateLocked(connection_state);
}
@ -561,6 +595,8 @@ bool WindowsEventEngine::CancelConnectFromDeadlineTimer(
grpc_core::MutexLock lock(&connection_mu_);
if (known_connection_handles_.erase(
connection_state->connection_handle()) != 1) {
LOG(ERROR) << "DO NOT SUBMIT: could not find connection handle, "
"OnConnectCompleted must be running already";
return false;
}
}

@ -23,6 +23,7 @@
#include <memory>
#include <ostream>
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
@ -45,8 +46,6 @@
namespace grpc_event_engine {
namespace experimental {
// TODO(ctiller): KeepsGrpcInitialized is an interim measure to ensure that
// EventEngine is shut down before we shut down iomgr.
class WindowsEventEngine : public EventEngine,
public grpc_core::KeepsGrpcInitialized {
public:
@ -106,9 +105,11 @@ class WindowsEventEngine : public EventEngine,
IOCP* poller() { return &iocp_; }
private:
// State of an active connection.
// Managed by a shared_ptr, owned exclusively by the timeout callback and the
// OnConnectCompleted callback herein.
// The state of an active connection.
//
// This object is managed by a shared_ptr, which is owned by:
// 1) the deadline timer callback, and
// 2) the OnConnectCompleted callback.
class ConnectionState : public std::enable_shared_from_this<ConnectionState> {
public:
ConnectionState(std::shared_ptr<WindowsEventEngine> engine,
@ -116,16 +117,34 @@ class WindowsEventEngine : public EventEngine,
EventEngine::ResolvedAddress address,
MemoryAllocator allocator,
EventEngine::OnConnectCallback on_connect_user_callback);
~ConnectionState() {
LOG(ERROR) << "DO NOT SUBMIT: ~ConnectionState. ee count::"
<< engine_.use_count() - 1;
}
// Starts the deadline timer, and sets up the socket to notify on writes.
//
// This cannot be done in the constructor since shared_from_this is required
// for the callbacks to hold a ref to this object.
void Start(Duration timeout);
// Runs the user callback and resets it to nullptr to ensure it only runs
// Returns the user's callback and resets it to nullptr to ensure it only runs
// once.
void RunUserCallback(absl::StatusOr<std::unique_ptr<Endpoint>> error)
ABSL_LOCKS_EXCLUDED(mu_);
std::unique_ptr<WindowsEndpoint> MakeEndpoint(ThreadPool* thread_pool);
// DO NOT SUBMIT: this probably should not be necessary.
OnConnectCallback TakeCallback() ABSL_LOCKS_EXCLUDED(mu_);
// Create an Endpoint, transfering held object ownership to the endpoint.
//
// This can only be called once, and the connection state is no longer valid
// after an endpoint has been created.
// DO NOT SUBMIT: is this a good API? Who should own this responsibility?
std::unique_ptr<WindowsEndpoint> FinishConnectingAndMakeEndpoint(
ThreadPool* thread_pool);
// Release all refs to the on-connect callback.
void AbortOnConnect();
// Release all refs to the deadline timer callback.
void AbortDeadlineTimer();
WinSocket* socket() { return socket_.get(); }
@ -140,68 +159,82 @@ class WindowsEventEngine : public EventEngine,
friend std::ostream& operator<<(std::ostream& out,
const ConnectionState& connection_state);
// Creates the notify on write callback.
// Holds refs to the ConnectionState and WindowsEventEngine
EventEngine::Closure* MakeOnConnectedCallback()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Creates the deadline timer callback.
// Holds refs to the ConnectionState and WindowsEventEngine
EventEngine::Closure* MakeDeadlineTimerCallback()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Stateful closure for the endpoint's on-connect callback.
//
// Once created, this closure must be Run or deleted to release the held
// refs.
class OnConnectedCallback : public EventEngine::Closure {
public:
OnConnectedCallback(std::shared_ptr<WindowsEventEngine> engine,
OnConnectedCallback(WindowsEventEngine* engine,
std::shared_ptr<ConnectionState> connection_state)
: engine_(std::move(engine)),
connection_state_(std::move(connection_state)) {}
: engine_(engine), connection_state_(std::move(connection_state)) {}
~OnConnectedCallback() override {
LOG(ERROR) << "DO NOT SUBMIT: ~OnConnectedCallback";
}
// Runs the WindowsEngine's OnConnectCompleted if the deadline timer
// Runs the WindowsEventEngine's OnConnectCompleted if the deadline timer
// hasn't fired first.
void Run();
private:
std::shared_ptr<WindowsEventEngine> engine_;
WindowsEventEngine* engine_;
std::shared_ptr<ConnectionState> connection_state_;
};
// Stateful closure for the deadline timer.
//
// Once created, this closure must be Run or deleted to release the held
// refs.
class DeadlineTimerCallback : public EventEngine::Closure {
public:
DeadlineTimerCallback(std::shared_ptr<WindowsEventEngine> engine,
DeadlineTimerCallback(WindowsEventEngine* engine,
std::shared_ptr<ConnectionState> connection_state)
: engine_(std::move(engine)),
connection_state_(std::move(connection_state)) {}
: engine_(engine), connection_state_(std::move(connection_state)) {}
~DeadlineTimerCallback() override {
LOG(ERROR) << "DO NOT SUBMIT: ~DeadlineTimerCallback";
}
// Runs the WindowsEngine's OnDeadlineTimerFired if the deadline timer
// hasn't fired first.
// Runs the WindowsEventEngine's OnDeadlineTimerFired if the deadline
// timer hasn't fired first.
void Run();
private:
std::shared_ptr<WindowsEventEngine> engine_;
WindowsEventEngine* engine_;
std::shared_ptr<ConnectionState> connection_state_;
};
// everything is guarded by mu_;
grpc_core::Mutex mu_
ABSL_ACQUIRED_BEFORE(WindowsEventEngine::connection_mu_);
// Endpoint connection state.
std::unique_ptr<WinSocket> socket_ ABSL_GUARDED_BY(mu_);
EventEngine::ResolvedAddress address_ ABSL_GUARDED_BY(mu_);
MemoryAllocator allocator_ ABSL_GUARDED_BY(mu_);
std::shared_ptr<WindowsEventEngine> engine_ ABSL_GUARDED_BY(mu_);
EventEngine::OnConnectCallback on_connect_user_callback_
ABSL_GUARDED_BY(mu_);
EventEngine::ConnectionHandle connection_handle_ ABSL_GUARDED_BY(mu_);
EventEngine::TaskHandle timer_handle_ ABSL_GUARDED_BY(mu_) =
EventEngine::TaskHandle::kInvalid;
// This guarantees the EventEngine survives long enough to execute these
// deadline timer or on-connect callbacks.
std::shared_ptr<WindowsEventEngine> engine_ ABSL_GUARDED_BY(mu_);
// Owned closures. These hold refs to this object.
std::unique_ptr<OnConnectedCallback> on_connected_cb_ ABSL_GUARDED_BY(mu_);
std::unique_ptr<DeadlineTimerCallback> deadline_timer_cb_
ABSL_GUARDED_BY(mu_);
// Their respective method handles.
EventEngine::ConnectionHandle connection_handle_ ABSL_GUARDED_BY(mu_) =
EventEngine::ConnectionHandle::kInvalid;
EventEngine::TaskHandle timer_handle_ ABSL_GUARDED_BY(mu_) =
EventEngine::TaskHandle::kInvalid;
// Flag to ensure that only one of the even closures will complete its
// responsibilities.
bool has_run_ ABSL_GUARDED_BY(mu_) = false;
};
// Required for the function to see the private ConnectionState type.
friend std::ostream& operator<<(std::ostream& out,
const ConnectionState& connection_state);
struct TimerClosure;
// A poll worker which schedules itself unless kicked
class IOCPWorkClosure : public EventEngine::Closure {
public:
@ -217,13 +250,16 @@ class WindowsEventEngine : public EventEngine,
IOCP* iocp_;
};
// Called via IOCP notifications when a connection is ready to be processed.
// Either this or the deadline timer will run, never both.
void OnConnectCompleted(std::shared_ptr<ConnectionState> state);
// Called after a timeout when no connection has been established.
// Either this or the on-connect callback will run, never both.
void OnDeadlineTimerFired(std::shared_ptr<ConnectionState> state);
// CancelConnect called from within the deadline timer.
// In this case, the connection_state->mu_ is already locked, and timer
// cancellation is not possible.
// CancelConnect, called from within the deadline timer.
// Timer cancellation is not possible.
bool CancelConnectFromDeadlineTimer(ConnectionState* connection_state)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(connection_state->mu_);
@ -232,7 +268,6 @@ class WindowsEventEngine : public EventEngine,
bool CancelConnectInternalStateLocked(ConnectionState* connection_state)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(connection_state->mu_);
struct TimerClosure;
EventEngine::TaskHandle RunAfterInternal(Duration when,
absl::AnyInvocable<void()> cb);
grpc_core::Mutex task_mu_;

@ -38,6 +38,7 @@
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/resource_quota/memory_quota.h"
@ -88,6 +89,22 @@ void WaitForSingleOwner(std::shared_ptr<EventEngine> engine) {
}
}
void WaitForSingleOwner(std::shared_ptr<EventEngine> engine,
EventEngine::Duration timeout) {
int n = 0;
auto start = std::chrono::system_clock::now();
while (engine.use_count() > 1) {
++n;
if (n % 100 == 0) AsanAssertNoLeaks();
if (std::chrono::system_clock::now() - start > timeout) {
grpc_core::Crash("Timed out waiting for a single EventEngine owner");
}
GRPC_LOG_EVERY_N_SEC(2, GPR_INFO, "engine.use_count() = %ld",
engine.use_count());
absl::SleepFor(absl::Milliseconds(100));
}
}
void AppendStringToSliceBuffer(SliceBuffer* buf, absl::string_view data) {
buf->Append(Slice::FromCopiedString(data));
}

@ -52,6 +52,14 @@ std::string GetNextSendMessage();
// Usage: WaitForSingleOwner(std::move(engine))
void WaitForSingleOwner(std::shared_ptr<EventEngine> engine);
// Waits until the use_count of the EventEngine shared_ptr has reached 1
// and returns.
// Callers must give up their ref, or this method will block forever.
// This version will CRASH after the given timeout
// Usage: WaitForSingleOwner(std::move(engine), 30s)
void WaitForSingleOwner(std::shared_ptr<EventEngine> engine,
EventEngine::Duration timeout);
// A helper method to exchange data between two endpoints. It is assumed
// that both endpoints are connected. The data (specified as a string) is
// written by the sender_endpoint and read by the receiver_endpoint. It

@ -176,8 +176,8 @@ class RlsEnd2endTest : public ::testing::Test {
static void TearDownTestSuite() {
grpc_shutdown_blocking();
WaitForSingleOwner(
grpc_event_engine::experimental::GetDefaultEventEngine());
WaitForSingleOwner(grpc_event_engine::experimental::GetDefaultEventEngine(),
std::chrono::seconds(10));
grpc_core::CoreConfiguration::Reset();
}

Loading…
Cancel
Save