Move client-side deadline handling to client_channel filter.

pull/7970/head
Mark D. Roth 8 years ago
parent fb475c93c4
commit 72f6da8bac
  1. 41
      src/core/ext/client_config/client_channel.c
  2. 244
      src/core/lib/channel/deadline_filter.c
  3. 41
      src/core/lib/channel/deadline_filter.h
  4. 3
      src/core/lib/surface/init.c

@ -45,6 +45,7 @@
#include "src/core/ext/client_config/subchannel.h" #include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/deadline_filter.h"
#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
@ -377,6 +378,15 @@ typedef enum {
for initial metadata before trying to create a call object, for initial metadata before trying to create a call object,
and handling cancellation gracefully. */ and handling cancellation gracefully. */
typedef struct client_channel_call_data { typedef struct client_channel_call_data {
// State for handling deadlines.
// The code in deadline_filter.c requires this to be the first field.
// FIXME
// TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
// and this struct both independently store a pointer to the call
// stack and each has its own mutex. If/when we have time, find a way
// to avoid this without breaking either abstraction.
grpc_deadline_state deadline_state;
/** either 0 for no call, 1 for cancelled, or a pointer to a /** either 0 for no call, 1 for cancelled, or a pointer to a
grpc_subchannel_call */ grpc_subchannel_call */
gpr_atm subchannel_call; gpr_atm subchannel_call;
@ -465,7 +475,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
gpr_atm_no_barrier_store(&calld->subchannel_call, 1); gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING( fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING(
"Failed to create subchannel", &error, 1)); "Failed to create subchannel", &error, 1));
} else if (1 == gpr_atm_acq_load(&calld->subchannel_call)) { } else if (GET_CALL(calld) == CANCELLED_CALL) {
/* already cancelled before subchannel became ready */ /* already cancelled before subchannel became ready */
fail_locked(exec_ctx, calld, fail_locked(exec_ctx, calld,
GRPC_ERROR_CREATE_REFERENCING( GRPC_ERROR_CREATE_REFERENCING(
@ -511,7 +521,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata, grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags, uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel, grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready); grpc_closure *on_ready, grpc_error *error);
static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
@ -522,7 +532,8 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL);
} else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, } else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags, cpa->initial_metadata_flags,
cpa->connected_subchannel, cpa->on_ready)) { cpa->connected_subchannel, cpa->on_ready,
GRPC_ERROR_NONE)) {
grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
} }
gpr_free(cpa); gpr_free(cpa);
@ -532,7 +543,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata, grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags, uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel, grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) { grpc_closure *on_ready, grpc_error *error) {
GPR_TIMER_BEGIN("pick_subchannel", 0); GPR_TIMER_BEGIN("pick_subchannel", 0);
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
@ -554,7 +565,8 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
if (cpa->connected_subchannel == connected_subchannel) { if (cpa->connected_subchannel == connected_subchannel) {
cpa->connected_subchannel = NULL; cpa->connected_subchannel = NULL;
grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, grpc_exec_ctx_sched(exec_ctx, cpa->on_ready,
GRPC_ERROR_CREATE("Pick cancelled"), NULL); GRPC_ERROR_CREATE_REFERENCING(
"Pick cancelled", &error, 1), NULL);
} }
} }
gpr_mu_unlock(&chand->mu); gpr_mu_unlock(&chand->mu);
@ -609,6 +621,7 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op) { grpc_transport_stream_op *op) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op); GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
/* try to (atomically) get the call */ /* try to (atomically) get the call */
grpc_subchannel_call *call = GET_CALL(calld); grpc_subchannel_call *call = GET_CALL(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0);
@ -645,20 +658,23 @@ retry:
if (op->cancel_error != GRPC_ERROR_NONE) { if (op->cancel_error != GRPC_ERROR_NONE) {
if (!gpr_atm_rel_cas(&calld->subchannel_call, 0, if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
(gpr_atm)(uintptr_t)CANCELLED_CALL)) { (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
gpr_log(GPR_INFO, "CANCELLED_CALL");
goto retry; goto retry;
} else { } else {
switch (calld->creation_phase) { switch (calld->creation_phase) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
gpr_log(GPR_INFO, "GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING");
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error)); fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error));
break; break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
gpr_log(GPR_INFO, "GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL");
pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel, pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel,
NULL); NULL, GRPC_ERROR_REF(op->cancel_error));
break; break;
} }
gpr_mu_unlock(&calld->mu); gpr_mu_unlock(&calld->mu);
grpc_transport_stream_op_finish_with_failure(exec_ctx, op, grpc_transport_stream_op_finish_with_failure(
GRPC_ERROR_CANCELLED); exec_ctx, op, GRPC_ERROR_REF(op->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0); GPR_TIMER_END("cc_start_transport_stream_op", 0);
return; return;
} }
@ -672,7 +688,8 @@ retry:
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata,
op->send_initial_metadata_flags, op->send_initial_metadata_flags,
&calld->connected_subchannel, &calld->next_step)) { &calld->connected_subchannel, &calld->next_step,
GRPC_ERROR_NONE)) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
} }
@ -705,6 +722,11 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem, grpc_call_element *elem,
grpc_call_element_args *args) { grpc_call_element_args *args) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
grpc_deadline_state_init(&calld->deadline_state, args->call_stack);
// FIXME: remove
calld->deadline_state.is_client = true;
gpr_atm_rel_store(&calld->subchannel_call, 0); gpr_atm_rel_store(&calld->subchannel_call, 0);
gpr_mu_init(&calld->mu); gpr_mu_init(&calld->mu);
calld->connected_subchannel = NULL; calld->connected_subchannel = NULL;
@ -723,6 +745,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_final_info *final_info, const grpc_call_final_info *final_info,
void *and_free_memory) { void *and_free_memory) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
grpc_deadline_state_destroy(exec_ctx, &calld->deadline_state);
grpc_subchannel_call *call = GET_CALL(calld); grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) { if (call != NULL && call != CANCELLED_CALL) {
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");

@ -40,131 +40,107 @@
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
// Used for both client and server filters. //
typedef struct channel_data { // grpc_deadline_state
} channel_data; //
// Call data used for both client and server filter.
typedef struct base_call_data {
// We take a reference to the call stack for the timer callback.
grpc_call_stack* call_stack;
// Guards access to timer_pending and timer.
gpr_mu timer_mu;
// True if the timer callback is currently pending.
bool timer_pending;
// The deadline timer.
grpc_timer timer;
// Closure to invoke when the call is complete.
// We use this to cancel the timer.
grpc_closure on_complete;
// The original on_complete closure, which we chain to after our own
// closure is invoked.
grpc_closure* next_on_complete;
} base_call_data;
// Additional call data used only for the server filter.
typedef struct server_call_data {
base_call_data base; // Must be first.
// The closure for receiving initial metadata.
grpc_closure recv_initial_metadata_ready;
// Received initial metadata batch.
grpc_metadata_batch* recv_initial_metadata;
// The original recv_initial_metadata_ready closure, which we chain to
// after our own closure is invoked.
grpc_closure* next_recv_initial_metadata_ready;
} server_call_data;
// Constructor for channel_data. Used for both client and server filters.
static void init_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem,
grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
}
// Destructor for channel_data. Used for both client and server filters.
static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem) {
}
// Constructor for call_data. Used for both client and server filters.
static grpc_error *init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_call_element_args* args) {
base_call_data* calld = elem->call_data;
// Note: size of call data is different between client and server.
memset(calld, 0, elem->filter->sizeof_call_data);
calld->call_stack = args->call_stack;
gpr_mu_init(&calld->timer_mu);
return GRPC_ERROR_NONE;
}
// Destructor for call_data. Used for both client and server filters.
static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
const grpc_call_final_info* final_info,
void* and_free_memory) {
base_call_data* calld = elem->call_data;
gpr_mu_destroy(&calld->timer_mu);
}
// Timer callback. // Timer callback.
static void timer_callback(grpc_exec_ctx *exec_ctx, void *arg, static void timer_callback(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
grpc_call_element* elem = arg; grpc_call_element* elem = arg;
base_call_data* calld = elem->call_data; grpc_deadline_state* deadline_state = elem->call_data;
gpr_mu_lock(&calld->timer_mu); gpr_log(GPR_INFO, "==> %s(), is_client=%d", __func__, deadline_state->is_client);
calld->timer_pending = false; gpr_mu_lock(&deadline_state->timer_mu);
gpr_mu_unlock(&calld->timer_mu); deadline_state->timer_pending = false;
gpr_mu_unlock(&deadline_state->timer_mu);
if (error != GRPC_ERROR_CANCELLED) { if (error != GRPC_ERROR_CANCELLED) {
grpc_call_element_send_cancel(exec_ctx, elem); gpr_log(GPR_INFO, "DEADLINE_EXCEEDED");
// FIXME: change grpc_call_element_send_cancel_with_message to not call close
// grpc_call_element_send_cancel(exec_ctx, elem);
grpc_transport_stream_op op;
memset(&op, 0, sizeof(op));
op.cancel_error = grpc_error_set_int(
GRPC_ERROR_CREATE("Deadline Exceeded"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED);
elem->filter->start_transport_stream_op(exec_ctx, elem, &op);
} }
GRPC_CALL_STACK_UNREF(exec_ctx, calld->call_stack, "deadline_timer"); GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
} }
// Starts the deadline timer. // Starts the deadline timer.
static void start_timer_if_needed(grpc_exec_ctx *exec_ctx, static void start_timer_if_needed(grpc_exec_ctx *exec_ctx,
grpc_call_element* elem, grpc_call_element* elem,
gpr_timespec deadline) { gpr_timespec deadline) {
base_call_data* calld = elem->call_data; grpc_deadline_state* deadline_state = elem->call_data;
gpr_log(GPR_INFO, "==> %s(), is_client=%d", __func__, deadline_state->is_client);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
// Take a reference to the call stack, to be owned by the timer. // Take a reference to the call stack, to be owned by the timer.
GRPC_CALL_STACK_REF(calld->call_stack, "deadline_timer"); GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
gpr_mu_lock(&calld->timer_mu); gpr_mu_lock(&deadline_state->timer_mu);
calld->timer_pending = true; gpr_log(GPR_INFO, "STARTING TIMER -- is_client=%d", deadline_state->is_client);
grpc_timer_init(exec_ctx, &calld->timer, deadline, timer_callback, elem, deadline_state->timer_pending = true;
gpr_now(GPR_CLOCK_MONOTONIC)); grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback,
gpr_mu_unlock(&calld->timer_mu); elem, gpr_now(GPR_CLOCK_MONOTONIC));
gpr_mu_unlock(&deadline_state->timer_mu);
} }
} }
// Cancels the deadline timer. // Cancels the deadline timer.
static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
base_call_data* calld) { grpc_deadline_state* deadline_state) {
gpr_mu_lock(&calld->timer_mu); gpr_log(GPR_INFO, "==> %s(), is_client=%d", __func__, deadline_state->is_client);
if (calld->timer_pending) { gpr_mu_lock(&deadline_state->timer_mu);
grpc_timer_cancel(exec_ctx, &calld->timer); if (deadline_state->timer_pending) {
calld->timer_pending = false; gpr_log(GPR_INFO, "CANCELLING TIMER -- is_client=%d", deadline_state->is_client);
grpc_timer_cancel(exec_ctx, &deadline_state->timer);
deadline_state->timer_pending = false;
} }
gpr_mu_unlock(&calld->timer_mu); gpr_mu_unlock(&deadline_state->timer_mu);
} }
// Callback run when the call is complete. // Callback run when the call is complete.
static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
base_call_data* calld = arg; grpc_deadline_state* deadline_state = arg;
cancel_timer_if_needed(exec_ctx, calld); gpr_log(GPR_INFO, "==> %s(), is_client=%d, next_on_complete->cb=%p", __func__, deadline_state->is_client, deadline_state->next_on_complete->cb);
cancel_timer_if_needed(exec_ctx, deadline_state);
// Invoke the next callback. // Invoke the next callback.
calld->next_on_complete->cb(exec_ctx, calld->next_on_complete->cb_arg, error); deadline_state->next_on_complete->cb(
exec_ctx, deadline_state->next_on_complete->cb_arg, error);
} }
// Method for starting a call op for client filter. // Inject our own on_complete callback into op.
static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx, static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
grpc_call_element* elem, grpc_transport_stream_op* op) {
grpc_transport_stream_op* op) { gpr_log(GPR_INFO, "==> %s(), is_client=%d", __func__, deadline_state->is_client);
base_call_data* calld = elem->call_data; deadline_state->next_on_complete = op->on_complete;
// If the call is cancelled or closed, cancel the timer. grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state);
op->on_complete = &deadline_state->on_complete;
}
void grpc_deadline_state_init(grpc_deadline_state* deadline_state,
grpc_call_stack* call_stack) {
gpr_log(GPR_INFO, "==> %s()", __func__);
memset(deadline_state, 0, sizeof(*deadline_state));
deadline_state->call_stack = call_stack;
gpr_mu_init(&deadline_state->timer_mu);
}
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
grpc_deadline_state* deadline_state) {
gpr_log(GPR_INFO, "==> %s(), is_client=%d", __func__, deadline_state->is_client);
cancel_timer_if_needed(exec_ctx, deadline_state);
gpr_mu_destroy(&deadline_state->timer_mu);
}
void grpc_deadline_state_client_start_transport_stream_op(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op* op) {
gpr_log(GPR_INFO, "==> %s(): op=%p {on_complete=%p, cancel_error=%p, close_error=%p, send_initial_metadata=%p, send_trailing_metadata=%p, send_message=%p, recv_initial_metadata_ready=%p, recv_trailing_metadata=%p}", __func__, op, op->on_complete, op->cancel_error, op->close_error, op->send_initial_metadata, op->send_trailing_metadata, op->send_message, op->recv_initial_metadata_ready, op->recv_trailing_metadata);
grpc_deadline_state* deadline_state = elem->call_data;
if (op->cancel_error != GRPC_ERROR_NONE || if (op->cancel_error != GRPC_ERROR_NONE ||
op->close_error != GRPC_ERROR_NONE) { op->close_error != GRPC_ERROR_NONE) {
cancel_timer_if_needed(exec_ctx, calld); cancel_timer_if_needed(exec_ctx, deadline_state);
} else { } else {
// If we're sending initial metadata, get the deadline from the metadata // If we're sending initial metadata, get the deadline from the metadata
// and start the timer if needed. // and start the timer if needed.
@ -175,11 +151,77 @@ static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
// Make sure we know when the call is complete, so that we can cancel // Make sure we know when the call is complete, so that we can cancel
// the timer. // the timer.
if (op->recv_trailing_metadata != NULL) { if (op->recv_trailing_metadata != NULL) {
calld->next_on_complete = op->on_complete; inject_on_complete_cb(deadline_state, op);
grpc_closure_init(&calld->on_complete, on_complete, calld);
op->on_complete = &calld->on_complete;
} }
} }
}
//
// filter code
//
// Used for both client and server filters.
typedef struct channel_data {
} channel_data;
// Constructor for channel_data. Used for both client and server filters.
static void init_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem,
grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
}
// Destructor for channel_data. Used for both client and server filters.
static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem) {
}
// Call data used for both client and server filter.
typedef struct base_call_data {
grpc_deadline_state deadline_state;
} base_call_data;
// Additional call data used only for the server filter.
typedef struct server_call_data {
base_call_data base; // Must be first.
// The closure for receiving initial metadata.
grpc_closure recv_initial_metadata_ready;
// Received initial metadata batch.
grpc_metadata_batch* recv_initial_metadata;
// The original recv_initial_metadata_ready closure, which we chain to
// after our own closure is invoked.
grpc_closure* next_recv_initial_metadata_ready;
} server_call_data;
// Constructor for call_data. Used for both client and server filters.
static grpc_error *init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_call_element_args* args) {
gpr_log(GPR_INFO, "==> %s() -- call_data_size=%lu", __func__, elem->filter->sizeof_call_data);
base_call_data* calld = elem->call_data;
// Note: size of call data is different between client and server.
memset(calld, 0, elem->filter->sizeof_call_data);
grpc_deadline_state_init(&calld->deadline_state, args->call_stack);
calld->deadline_state.is_client = elem->filter->sizeof_call_data == sizeof(base_call_data);
return GRPC_ERROR_NONE;
}
// Destructor for call_data. Used for both client and server filters.
static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
const grpc_call_final_info* final_info,
void* and_free_memory) {
base_call_data* calld = elem->call_data;
grpc_deadline_state_destroy(exec_ctx, &calld->deadline_state);
}
// Method for starting a call op for client filter.
static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_transport_stream_op* op) {
gpr_log(GPR_INFO, "==> %s()", __func__);
grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
// Chain to next filter. // Chain to next filter.
grpc_call_next_op(exec_ctx, elem, op); grpc_call_next_op(exec_ctx, elem, op);
} }
@ -201,11 +243,11 @@ static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg,
static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx, static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem, grpc_call_element* elem,
grpc_transport_stream_op* op) { grpc_transport_stream_op* op) {
gpr_log(GPR_INFO, "==> %s(): op=%p {on_complete=%p, cancel_error=%p, close_error=%p, send_initial_metadata=%p, send_trailing_metadata=%p, send_message=%p, recv_initial_metadata_ready=%p, recv_trailing_metadata=%p}", __func__, op, op->on_complete, op->cancel_error, op->close_error, op->send_initial_metadata, op->send_trailing_metadata, op->send_message, op->recv_initial_metadata_ready, op->recv_trailing_metadata);
server_call_data* calld = elem->call_data; server_call_data* calld = elem->call_data;
// If the call is cancelled or closed, cancel the timer.
if (op->cancel_error != GRPC_ERROR_NONE || if (op->cancel_error != GRPC_ERROR_NONE ||
op->close_error != GRPC_ERROR_NONE) { op->close_error != GRPC_ERROR_NONE) {
cancel_timer_if_needed(exec_ctx, &calld->base); cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
} else { } else {
// If we're receiving initial metadata, we need to get the deadline // If we're receiving initial metadata, we need to get the deadline
// from the recv_initial_metadata_ready callback. So we inject our // from the recv_initial_metadata_ready callback. So we inject our
@ -223,9 +265,7 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
// the client never sends trailing metadata, because this is the // the client never sends trailing metadata, because this is the
// hook that tells us when the call is complete on the server side. // hook that tells us when the call is complete on the server side.
if (op->recv_trailing_metadata != NULL) { if (op->recv_trailing_metadata != NULL) {
calld->base.next_on_complete = op->on_complete; inject_on_complete_cb(&calld->base.deadline_state, op);
grpc_closure_init(&calld->base.on_complete, on_complete, calld);
op->on_complete = &calld->base.on_complete;
} }
} }
// Chain to next filter. // Chain to next filter.

@ -33,7 +33,48 @@
#define GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H #define GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H
#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/timer.h"
// State used for filters that enforce call deadlines.
// Should be the first field in the filter's call_data.
typedef struct grpc_deadline_state {
// We take a reference to the call stack for the timer callback.
grpc_call_stack* call_stack;
// Guards access to timer_pending and timer.
gpr_mu timer_mu;
// True if the timer callback is currently pending.
bool timer_pending;
// The deadline timer.
grpc_timer timer;
// Closure to invoke when the call is complete.
// We use this to cancel the timer.
grpc_closure on_complete;
// The original on_complete closure, which we chain to after our own
// closure is invoked.
grpc_closure* next_on_complete;
// FIXME: remove
bool is_client;
} grpc_deadline_state;
void grpc_deadline_state_init(grpc_deadline_state* call_data,
grpc_call_stack* call_stack);
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
grpc_deadline_state* call_data);
// To be used in a filter's start_transport_stream_op() method to
// enforce call deadlines.
// It is the caller's responsibility to chain to the next filter if
// necessary after this function returns.
// REQUIRES: The first field in elem is a grpc_deadline_state struct.
void grpc_deadline_state_client_start_transport_stream_op(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op* op);
// Deadline filters for direct client channels and server channels.
// Note: Deadlines for non-direct client channels are handled by the
// client_channel filter.
extern const grpc_channel_filter grpc_client_deadline_filter; extern const grpc_channel_filter grpc_client_deadline_filter;
extern const grpc_channel_filter grpc_server_deadline_filter; extern const grpc_channel_filter grpc_server_deadline_filter;

@ -98,9 +98,6 @@ static bool maybe_add_http_filter(grpc_channel_stack_builder *builder,
} }
static void register_builtin_channel_init() { static void register_builtin_channel_init() {
grpc_channel_init_register_stage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
prepend_filter, (void *)&grpc_client_deadline_filter);
grpc_channel_init_register_stage( grpc_channel_init_register_stage(
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
prepend_filter, (void *)&grpc_client_deadline_filter); prepend_filter, (void *)&grpc_client_deadline_filter);

Loading…
Cancel
Save