|
|
|
@ -45,9 +45,18 @@ typedef struct channel_data { |
|
|
|
|
|
|
|
|
|
// Call data used for both client and server filter.
|
|
|
|
|
typedef struct base_call_data { |
|
|
|
|
// We take a reference to the call stack for the timer callback.
|
|
|
|
|
grpc_call_stack* call_stack; |
|
|
|
|
// True if the timer callback is currently pending.
|
|
|
|
|
bool timer_pending; |
|
|
|
|
// The deadline timer.
|
|
|
|
|
grpc_timer timer; |
|
|
|
|
// Closure to invoke when the call is complete.
|
|
|
|
|
// We use this to cancel the timer.
|
|
|
|
|
grpc_closure on_complete; |
|
|
|
|
// The original on_complete closure, which we chain to after our own
|
|
|
|
|
// closure is invoked.
|
|
|
|
|
grpc_closure* next_on_complete; |
|
|
|
|
} base_call_data; |
|
|
|
|
|
|
|
|
|
// Additional call data used only for the server filter.
|
|
|
|
@ -89,16 +98,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx* exec_ctx, |
|
|
|
|
static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, |
|
|
|
|
const grpc_call_final_info* final_info, |
|
|
|
|
void* and_free_memory) { |
|
|
|
|
base_call_data* calld = elem->call_data; |
|
|
|
|
gpr_log(GPR_INFO, "==> destroy_call_elem()"); |
|
|
|
|
// FIXME: this is not working -- timer holds a ref, so we won't get
|
|
|
|
|
// called until after timer pops
|
|
|
|
|
if (calld->timer_pending) |
|
|
|
|
{ |
|
|
|
|
gpr_log(GPR_INFO, "CANCELLING TIMER"); |
|
|
|
|
grpc_timer_cancel(exec_ctx, &calld->timer); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Timer callback.
|
|
|
|
@ -108,13 +107,10 @@ static void timer_callback(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
base_call_data* calld = elem->call_data; |
|
|
|
|
calld->timer_pending = false; |
|
|
|
|
if (error != GRPC_ERROR_CANCELLED) { |
|
|
|
|
gpr_log(GPR_INFO, "DEADLINE EXCEEDED"); |
|
|
|
|
gpr_slice message = gpr_slice_from_static_string("Deadline Exceeded"); |
|
|
|
|
grpc_call_element_send_cancel_with_message( |
|
|
|
|
exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &message); |
|
|
|
|
} |
|
|
|
|
else gpr_log(GPR_INFO, "TIMER CANCELLED"); |
|
|
|
|
gpr_log(GPR_INFO, "UNREF"); |
|
|
|
|
GRPC_CALL_STACK_UNREF(exec_ctx, calld->call_stack, "deadline"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -126,7 +122,6 @@ static void start_timer_if_needed(grpc_exec_ctx *exec_ctx, |
|
|
|
|
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); |
|
|
|
|
if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { |
|
|
|
|
// Take a reference to the call stack, to be owned by the timer.
|
|
|
|
|
gpr_log(GPR_INFO, "REF"); |
|
|
|
|
GRPC_CALL_STACK_REF(calld->call_stack, "deadline"); |
|
|
|
|
grpc_timer_init(exec_ctx, &calld->timer, deadline, timer_callback, elem, |
|
|
|
|
gpr_now(GPR_CLOCK_MONOTONIC)); |
|
|
|
@ -134,16 +129,35 @@ gpr_log(GPR_INFO, "REF"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Callback run when the call is complete.
|
|
|
|
|
static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { |
|
|
|
|
base_call_data* calld = arg; |
|
|
|
|
if (calld->timer_pending) { |
|
|
|
|
grpc_timer_cancel(exec_ctx, &calld->timer); |
|
|
|
|
calld->timer_pending = false; |
|
|
|
|
} |
|
|
|
|
// Invoke the next callback.
|
|
|
|
|
calld->next_on_complete->cb(exec_ctx, calld->next_on_complete->cb_arg, error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Method for starting a call op for client filter.
|
|
|
|
|
static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_call_element* elem, |
|
|
|
|
grpc_transport_stream_op* op) { |
|
|
|
|
base_call_data* calld = elem->call_data; |
|
|
|
|
// If we're sending initial metadata, get the deadline from the metadata
|
|
|
|
|
// and start the timer if needed.
|
|
|
|
|
if (op->send_initial_metadata != NULL) { |
|
|
|
|
start_timer_if_needed(exec_ctx, elem, |
|
|
|
|
op->send_initial_metadata->deadline); |
|
|
|
|
} |
|
|
|
|
// Make sure we know when the call is complete, so that we can cancel
|
|
|
|
|
// the timer.
|
|
|
|
|
if (op->recv_trailing_metadata != NULL) { |
|
|
|
|
calld->next_on_complete = op->on_complete; |
|
|
|
|
grpc_closure_init(&calld->on_complete, on_complete, calld); |
|
|
|
|
op->on_complete = &calld->on_complete; |
|
|
|
|
} |
|
|
|
|
// Chain to next filter.
|
|
|
|
|
grpc_call_next_op(exec_ctx, elem, op); |
|
|
|
|
} |
|
|
|
@ -176,6 +190,13 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx, |
|
|
|
|
recv_initial_metadata_ready, elem); |
|
|
|
|
op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready; |
|
|
|
|
} |
|
|
|
|
// Make sure we know when the call is complete, so that we can cancel
|
|
|
|
|
// the timer.
|
|
|
|
|
if (op->send_trailing_metadata != NULL) { |
|
|
|
|
calld->base.next_on_complete = op->on_complete; |
|
|
|
|
grpc_closure_init(&calld->base.on_complete, on_complete, calld); |
|
|
|
|
op->on_complete = &calld->base.on_complete; |
|
|
|
|
} |
|
|
|
|
// Chain to next filter.
|
|
|
|
|
grpc_call_next_op(exec_ctx, elem, op); |
|
|
|
|
} |
|
|
|
|