[tsan] Flake fix in chaotic good connector

pull/37657/head
Craig Tiller 3 months ago
parent 1ed28f882f
commit 2aba75edaf
  1. 26
      src/core/ext/transport/chaotic_good/client/chaotic_good_connector.cc
  2. 2
      src/core/ext/transport/chaotic_good/client/chaotic_good_connector.h

@ -134,9 +134,12 @@ auto ChaoticGoodConnector::WaitForDataEndpointSetup(
endpoint) mutable { endpoint) mutable {
ExecCtx exec_ctx; ExecCtx exec_ctx;
if (!endpoint.ok() || self->handshake_mgr_ == nullptr) { if (!endpoint.ok() || self->handshake_mgr_ == nullptr) {
ExecCtx::Run(DEBUG_LOCATION, MutexLock lock(&self->mu_);
std::exchange(self->notify_, nullptr), if (self->notify_ != nullptr) {
GRPC_ERROR_CREATE("connect endpoint failed")); ExecCtx::Run(DEBUG_LOCATION,
std::exchange(self->notify_, nullptr),
GRPC_ERROR_CREATE("connect endpoint failed"));
}
return; return;
} }
auto* chaotic_good_ext = auto* chaotic_good_ext =
@ -240,9 +243,9 @@ void ChaoticGoodConnector::Connect(const Args& args, Result* result,
GRPC_ERROR_CREATE("connector shutdown")); GRPC_ERROR_CREATE("connector shutdown"));
return; return;
} }
notify_ = notify;
} }
args_ = args; args_ = args;
notify_ = notify;
resolved_addr_ = EventEngine::ResolvedAddress( resolved_addr_ = EventEngine::ResolvedAddress(
reinterpret_cast<const sockaddr*>(args_.address->addr), reinterpret_cast<const sockaddr*>(args_.address->addr),
args_.address->len); args_.address->len);
@ -253,11 +256,14 @@ void ChaoticGoodConnector::Connect(const Args& args, Result* result,
endpoint) mutable { endpoint) mutable {
ExecCtx exec_ctx; ExecCtx exec_ctx;
if (!endpoint.ok() || self->handshake_mgr_ == nullptr) { if (!endpoint.ok() || self->handshake_mgr_ == nullptr) {
auto endpoint_status = endpoint.status(); MutexLock lock(&self->mu_);
auto error = GRPC_ERROR_CREATE_REFERENCING("connect endpoint failed", if (self->notify_ != nullptr) {
&endpoint_status, 1); auto endpoint_status = endpoint.status();
ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr), auto error = GRPC_ERROR_CREATE_REFERENCING(
error); "connect endpoint failed", &endpoint_status, 1);
ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr),
error);
}
return; return;
} }
auto* chaotic_good_ext = auto* chaotic_good_ext =
@ -320,8 +326,8 @@ void ChaoticGoodConnector::OnHandshakeDone(
[self = RefAsSubclass<ChaoticGoodConnector>()](absl::Status status) { [self = RefAsSubclass<ChaoticGoodConnector>()](absl::Status status) {
GRPC_TRACE_LOG(chaotic_good, INFO) GRPC_TRACE_LOG(chaotic_good, INFO)
<< "ChaoticGoodConnector::OnHandshakeDone: " << status; << "ChaoticGoodConnector::OnHandshakeDone: " << status;
MutexLock lock(&self->mu_);
if (status.ok()) { if (status.ok()) {
MutexLock lock(&self->mu_);
self->result_->transport = new ChaoticGoodClientTransport( self->result_->transport = new ChaoticGoodClientTransport(
std::move(self->control_endpoint_), std::move(self->control_endpoint_),
std::move(self->data_endpoint_), self->args_.channel_args, std::move(self->data_endpoint_), self->args_.channel_args,

@ -83,7 +83,7 @@ class ChaoticGoodConnector : public SubchannelConnector {
Mutex mu_; Mutex mu_;
Args args_; Args args_;
Result* result_ ABSL_GUARDED_BY(mu_); Result* result_ ABSL_GUARDED_BY(mu_);
grpc_closure* notify_ = nullptr; grpc_closure* notify_ ABSL_GUARDED_BY(mu_) = nullptr;
bool is_shutdown_ ABSL_GUARDED_BY(mu_) = false; bool is_shutdown_ ABSL_GUARDED_BY(mu_) = false;
absl::StatusOr<grpc_event_engine::experimental::EventEngine::ResolvedAddress> absl::StatusOr<grpc_event_engine::experimental::EventEngine::ResolvedAddress>
resolved_addr_; resolved_addr_;

Loading…
Cancel
Save