deadline filter: deadline_state no longer needs to be the first field in call data (#32265)

Branch: pull/31973/head^2
Mark D. Roth authored 2 years ago; committed by GitHub
parent deb1e25543
commit 8ed8a3a054
3 changed files:
  1. src/core/ext/filters/client_channel/client_channel.cc (48 changed lines)
  2. src/core/ext/filters/deadline/deadline_filter.cc (86 changed lines)
  3. src/core/ext/filters/deadline/deadline_filter.h (10 changed lines)
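In short, the deadline helpers used to take a grpc_call_element* and recover the state by casting elem->call_data to grpc_deadline_state*, which only works when that state is the first field of the filter's call data. After this change they take the grpc_deadline_state* directly, and the struct itself stores the grpc_call_element* for the one place that still needs it. A minimal sketch of the API shape, abbreviated from the signatures in the diff below (illustrative, not the full header):

// Before: the helper assumed the state sat at offset 0 of elem->call_data.
void grpc_deadline_state_reset(grpc_call_element* elem,
                               grpc_core::Timestamp new_deadline);

// After: the caller passes the state itself, so it may live at any offset.
void grpc_deadline_state_reset(grpc_deadline_state* deadline_state,
                               grpc_core::Timestamp new_deadline);
void grpc_deadline_state_client_start_transport_stream_op_batch(
    grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op);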

src/core/ext/filters/client_channel/client_channel.cc:
@@ -190,20 +190,16 @@ class ClientChannel::CallData {
void CreateDynamicCall(grpc_call_element* elem);
Arena* arena() const { return deadline_state_.arena; }
grpc_call_stack* owning_call() const { return deadline_state_.call_stack; }
CallCombiner* call_combiner() const { return deadline_state_.call_combiner; }
// State for handling deadlines.
// The code in deadline_filter.c requires this to be the first field.
// TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
// and this struct both independently store pointers to the call stack
// and call combiner. If/when we have time, find a way to avoid this
// without breaking the grpc_deadline_state abstraction.
grpc_deadline_state deadline_state_;
grpc_slice path_; // Request path.
gpr_cycle_counter call_start_time_;
Timestamp deadline_;
Arena* arena_;
grpc_call_stack* owning_call_;
CallCombiner* call_combiner_;
grpc_call_context_element* call_context_;
grpc_polling_entity* pollent_ = nullptr;
@@ -1828,9 +1824,6 @@ ClientChannel::CallData::CallData(grpc_call_element* elem,
path_(CSliceRef(args.path)),
call_start_time_(args.start_time),
deadline_(args.deadline),
arena_(args.arena),
owning_call_(args.call_stack),
call_combiner_(args.call_combiner),
call_context_(args.context) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: created call", &chand, this);
@@ -1877,7 +1870,8 @@ void ClientChannel::CallData::StartTransportStreamOpBatch(
calld, grpc_transport_stream_op_batch_string(batch).c_str());
}
if (GPR_LIKELY(chand->deadline_checking_enabled_)) {
grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
grpc_deadline_state_client_start_transport_stream_op_batch(
&calld->deadline_state_, batch);
}
// Intercept recv_trailing_metadata to call CallDispatchController::Commit(),
// in case we wind up failing the call before we get down to the retry
@@ -1912,7 +1906,7 @@ void ClientChannel::CallData::StartTransportStreamOpBatch(
}
// Note: This will release the call combiner.
grpc_transport_stream_op_batch_finish_with_failure(
batch, calld->cancel_error_, calld->call_combiner_);
batch, calld->cancel_error_, calld->call_combiner());
return;
}
// Handle cancellation.
@@ -1931,7 +1925,7 @@ void ClientChannel::CallData::StartTransportStreamOpBatch(
calld->PendingBatchesFail(elem, calld->cancel_error_, NoYieldCallCombiner);
// Note: This will release the call combiner.
grpc_transport_stream_op_batch_finish_with_failure(
batch, calld->cancel_error_, calld->call_combiner_);
batch, calld->cancel_error_, calld->call_combiner());
return;
}
// Add the batch to the pending list.
@@ -1954,7 +1948,7 @@ void ClientChannel::CallData::StartTransportStreamOpBatch(
"chand=%p calld=%p: saved batch, yielding call combiner", chand,
calld);
}
GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
GRPC_CALL_COMBINER_STOP(calld->call_combiner(),
"batch does not include send_initial_metadata");
}
}
@@ -2006,7 +2000,7 @@ void ClientChannel::CallData::FailPendingBatchInCallCombiner(
CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
// Note: This will release the call combiner.
grpc_transport_stream_op_batch_finish_with_failure(batch, error,
calld->call_combiner_);
calld->call_combiner());
}
// This is called via the call combiner, so access to calld is synchronized.
@@ -2037,9 +2031,9 @@ void ClientChannel::CallData::PendingBatchesFail(
}
}
if (yield_call_combiner_predicate(closures)) {
closures.RunClosures(call_combiner_);
closures.RunClosures(call_combiner());
} else {
closures.RunClosuresWithoutYielding(call_combiner_);
closures.RunClosuresWithoutYielding(call_combiner());
}
}
@@ -2082,7 +2076,7 @@ void ClientChannel::CallData::PendingBatchesResume(grpc_call_element* elem) {
}
}
// Note: This will release the call combiner.
closures.RunClosures(call_combiner_);
closures.RunClosures(call_combiner());
}
//
@@ -2095,10 +2089,10 @@ class ClientChannel::CallData::ResolverQueuedCallCanceller {
public:
explicit ResolverQueuedCallCanceller(grpc_call_element* elem) : elem_(elem) {
auto* calld = static_cast<CallData*>(elem->call_data);
GRPC_CALL_STACK_REF(calld->owning_call_, "ResolverQueuedCallCanceller");
GRPC_CALL_STACK_REF(calld->owning_call(), "ResolverQueuedCallCanceller");
GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
grpc_schedule_on_exec_ctx);
calld->call_combiner_->SetNotifyOnCancel(&closure_);
calld->call_combiner()->SetNotifyOnCancel(&closure_);
}
private:
@@ -2123,7 +2117,7 @@ class ClientChannel::CallData::ResolverQueuedCallCanceller {
YieldCallCombinerIfPendingBatchesFound);
}
}
GRPC_CALL_STACK_UNREF(calld->owning_call_, "ResolvingQueuedCallCanceller");
GRPC_CALL_STACK_UNREF(calld->owning_call(), "ResolvingQueuedCallCanceller");
delete self;
}
@@ -2178,7 +2172,7 @@ grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
if (config_selector != nullptr) {
// Use the ConfigSelector to determine the config for the call.
auto call_config =
config_selector->GetCallConfig({&path_, initial_metadata, arena_});
config_selector->GetCallConfig({&path_, initial_metadata, arena()});
if (!call_config.ok()) {
return absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
call_config.status(), "ConfigSelector"));
@@ -2189,7 +2183,7 @@ grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
// itself in the call context, so that it can be accessed by filters
// below us in the stack, and it will be cleaned up when the call ends.
auto* service_config_call_data =
arena_->New<ClientChannelServiceConfigCallData>(
arena()->New<ClientChannelServiceConfigCallData>(
std::move(call_config->service_config), call_config->method_configs,
std::move(call_config->call_attributes),
call_config->call_dispatch_controller, call_context_);
@@ -2207,7 +2201,7 @@ grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
method_params->timeout();
if (per_method_deadline < deadline_) {
deadline_ = per_method_deadline;
grpc_deadline_state_reset(elem, deadline_);
grpc_deadline_state_reset(&deadline_state_, deadline_);
}
}
// If the service config set wait_for_ready and the application
@@ -2367,9 +2361,9 @@ void ClientChannel::CallData::CreateDynamicCall(grpc_call_element* elem) {
path_,
call_start_time_,
deadline_,
arena_,
arena(),
call_context_,
call_combiner_};
call_combiner()};
grpc_error_handle error;
DynamicFilters* channel_stack = args.channel_stack.get();
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {

src/core/ext/filters/deadline/deadline_filter.cc:
@@ -51,9 +51,8 @@ namespace grpc_core {
// Allocated on the call arena.
class TimerState {
public:
TimerState(grpc_call_element* elem, Timestamp deadline) : elem_(elem) {
grpc_deadline_state* deadline_state =
static_cast<grpc_deadline_state*>(elem_->call_data);
TimerState(grpc_deadline_state* deadline_state, Timestamp deadline)
: deadline_state_(deadline_state) {
GRPC_CALL_STACK_REF(deadline_state->call_stack, "DeadlineTimerState");
GRPC_CLOSURE_INIT(&closure_, TimerCallback, this, nullptr);
grpc_timer_init(&timer_, deadline, &closure_);
@@ -66,11 +65,10 @@ class TimerState {
// filter stack. Yields the call combiner when the batch returns.
static void YieldCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
TimerState* self = static_cast<TimerState*>(arg);
grpc_deadline_state* deadline_state =
static_cast<grpc_deadline_state*>(self->elem_->call_data);
GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
GRPC_CALL_COMBINER_STOP(self->deadline_state_->call_combiner,
"got on_complete from cancel_stream batch");
GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "DeadlineTimerState");
GRPC_CALL_STACK_UNREF(self->deadline_state_->call_stack,
"DeadlineTimerState");
}
// This is called via the call combiner, so access to deadline_state is
@@ -81,26 +79,26 @@ class TimerState {
GRPC_CLOSURE_INIT(&self->closure_, YieldCallCombiner, self, nullptr));
batch->cancel_stream = true;
batch->payload->cancel_stream.cancel_error = error;
self->elem_->filter->start_transport_stream_op_batch(self->elem_, batch);
grpc_call_element* elem = self->deadline_state_->elem;
elem->filter->start_transport_stream_op_batch(elem, batch);
}
// Timer callback.
static void TimerCallback(void* arg, grpc_error_handle error) {
TimerState* self = static_cast<TimerState*>(arg);
grpc_deadline_state* deadline_state =
static_cast<grpc_deadline_state*>(self->elem_->call_data);
if (error != absl::CancelledError()) {
error = grpc_error_set_int(GRPC_ERROR_CREATE("Deadline Exceeded"),
StatusIntProperty::kRpcStatus,
GRPC_STATUS_DEADLINE_EXCEEDED);
deadline_state->call_combiner->Cancel(error);
self->deadline_state_->call_combiner->Cancel(error);
GRPC_CLOSURE_INIT(&self->closure_, SendCancelOpInCallCombiner, self,
nullptr);
GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &self->closure_,
error,
GRPC_CALL_COMBINER_START(self->deadline_state_->call_combiner,
&self->closure_, error,
"deadline exceeded -- sending cancel_stream op");
} else {
GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "DeadlineTimerState");
GRPC_CALL_STACK_UNREF(self->deadline_state_->call_stack,
"DeadlineTimerState");
}
}
@@ -111,7 +109,7 @@ class TimerState {
// need to call the dtor only after both (a) the timer callback
// finishes and (b) the filter sees the call completion and attempts
// to cancel the timer.
grpc_call_element* elem_;
grpc_deadline_state* deadline_state_;
grpc_timer timer_;
grpc_closure closure_;
};
@@ -125,14 +123,13 @@ class TimerState {
// Starts the deadline timer.
// This is called via the call combiner, so access to deadline_state is
// synchronized.
static void start_timer_if_needed(grpc_call_element* elem,
static void start_timer_if_needed(grpc_deadline_state* deadline_state,
grpc_core::Timestamp deadline) {
if (deadline == grpc_core::Timestamp::InfFuture()) return;
grpc_deadline_state* deadline_state =
static_cast<grpc_deadline_state*>(elem->call_data);
GPR_ASSERT(deadline_state->timer_state == nullptr);
deadline_state->timer_state =
deadline_state->arena->New<grpc_core::TimerState>(elem, deadline);
deadline_state->arena->New<grpc_core::TimerState>(deadline_state,
deadline);
}
// Cancels the deadline timer.
@@ -170,21 +167,22 @@ static void inject_recv_trailing_metadata_ready(
// Callback and associated state for starting the timer after call stack
// initialization has been completed.
struct start_timer_after_init_state {
start_timer_after_init_state(grpc_call_element* elem,
start_timer_after_init_state(grpc_deadline_state* deadline_state,
grpc_core::Timestamp deadline)
: elem(elem), deadline(deadline) {}
~start_timer_after_init_state() { start_timer_if_needed(elem, deadline); }
: deadline_state(deadline_state), deadline(deadline) {}
~start_timer_after_init_state() {
start_timer_if_needed(deadline_state, deadline);
}
bool in_call_combiner = false;
grpc_call_element* elem;
grpc_deadline_state* deadline_state;
grpc_core::Timestamp deadline;
grpc_closure closure;
};
static void start_timer_after_init(void* arg, grpc_error_handle error) {
struct start_timer_after_init_state* state =
static_cast<struct start_timer_after_init_state*>(arg);
grpc_deadline_state* deadline_state =
static_cast<grpc_deadline_state*>(state->elem->call_data);
grpc_deadline_state* deadline_state = state->deadline_state;
if (!state->in_call_combiner) {
// We are initially called without holding the call combiner, so we
// need to bounce ourselves into it.
@@ -201,7 +199,8 @@ static void start_timer_after_init(void* arg, grpc_error_handle error) {
grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
const grpc_call_element_args& args,
grpc_core::Timestamp deadline)
: call_stack(args.call_stack),
: elem(elem),
call_stack(args.call_stack),
call_combiner(args.call_combiner),
arena(args.arena) {
// Deadline will always be infinite on servers, so the timer will only be
@@ -215,7 +214,7 @@ grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
// create a closure to start the timer, and we schedule that closure
// to be run after call stack initialization is done.
struct start_timer_after_init_state* state =
new start_timer_after_init_state(elem, deadline);
new start_timer_after_init_state(this, deadline);
GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
grpc_schedule_on_exec_ctx);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->closure, absl::OkStatus());
@@ -224,18 +223,14 @@ grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
grpc_deadline_state::~grpc_deadline_state() { cancel_timer_if_needed(this); }
void grpc_deadline_state_reset(grpc_call_element* elem,
void grpc_deadline_state_reset(grpc_deadline_state* deadline_state,
grpc_core::Timestamp new_deadline) {
grpc_deadline_state* deadline_state =
static_cast<grpc_deadline_state*>(elem->call_data);
cancel_timer_if_needed(deadline_state);
start_timer_if_needed(elem, new_deadline);
start_timer_if_needed(deadline_state, new_deadline);
}
void grpc_deadline_state_client_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
grpc_deadline_state* deadline_state =
static_cast<grpc_deadline_state*>(elem->call_data);
grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
if (op->cancel_stream) {
cancel_timer_if_needed(deadline_state);
} else {
@@ -261,14 +256,9 @@ static grpc_error_handle deadline_init_channel_elem(
// Destructor for channel_data. Used for both client and server filters.
static void deadline_destroy_channel_elem(grpc_channel_element* /*elem*/) {}
// Call data used for both client and server filter.
typedef struct base_call_data {
grpc_deadline_state deadline_state;
} base_call_data;
// Additional call data used only for the server filter.
typedef struct server_call_data {
base_call_data base; // Must be first.
struct server_call_data {
grpc_deadline_state deadline_state; // Must be first.
// The closure for receiving initial metadata.
grpc_closure recv_initial_metadata_ready;
// Received initial metadata batch.
@@ -276,7 +266,7 @@ typedef struct server_call_data {
// The original recv_initial_metadata_ready closure, which we chain to
// after our own closure is invoked.
grpc_closure* next_recv_initial_metadata_ready;
} server_call_data;
};
// Constructor for call_data. Used for both client and server filters.
static grpc_error_handle deadline_init_call_elem(
@@ -297,7 +287,8 @@ static void deadline_destroy_call_elem(
// Method for starting a call op for client filter.
static void deadline_client_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
grpc_deadline_state_client_start_transport_stream_op_batch(elem, op);
grpc_deadline_state_client_start_transport_stream_op_batch(
static_cast<grpc_deadline_state*>(elem->call_data), op);
// Chain to next filter.
grpc_call_next_op(elem, op);
}
@@ -307,7 +298,8 @@ static void recv_initial_metadata_ready(void* arg, grpc_error_handle error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
start_timer_if_needed(
elem, calld->recv_initial_metadata->get(grpc_core::GrpcTimeoutMetadata())
&calld->deadline_state,
calld->recv_initial_metadata->get(grpc_core::GrpcTimeoutMetadata())
.value_or(grpc_core::Timestamp::InfFuture()));
// Invoke the next callback.
grpc_core::Closure::Run(DEBUG_LOCATION,
@@ -319,7 +311,7 @@ static void deadline_server_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
if (op->cancel_stream) {
cancel_timer_if_needed(&calld->base.deadline_state);
cancel_timer_if_needed(&calld->deadline_state);
} else {
// If we're receiving initial metadata, we need to get the deadline
// from the recv_initial_metadata_ready callback. So we inject our
@@ -341,7 +333,7 @@ static void deadline_server_start_transport_stream_op_batch(
// the client never sends trailing metadata, because this is the
// hook that tells us when the call is complete on the server side.
if (op->recv_trailing_metadata) {
inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op);
inject_recv_trailing_metadata_ready(&calld->deadline_state, op);
}
}
// Chain to next filter.
@@ -355,7 +347,7 @@ const grpc_channel_filter grpc_client_deadline_filter = {
return next_promise_factory(std::move(call_args));
},
grpc_channel_next_op,
sizeof(base_call_data),
sizeof(grpc_deadline_state),
deadline_init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
deadline_destroy_call_elem,

src/core/ext/filters/deadline/deadline_filter.h:
@@ -41,6 +41,7 @@ struct grpc_deadline_state {
~grpc_deadline_state();
// We take a reference to the call stack for the timer callback.
grpc_call_element* elem;
grpc_call_stack* call_stack;
grpc_core::CallCombiner* call_combiner;
grpc_core::Arena* arena;
@@ -53,11 +54,6 @@ struct grpc_deadline_state {
grpc_closure* original_recv_trailing_metadata_ready;
};
//
// NOTE: All of these functions require that the first field in
// elem->call_data is a grpc_deadline_state.
//
// Cancels the existing timer and starts a new one with new_deadline.
//
// Note: It is generally safe to call this with an earlier deadline
@@ -67,7 +63,7 @@ struct grpc_deadline_state {
// deadline may result in the timer being called twice.
//
// Note: Must be called while holding the call combiner.
void grpc_deadline_state_reset(grpc_call_element* elem,
void grpc_deadline_state_reset(grpc_deadline_state* deadline_state,
grpc_core::Timestamp new_deadline);
// To be called from the client-side filter's start_transport_stream_op_batch()
@@ -79,7 +75,7 @@ void grpc_deadline_state_reset(grpc_call_element* elem,
//
// Note: Must be called while holding the call combiner.
void grpc_deadline_state_client_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op);
grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op);
// Should deadline checking be performed (according to channel args)
bool grpc_deadline_checking_enabled(const grpc_core::ChannelArgs& args);
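
A hedged usage sketch of the new signatures (MyFilterCallData and my_start_transport_stream_op_batch are hypothetical names, not part of this commit; assumes the internal header shown above): a filter can now embed grpc_deadline_state at any offset in its call data and hand the helpers a pointer to it, instead of relying on the removed first-field requirement.

#include "src/core/ext/filters/deadline/deadline_filter.h"

// Hypothetical client-side filter call data: the deadline state no longer
// has to be the first member.
struct MyFilterCallData {
  grpc_closure some_other_state;       // other members may come first now
  grpc_deadline_state deadline_state;  // fine at any offset
};

static void my_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
  auto* calld = static_cast<MyFilterCallData*>(elem->call_data);
  // Pass the embedded state explicitly instead of casting elem->call_data.
  grpc_deadline_state_client_start_transport_stream_op_batch(
      &calld->deadline_state, op);
  grpc_call_next_op(elem, op);  // chain to the next filter, as the built-in
                                // client deadline filter does
}

The server-side path in the diff follows the same pattern: server_call_data embeds grpc_deadline_state directly and passes &calld->deadline_state to start_timer_if_needed, cancel_timer_if_needed, and inject_recv_trailing_metadata_ready.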
