Merge pull request #24575 from markdroth/deadline_tsan_fix

Create a separate object for each deadline timer.
pull/24585/head
Mark D. Roth authored 4 years ago, committed by GitHub
commit 96dc42e2a1
3 changed files:
  1. src/core/ext/filters/client_channel/client_channel.cc (2 lines changed)
  2. src/core/ext/filters/deadline/deadline_filter.cc (161 lines changed)
  3. src/core/ext/filters/deadline/deadline_filter.h (18 lines changed)
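Why this change: the branch name (deadline_tsan_fix) and the comment deleted from start_timer_if_needed below tell the story. The old design kept a single grpc_timer and grpc_closure inlined in grpc_deadline_state and reused them each time a timer was started, so a re-armed timer could overwrite a closure that a still-pending timer_callback might yet read, the kind of storage reuse TSAN reports as a race. The fix gives every armed timer its own object. A minimal, self-contained sketch of the per-timer-object pattern (all names here are illustrative stand-ins, not the gRPC API):

```cpp
#include <cstdio>

// "Callback" stands in for grpc_closure; the function-pointer-plus-arg
// shape is the same. Hypothetical types for illustration only.
struct Callback {
  void (*fn)(void* arg);
  void* arg;
};

// Each armed timer owns its Callback. The object is created per arm and
// destroys itself when its callback has run, so no storage is ever
// reused while a previous callback might still be in flight.
class TimerState {
 public:
  explicit TimerState(const char* what) : what_(what) {
    cb_.fn = &TimerState::OnFired;
    cb_.arg = this;
  }
  Callback* callback() { return &cb_; }

 private:
  ~TimerState() = default;  // only reachable via OnFired
  static void OnFired(void* arg) {
    TimerState* self = static_cast<TimerState*>(arg);
    std::printf("timer fired: %s\n", self->what_);
    delete self;  // fire-and-forget: nobody else frees it
  }
  Callback cb_;
  const char* what_;
};

int main() {
  // Every (re)arm gets a brand-new state object.
  TimerState* t1 = new TimerState("first arm");
  t1->callback()->fn(t1->callback()->arg);  // timer system delivers once
  TimerState* t2 = new TimerState("re-arm");
  t2->callback()->fn(t2->callback()->arg);
}
```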

src/core/ext/filters/client_channel/client_channel.cc
@@ -2141,7 +2141,7 @@ void ChannelData::RemoveConnectivityWatcher(
 CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
                    const grpc_call_element_args& args)
-    : deadline_state_(elem, args.call_stack, args.call_combiner,
+    : deadline_state_(elem, args,
                       GPR_LIKELY(chand.deadline_checking_enabled())
                           ? args.deadline
                           : GRPC_MILLIS_INF_FUTURE),

src/core/ext/filters/deadline/deadline_filter.cc
@@ -32,53 +32,83 @@
 #include "src/core/lib/slice/slice_internal.h"
 #include "src/core/lib/surface/channel_init.h"
 
-//
-// grpc_deadline_state
-//
+namespace grpc_core {
 
-// The on_complete callback used when sending a cancel_error batch down the
-// filter stack. Yields the call combiner when the batch returns.
-static void yield_call_combiner(void* arg, grpc_error* /*ignored*/) {
-  grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
-  GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
-                          "got on_complete from cancel_stream batch");
-  GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "deadline_timer");
-}
+// A fire-and-forget class representing a pending deadline timer.
+// Allocated on the call arena.
+class TimerState {
+ public:
+  TimerState(grpc_call_element* elem, grpc_millis deadline) : elem_(elem) {
+    grpc_deadline_state* deadline_state =
+        static_cast<grpc_deadline_state*>(elem_->call_data);
+    GRPC_CALL_STACK_REF(deadline_state->call_stack, "DeadlineTimerState");
+    GRPC_CLOSURE_INIT(&closure_, TimerCallback, this, nullptr);
+    grpc_timer_init(&timer_, deadline, &closure_);
+  }
 
-// This is called via the call combiner, so access to deadline_state is
-// synchronized.
-static void send_cancel_op_in_call_combiner(void* arg, grpc_error* error) {
-  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
-  grpc_deadline_state* deadline_state =
-      static_cast<grpc_deadline_state*>(elem->call_data);
-  grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op(
-      GRPC_CLOSURE_INIT(&deadline_state->timer_callback, yield_call_combiner,
-                        deadline_state, grpc_schedule_on_exec_ctx));
-  batch->cancel_stream = true;
-  batch->payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error);
-  elem->filter->start_transport_stream_op_batch(elem, batch);
-}
+  void Cancel() { grpc_timer_cancel(&timer_); }
 
-// Timer callback.
-static void timer_callback(void* arg, grpc_error* error) {
-  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
-  grpc_deadline_state* deadline_state =
-      static_cast<grpc_deadline_state*>(elem->call_data);
-  if (error != GRPC_ERROR_CANCELLED) {
-    error = grpc_error_set_int(
-        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
-        GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED);
-    deadline_state->call_combiner->Cancel(GRPC_ERROR_REF(error));
-    GRPC_CLOSURE_INIT(&deadline_state->timer_callback,
-                      send_cancel_op_in_call_combiner, elem,
-                      grpc_schedule_on_exec_ctx);
-    GRPC_CALL_COMBINER_START(deadline_state->call_combiner,
-                             &deadline_state->timer_callback, error,
-                             "deadline exceeded -- sending cancel_stream op");
-  } else {
-    GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "deadline_timer");
-  }
-}
+ private:
+  ~TimerState() {
+    grpc_deadline_state* deadline_state =
+        static_cast<grpc_deadline_state*>(elem_->call_data);
+    GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "DeadlineTimerState");
+  }
+
+  // The on_complete callback used when sending a cancel_error batch down the
+  // filter stack. Yields the call combiner when the batch returns.
+  static void YieldCallCombiner(void* arg, grpc_error* /*ignored*/) {
+    TimerState* self = static_cast<TimerState*>(arg);
+    grpc_deadline_state* deadline_state =
+        static_cast<grpc_deadline_state*>(self->elem_->call_data);
+    GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
+                            "got on_complete from cancel_stream batch");
+    // Allocated on call arena, so not deleting, but do want to call the dtor.
+    self->~TimerState();
+  }
+
+  // This is called via the call combiner, so access to deadline_state is
+  // synchronized.
+  static void SendCancelOpInCallCombiner(void* arg, grpc_error* error) {
+    TimerState* self = static_cast<TimerState*>(arg);
+    grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op(
+        GRPC_CLOSURE_INIT(&self->closure_, YieldCallCombiner, self, nullptr));
+    batch->cancel_stream = true;
+    batch->payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error);
+    self->elem_->filter->start_transport_stream_op_batch(self->elem_, batch);
+  }
+
+  // Timer callback.
+  static void TimerCallback(void* arg, grpc_error* error) {
+    TimerState* self = static_cast<TimerState*>(arg);
+    grpc_deadline_state* deadline_state =
+        static_cast<grpc_deadline_state*>(self->elem_->call_data);
+    if (error != GRPC_ERROR_CANCELLED) {
+      error = grpc_error_set_int(
+          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
+          GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED);
+      deadline_state->call_combiner->Cancel(GRPC_ERROR_REF(error));
+      GRPC_CLOSURE_INIT(&self->closure_, SendCancelOpInCallCombiner, self,
+                        nullptr);
+      GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &self->closure_,
+                               error,
+                               "deadline exceeded -- sending cancel_stream op");
+    } else {
+      // Allocated on call arena, so not deleting, but do want to call the dtor.
+      self->~TimerState();
+    }
+  }
+
+  grpc_call_element* elem_;
+  grpc_timer timer_;
+  grpc_closure closure_;
+};
+
+}  // namespace grpc_core
+
+//
+// grpc_deadline_state
+//
 
 // Starts the deadline timer.
 // This is called via the call combiner, so access to deadline_state is
 // synchronized.
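A note on the lifetime idiom above: TimerState is placement-allocated on the call's arena (see arena->New in the next hunk), which is why both completion paths end in self->~TimerState() rather than delete. The destructor must run, to drop the "DeadlineTimerState" ref on the call stack, but the memory belongs to the arena and is reclaimed in bulk when the call ends. A toy sketch of that idiom (this Arena is a stand-in, not grpc_core::Arena):

```cpp
#include <cstdlib>
#include <new>
#include <vector>

// Toy arena: hands out raw blocks and frees them all at once. It never
// runs destructors; callers invoke them explicitly when needed.
class Arena {
 public:
  ~Arena() {
    for (void* p : blocks_) std::free(p);
  }
  template <typename T, typename... Args>
  T* New(Args&&... args) {
    void* mem = std::malloc(sizeof(T));
    blocks_.push_back(mem);
    return new (mem) T(static_cast<Args&&>(args)...);  // placement new
  }

 private:
  std::vector<void*> blocks_;
};

struct RefHolder {
  ~RefHolder() { /* the real code drops a call-stack ref here */ }
};

int main() {
  Arena arena;
  RefHolder* r = arena.New<RefHolder>();
  r->~RefHolder();  // run the dtor, but do NOT free: the arena owns the memory
}  // ~Arena releases the block here, without re-running any destructor
```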
@@ -90,41 +120,18 @@ static void start_timer_if_needed(grpc_call_element* elem,
   }
   grpc_deadline_state* deadline_state =
       static_cast<grpc_deadline_state*>(elem->call_data);
-  grpc_closure* closure = nullptr;
-  switch (deadline_state->timer_state) {
-    case GRPC_DEADLINE_STATE_PENDING:
-      // Note: We do not start the timer if there is already a timer
-      return;
-    case GRPC_DEADLINE_STATE_FINISHED:
-      deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
-      // If we've already created and destroyed a timer, we always create a
-      // new closure: we have no other guarantee that the inlined closure is
-      // not in use (it may hold a pending call to timer_callback)
-      closure =
-          GRPC_CLOSURE_CREATE(timer_callback, elem, grpc_schedule_on_exec_ctx);
-      break;
-    case GRPC_DEADLINE_STATE_INITIAL:
-      deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
-      closure =
-          GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback,
-                            elem, grpc_schedule_on_exec_ctx);
-      break;
-  }
-  GPR_ASSERT(closure != nullptr);
-  GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
-  grpc_timer_init(&deadline_state->timer, deadline, closure);
+  GPR_ASSERT(deadline_state->timer_state == nullptr);
+  deadline_state->timer_state =
+      deadline_state->arena->New<grpc_core::TimerState>(elem, deadline);
 }
 
 // Cancels the deadline timer.
 // This is called via the call combiner, so access to deadline_state is
 // synchronized.
 static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
-  if (deadline_state->timer_state == GRPC_DEADLINE_STATE_PENDING) {
-    deadline_state->timer_state = GRPC_DEADLINE_STATE_FINISHED;
-    grpc_timer_cancel(&deadline_state->timer);
-  } else {
-    // timer was either in STATE_INITIAL (nothing to cancel)
-    // OR in STATE_FINISHED (again nothing to cancel)
+  if (deadline_state->timer_state != nullptr) {
+    deadline_state->timer_state->Cancel();
+    deadline_state->timer_state = nullptr;
   }
 }
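With one object per timer, the old three-state enum collapses: "a timer is pending" is now simply timer_state != nullptr. Note that cancel_timer_if_needed nulls the pointer immediately but does not destroy the object; grpc_timer_cancel makes the timer system deliver TimerCallback with GRPC_ERROR_CANCELLED, and the object destroys itself there. A hedged sketch of that handoff (stub types, not the real filter):

```cpp
#include <cassert>
#include <cstdio>

// Stub modelling the ownership handoff: Cancel() does not destroy the
// object; the timer callback is the single place the object dies.
class TimerState {
 public:
  void Cancel() { cancelled_ = true; }
  // In gRPC the timer system drives this exactly once per armed timer.
  void Fire() {
    if (cancelled_) {
      std::printf("cancelled: just run the dtor\n");
    } else {
      std::printf("deadline exceeded: send cancel_stream, then dtor\n");
    }
    delete this;  // the real code calls the dtor on arena memory instead
  }

 private:
  ~TimerState() = default;
  bool cancelled_ = false;
};

struct DeadlineState {
  TimerState* timer_state = nullptr;  // non-null iff a timer is pending
};

int main() {
  DeadlineState ds;
  assert(ds.timer_state == nullptr);  // start_timer_if_needed's invariant
  ds.timer_state = new TimerState;    // arena->New in the real code
  ds.timer_state->Cancel();
  TimerState* inflight = ds.timer_state;
  ds.timer_state = nullptr;           // the filter forgets it immediately...
  inflight->Fire();                   // ...the callback finishes the job
}
```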
@@ -182,10 +189,11 @@ static void start_timer_after_init(void* arg, grpc_error* error) {
 }
 
 grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
-                                         grpc_call_stack* call_stack,
-                                         grpc_core::CallCombiner* call_combiner,
+                                         const grpc_call_element_args& args,
                                          grpc_millis deadline)
-    : call_stack(call_stack), call_combiner(call_combiner) {
+    : call_stack(args.call_stack),
+      call_combiner(args.call_combiner),
+      arena(args.arena) {
   // Deadline will always be infinite on servers, so the timer will only be
   // set on clients with a finite deadline.
   if (deadline != GRPC_MILLIS_INF_FUTURE) {
@@ -263,8 +271,7 @@ typedef struct server_call_data {
 // Constructor for call_data. Used for both client and server filters.
 static grpc_error* deadline_init_call_elem(grpc_call_element* elem,
                                            const grpc_call_element_args* args) {
-  new (elem->call_data) grpc_deadline_state(
-      elem, args->call_stack, args->call_combiner, args->deadline);
+  new (elem->call_data) grpc_deadline_state(elem, *args, args->deadline);
   return GRPC_ERROR_NONE;
 }
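deadline_init_call_elem still placement-constructs the state into the element's preallocated call_data; the change is that it forwards the whole grpc_call_element_args so the constructor can pull out args.arena itself, which is also why the client_channel.cc call site above got simpler. The idiom in isolation (illustrative types, not the gRPC headers):

```cpp
#include <new>

// Illustrative stand-ins for grpc_call_element_args / grpc_deadline_state.
struct CallElementArgs {
  long deadline;
  // call_stack, call_combiner, arena, ... travel along for free
};

struct DeadlineState {
  DeadlineState(const CallElementArgs& args, long deadline)
      : deadline_(deadline) {
    (void)args;  // the real ctor copies call_stack/call_combiner/arena out
  }
  long deadline_;
};

int main() {
  // The channel stack preallocates call_data; the filter constructs in place.
  alignas(DeadlineState) unsigned char call_data[sizeof(DeadlineState)];
  CallElementArgs args{/*deadline=*/1000};
  auto* state = new (call_data) DeadlineState(args, args.deadline);
  state->~DeadlineState();  // destroy_call_elem runs the dtor; no delete
}
```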

src/core/ext/filters/deadline/deadline_filter.h
@@ -22,26 +22,22 @@
 #include "src/core/lib/channel/channel_stack.h"
 #include "src/core/lib/iomgr/timer.h"
 
-enum grpc_deadline_timer_state {
-  GRPC_DEADLINE_STATE_INITIAL,
-  GRPC_DEADLINE_STATE_PENDING,
-  GRPC_DEADLINE_STATE_FINISHED
-};
+namespace grpc_core {
+class TimerState;
+}  // namespace grpc_core
 
 // State used for filters that enforce call deadlines.
 // Must be the first field in the filter's call_data.
 struct grpc_deadline_state {
-  grpc_deadline_state(grpc_call_element* elem, grpc_call_stack* call_stack,
-                      grpc_core::CallCombiner* call_combiner,
-                      grpc_millis deadline);
+  grpc_deadline_state(grpc_call_element* elem,
+                      const grpc_call_element_args& args, grpc_millis deadline);
   ~grpc_deadline_state();
 
   // We take a reference to the call stack for the timer callback.
   grpc_call_stack* call_stack;
   grpc_core::CallCombiner* call_combiner;
-  grpc_deadline_timer_state timer_state = GRPC_DEADLINE_STATE_INITIAL;
-  grpc_timer timer;
-  grpc_closure timer_callback;
+  grpc_core::Arena* arena;
+  grpc_core::TimerState* timer_state = nullptr;
   // Closure to invoke when we receive trailing metadata.
   // We use this to cancel the timer.
   grpc_closure recv_trailing_metadata_ready;
