[thready_tsan] Grab bag of improvements (#36886)

- ensure ordering of `OnAccept` and `Shutdown` callbacks from thready event engine (previously these could be reordered and this caused spurious failures)
- enable thready_tsan for one C++ e2e test
- don't filter thready_tsan for local builds (only CI)

Closes #36886

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36886 from ctiller:erm e3b88e7d86
PiperOrigin-RevId: 642702724
pull/36447/head^2
Craig Tiller 6 months ago committed by Copybara-Service
parent 9954dbacfc
commit fdac9ebfdf
  1. 38
      src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc
  2. 1
      test/cpp/end2end/BUILD
  3. 1
      tools/bazel.rc

@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
namespace grpc_event_engine {
@ -39,20 +40,45 @@ ThreadyEventEngine::CreateListener(
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
struct AcceptState {
grpc_core::Mutex mu_;
grpc_core::CondVar cv_;
int pending_accepts_ ABSL_GUARDED_BY(mu_) = 0;
};
auto accept_state = std::make_shared<AcceptState>();
return impl_->CreateListener(
[this, on_accept = std::make_shared<Listener::AcceptCallback>(
std::move(on_accept))](std::unique_ptr<Endpoint> endpoint,
MemoryAllocator memory_allocator) {
[this, accept_state,
on_accept = std::make_shared<Listener::AcceptCallback>(
std::move(on_accept))](std::unique_ptr<Endpoint> endpoint,
MemoryAllocator memory_allocator) {
{
grpc_core::MutexLock lock(&accept_state->mu_);
++accept_state->pending_accepts_;
}
Asynchronously(
[on_accept, endpoint = std::move(endpoint),
[on_accept, accept_state, endpoint = std::move(endpoint),
memory_allocator = std::move(memory_allocator)]() mutable {
(*on_accept)(std::move(endpoint), std::move(memory_allocator));
{
grpc_core::MutexLock lock(&accept_state->mu_);
--accept_state->pending_accepts_;
if (accept_state->pending_accepts_ == 0) {
accept_state->cv_.Signal();
}
}
});
},
[this,
[this, accept_state,
on_shutdown = std::move(on_shutdown)](absl::Status status) mutable {
Asynchronously([on_shutdown = std::move(on_shutdown),
Asynchronously([accept_state, on_shutdown = std::move(on_shutdown),
status = std::move(status)]() mutable {
while (true) {
grpc_core::MutexLock lock(&accept_state->mu_);
if (accept_state->pending_accepts_ == 0) {
break;
}
accept_state->cv_.Wait(&accept_state->mu_);
}
on_shutdown(std::move(status));
});
},

@ -112,6 +112,7 @@ grpc_cc_test(
tags = [
"cpp_end2end_test",
"no_test_ios",
"thready_tsan",
],
deps = [
"//:gpr",

@ -133,7 +133,6 @@ build:thready_tsan --copt=-DGPR_NO_DIRECT_SYSCALLS
build:thready_tsan --copt=-DGRPC_TSAN
build:thready_tsan --copt=-DGRPC_MAXIMIZE_THREADYNESS
build:thready_tsan --linkopt=-fsanitize=thread
build:thready_tsan --test_tag_filters=thready_tsan
build:thready_tsan --action_env=TSAN_OPTIONS=suppressions=test/core/test_util/tsan_suppressions.txt:halt_on_error=1:second_deadlock_stack=1
build:tsan_macos --strip=never

Loading…
Cancel
Save