Fix tsan failure in retry timer cancellation code. (#25961)

* Fix tsan failure in retry timer cancellation code.

* fix build and clang-tidy
Branch: reviewable/pr25978/r1
Author: Mark D. Roth (4 years ago), committed by GitHub
parent 338349726d
commit fd3bd70939
3 changed files:
  1. src/core/ext/filters/client_channel/client_channel.cc (24 lines changed)
  2. src/core/ext/filters/client_channel/client_channel.h (16 lines changed)
  3. src/core/ext/filters/client_channel/retry_filter.cc (90 lines changed)

src/core/ext/filters/client_channel/client_channel.cc:

@@ -344,7 +344,8 @@ class DynamicTerminationFilter::CallData {
         calld->call_context_, calld->path_,
         calld->call_start_time_, calld->deadline_,
         calld->arena_, calld->call_combiner_};
-    calld->lb_call_ = client_channel->CreateLoadBalancedCall(args, pollent);
+    calld->lb_call_ =
+        client_channel->CreateLoadBalancedCall(args, pollent, nullptr);
     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
       gpr_log(GPR_INFO,
               "chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand,
@@ -1132,9 +1133,11 @@ ClientChannel::~ClientChannel() {
 }
 
 RefCountedPtr<ClientChannel::LoadBalancedCall>
-ClientChannel::CreateLoadBalancedCall(const grpc_call_element_args& args,
-                                      grpc_polling_entity* pollent) {
-  return args.arena->New<LoadBalancedCall>(this, args, pollent);
+ClientChannel::CreateLoadBalancedCall(
+    const grpc_call_element_args& args, grpc_polling_entity* pollent,
+    grpc_closure* on_call_destruction_complete) {
+  return args.arena->New<LoadBalancedCall>(this, args, pollent,
+                                           on_call_destruction_complete);
 }
 
 namespace {
@@ -2480,7 +2483,7 @@ class ClientChannel::LoadBalancedCall::LbCallState
 
 ClientChannel::LoadBalancedCall::LoadBalancedCall(
     ClientChannel* chand, const grpc_call_element_args& args,
-    grpc_polling_entity* pollent)
+    grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete)
     : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
                      ? "LoadBalancedCall"
                      : nullptr),
@@ -2492,7 +2495,8 @@ ClientChannel::LoadBalancedCall::LoadBalancedCall(
       owning_call_(args.call_stack),
       call_combiner_(args.call_combiner),
       call_context_(args.context),
-      pollent_(pollent) {}
+      pollent_(pollent),
+      on_call_destruction_complete_(on_call_destruction_complete) {}
 
 ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
   grpc_slice_unref_internal(path_);
@@ -2506,6 +2510,10 @@ ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
     GPR_ASSERT(pending_batches_[i] == nullptr);
   }
+  if (on_call_destruction_complete_ != nullptr) {
+    ExecCtx::Run(DEBUG_LOCATION, on_call_destruction_complete_,
+                 GRPC_ERROR_NONE);
+  }
 }
 
 size_t ClientChannel::LoadBalancedCall::GetBatchIndex(
@@ -2774,6 +2782,10 @@ void ClientChannel::LoadBalancedCall::CreateSubchannelCall() {
             "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand_,
             this, subchannel_call_.get(), grpc_error_string(error));
   }
+  if (on_call_destruction_complete_ != nullptr) {
+    subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
+    on_call_destruction_complete_ = nullptr;
+  }
   if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
     PendingBatchesFail(error, YieldCallCombiner);
   } else {
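
The two hunks above implement a run-or-hand-off rule: the LB call runs the closure from its own destructor unless it has already handed it to the subchannel call via SetAfterCallStackDestroy, so the notification cannot fire before the subchannel call stack is gone. A minimal standalone sketch of the same rule (plain C++ with invented names, not gRPC code):

  #include <cstdio>
  #include <functional>
  #include <memory>
  #include <utility>

  // Stand-in for the subchannel call: runs the callback only when it is
  // itself destroyed.
  class LowerStack {
   public:
    void SetAfterDestroy(std::function<void()> cb) {
      after_destroy_ = std::move(cb);
    }
    ~LowerStack() {
      if (after_destroy_) after_destroy_();
    }
   private:
    std::function<void()> after_destroy_;
  };

  // Stand-in for the LB call.
  class UpperCall {
   public:
    explicit UpperCall(std::function<void()> on_destruction_complete)
        : on_destruction_complete_(std::move(on_destruction_complete)) {}
    ~UpperCall() {
      // Runs the callback only if it was never handed down.
      if (on_destruction_complete_) on_destruction_complete_();
    }
    std::shared_ptr<LowerStack> CreateLowerStack() {
      auto lower = std::make_shared<LowerStack>();
      if (on_destruction_complete_) {
        // Hand off: the lower stack now owns the notification.
        lower->SetAfterDestroy(std::move(on_destruction_complete_));
        on_destruction_complete_ = nullptr;
      }
      return lower;
    }
   private:
    std::function<void()> on_destruction_complete_;
  };

  int main() {
    auto upper = std::make_unique<UpperCall>(
        [] { std::puts("destruction complete"); });
    auto lower = upper->CreateLowerStack();  // closure handed down
    upper.reset();  // nothing printed: lower stack still alive
    lower.reset();  // prints "destruction complete"
  }

The point of the hand-off is visible in main(): the lower stack can outlive the upper call, and the notification waits for it.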

src/core/ext/filters/client_channel/client_channel.h:

@@ -133,7 +133,8 @@ class ClientChannel {
       AsyncConnectivityStateWatcherInterface* watcher);
 
   RefCountedPtr<LoadBalancedCall> CreateLoadBalancedCall(
-      const grpc_call_element_args& args, grpc_polling_entity* pollent);
+      const grpc_call_element_args& args, grpc_polling_entity* pollent,
+      grpc_closure* on_call_destruction_complete);
 
  private:
   class CallData;
@@ -356,8 +357,15 @@ class ClientChannel {
 class ClientChannel::LoadBalancedCall
     : public RefCounted<LoadBalancedCall, PolymorphicRefCount, kUnrefCallDtor> {
  public:
+  // If on_call_destruction_complete is non-null, then it will be
+  // invoked once the LoadBalancedCall is completely destroyed.
+  // If it is null, then the caller is responsible for checking whether
+  // the LB call has a subchannel call and ensuring that the
+  // on_call_destruction_complete closure passed down from the surface
+  // is not invoked until after the subchannel call stack is destroyed.
   LoadBalancedCall(ClientChannel* chand, const grpc_call_element_args& args,
-                   grpc_polling_entity* pollent);
+                   grpc_polling_entity* pollent,
+                   grpc_closure* on_call_destruction_complete);
   ~LoadBalancedCall() override;
 
   void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
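
Given the contract in the comment above, a hypothetical in-tree call site for the non-null mode might look like the snippet below. The callback name OnLbCallDestroyed and the surrounding variables are invented for illustration; the closure-initialization pattern mirrors MakeLbCallDestructionClosure in the retry_filter.cc hunks further down.

  grpc_closure* on_destroyed = arena->New<grpc_closure>();
  GRPC_CLOSURE_INIT(on_destroyed, OnLbCallDestroyed, this, nullptr);
  lb_call_ = chand->CreateLoadBalancedCall(args, pollent, on_destroyed);

Passing nullptr instead (as DynamicTerminationFilter does in the first hunk) leaves the caller responsible for sequencing the destruction notification itself, as the header comment describes.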
@@ -436,6 +444,8 @@ class ClientChannel::LoadBalancedCall
   grpc_call_stack* owning_call_;
   CallCombiner* call_combiner_;
   grpc_call_context_element* call_context_;
+  grpc_polling_entity* pollent_;
+  grpc_closure* on_call_destruction_complete_;
 
   // Set when we get a cancel_stream op.
   grpc_error* cancel_error_ = GRPC_ERROR_NONE;
@@ -443,8 +453,6 @@ class ClientChannel::LoadBalancedCall
 
   // Set when we fail inside the LB call.
   grpc_error* failure_error_ = GRPC_ERROR_NONE;
 
-  grpc_polling_entity* pollent_ = nullptr;
-
   grpc_closure pick_closure_;
 
   // Accessed while holding ClientChannel::data_plane_mu_.

src/core/ext/filters/client_channel/retry_filter.cc:

@@ -201,6 +201,7 @@ class RetryFilter::CallData {
 
  private:
   class Canceller;
+  class CallStackDestructionBarrier;
 
   // Pending batches stored in call data.
   struct PendingBatch {
@@ -456,6 +457,8 @@ class RetryFilter::CallData {
   CallCombiner* call_combiner_;
   grpc_call_context_element* call_context_;
+  RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier_;
+
   // TODO(roth): As part of implementing hedging, we will need to maintain a
   // list of all pending attempts, so that we can cancel them all if the call
   // gets cancelled.
@@ -525,6 +528,58 @@ class RetryFilter::CallData {
   grpc_metadata_batch send_trailing_metadata_;
 };
 
+//
+// RetryFilter::CallData::CallStackDestructionBarrier
+//
+
+// A class to track the existence of LoadBalancedCall call stacks that
+// we've created. We wait until all such call stacks have been
+// destroyed before we return the on_call_stack_destruction closure up
+// to the surface.
+//
+// The parent RetryFilter::CallData object holds a ref to this object.
+// When it is destroyed, it will store the on_call_stack_destruction
+// closure from the surface in this object and then release its ref.
+// We also take a ref to this object for each LB call we create, and
+// those refs are not released until the LB call stack is destroyed.
+// When this object is destroyed, it will invoke the
+// on_call_stack_destruction closure from the surface.
+class RetryFilter::CallData::CallStackDestructionBarrier
+    : public RefCounted<CallStackDestructionBarrier, PolymorphicRefCount,
+                        kUnrefCallDtor> {
+ public:
+  CallStackDestructionBarrier() {}
+
+  ~CallStackDestructionBarrier() override {
+    // TODO(yashkt) : This can potentially be a Closure::Run
+    ExecCtx::Run(DEBUG_LOCATION, on_call_stack_destruction_, GRPC_ERROR_NONE);
+  }
+
+  // Set the closure from the surface. This closure will be invoked
+  // when this object is destroyed.
+  void set_on_call_stack_destruction(grpc_closure* on_call_stack_destruction) {
+    on_call_stack_destruction_ = on_call_stack_destruction;
+  }
+
+  // Invoked to get an on_call_stack_destruction closure for a new LB call.
+  grpc_closure* MakeLbCallDestructionClosure(CallData* calld) {
+    Ref().release();  // Ref held by callback.
+    grpc_closure* on_lb_call_destruction_complete =
+        calld->arena_->New<grpc_closure>();
+    GRPC_CLOSURE_INIT(on_lb_call_destruction_complete,
+                      OnLbCallDestructionComplete, this, nullptr);
+    return on_lb_call_destruction_complete;
+  }
+
+ private:
+  static void OnLbCallDestructionComplete(void* arg, grpc_error* /*error*/) {
+    auto* self = static_cast<CallStackDestructionBarrier*>(arg);
+    self->Unref();
+  }
+
+  grpc_closure* on_call_stack_destruction_ = nullptr;
+};
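
The comment above describes a pure ref-count barrier. A rough standalone analogue, using std::shared_ptr in place of the manual Ref()/Unref() (plain C++ with invented names, not the gRPC implementation):

  #include <cstdio>
  #include <functional>
  #include <memory>
  #include <utility>

  // The surface callback fires only when the last reference, whether the
  // parent's or one held on behalf of an LB call stack, goes away.
  class Barrier {
   public:
    ~Barrier() {
      if (on_all_destroyed_) on_all_destroyed_();
    }
    void set_on_all_destroyed(std::function<void()> cb) {
      on_all_destroyed_ = std::move(cb);
    }
   private:
    std::function<void()> on_all_destroyed_;
  };

  int main() {
    auto barrier = std::make_shared<Barrier>();  // parent's ref
    auto lb_call_ref = barrier;                  // ref held per LB call stack
    barrier->set_on_all_destroyed(
        [] { std::puts("all call stacks destroyed"); });
    barrier.reset();      // parent gone; callback must not fire yet
    lb_call_ref.reset();  // last LB call stack gone; callback fires here
  }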
 
 //
 // RetryFilter::CallData::Canceller
 //
@@ -1631,24 +1686,17 @@ void RetryFilter::CallData::Destroy(grpc_call_element* elem,
                                     const grpc_call_final_info* /*final_info*/,
                                     grpc_closure* then_schedule_closure) {
   auto* calld = static_cast<CallData*>(elem->call_data);
-  RefCountedPtr<SubchannelCall> subchannel_call;
-  // TODO(roth): As part of implementing hedging, the logic around
-  // then_schedule_closure may need to get more complex. We may
-  // need to have a separate then_schedule_closure for each LB call, and
-  // wait until all of those have been invoked before we invoke the
-  // then_schedule_closure passed in here.
-  if (GPR_LIKELY(calld->committed_call_ != nullptr)) {
-    subchannel_call = calld->committed_call_->subchannel_call();
-  } else if (GPR_LIKELY(calld->call_attempt_ != nullptr)) {
-    subchannel_call = calld->call_attempt_->lb_call()->subchannel_call();
-  }
+  // Save our ref to the CallStackDestructionBarrier until after our
+  // dtor is invoked.
+  RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier =
+      std::move(calld->call_stack_destruction_barrier_);
   calld->~CallData();
-  if (GPR_LIKELY(subchannel_call != nullptr)) {
-    subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
-  } else {
-    // TODO(yashkt) : This can potentially be a Closure::Run
-    ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
-  }
+  // Now set the callback in the CallStackDestructionBarrier object,
+  // right before we release our ref to it (implicitly upon returning).
+  // The callback will be invoked when the CallStackDestructionBarrier
+  // is destroyed.
+  call_stack_destruction_barrier->set_on_call_stack_destruction(
+      then_schedule_closure);
 }
 
 void RetryFilter::CallData::StartTransportStreamOpBatch(
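
The rewritten Destroy() above relies on ordering: the barrier ref is moved out of the call data before ~CallData() runs, and the surface callback is installed only afterward, so it can never run while the call data still exists. A standalone sketch of that sequencing (plain C++ with invented names; placement new stands in for gRPC's arena allocation):

  #include <cstdio>
  #include <functional>
  #include <memory>
  #include <new>
  #include <utility>

  struct Barrier {
    std::function<void()> on_destroyed;
    ~Barrier() {
      if (on_destroyed) on_destroyed();
    }
  };

  struct CallData {
    std::shared_ptr<Barrier> barrier = std::make_shared<Barrier>();
    ~CallData() { std::puts("~CallData ran"); }
  };

  void Destroy(CallData* calld, std::function<void()> then_schedule_closure) {
    // Save our ref to the barrier so it survives the explicit dtor below.
    std::shared_ptr<Barrier> barrier = std::move(calld->barrier);
    calld->~CallData();
    // Install the surface callback right before our ref is released on
    // return; it runs when the barrier itself is destroyed.
    barrier->on_destroyed = std::move(then_schedule_closure);
  }

  int main() {
    void* storage = ::operator new(sizeof(CallData));
    auto* calld = new (storage) CallData;
    Destroy(calld, [] { std::puts("then_schedule_closure ran"); });
    ::operator delete(storage);  // raw storage freed after destruction
  }

Here no LB call stack holds an extra ref, so the callback fires as Destroy() returns; with outstanding LB call refs it would wait for the last of them.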
@@ -1701,6 +1749,8 @@ RetryFilter::CallData::CallData(RetryFilter* chand,
       owning_call_(args.call_stack),
       call_combiner_(args.call_combiner),
       call_context_(args.context),
+      call_stack_destruction_barrier_(
+          arena_->New<CallStackDestructionBarrier>()),
       pending_send_initial_metadata_(false),
       pending_send_message_(false),
       pending_send_trailing_metadata_(false),
@@ -1788,7 +1838,11 @@ RetryFilter::CallData::CreateLoadBalancedCall() {
   grpc_call_element_args args = {owning_call_, nullptr, call_context_,
                                  path_, call_start_time_, deadline_,
                                  arena_, call_combiner_};
-  return chand_->client_channel_->CreateLoadBalancedCall(args, pollent_);
+  return chand_->client_channel_->CreateLoadBalancedCall(
+      args, pollent_,
+      // This callback holds a ref to the CallStackDestructionBarrier
+      // object until the LB call is destroyed.
+      call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this));
 }
 
 void RetryFilter::CallData::CreateCallAttempt() {
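
As the comment in the hunk above notes, each LB call created here takes its own barrier ref, so the surface closure waits for every attempt's call stack rather than just the last one, which is what the hedging TODOs anticipate. A standalone sketch with two attempts alive at once (plain C++ with invented names):

  #include <cstdio>
  #include <functional>
  #include <memory>

  struct Barrier {
    std::function<void()> on_destroyed;
    ~Barrier() {
      if (on_destroyed) on_destroyed();
    }
  };

  // Stand-in for an LB call stack: its barrier ref is released only when
  // the stack itself is destroyed.
  struct LbCallStack {
    std::shared_ptr<Barrier> barrier_ref;
  };

  int main() {
    auto barrier = std::make_shared<Barrier>();
    auto attempt1 = std::make_unique<LbCallStack>(LbCallStack{barrier});
    auto attempt2 = std::make_unique<LbCallStack>(LbCallStack{barrier});
    barrier->on_destroyed = [] { std::puts("surface closure runs"); };
    barrier.reset();   // parent CallData's ref released
    attempt1.reset();  // first attempt's stack destroyed: not yet
    attempt2.reset();  // last ref released: surface closure fires
  }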
