Add a mutex guarding the deadline timer state, check for cancellation in the timer callback, and cancel the call via grpc_call_element_send_cancel().

pull/7970/head
Mark D. Roth 8 years ago
parent d59a5fc9ee
commit 1bbe6cb143
  1. 105
      src/core/lib/channel/deadline_filter.c
  2. 4
      src/core/lib/transport/transport.c

@ -35,6 +35,7 @@
#include <string.h> #include <string.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
@ -47,6 +48,8 @@ typedef struct channel_data {
typedef struct base_call_data { typedef struct base_call_data {
// We take a reference to the call stack for the timer callback. // We take a reference to the call stack for the timer callback.
grpc_call_stack* call_stack; grpc_call_stack* call_stack;
// Guards access to timer_pending and timer.
gpr_mu timer_mu;
// True if the timer callback is currently pending. // True if the timer callback is currently pending.
bool timer_pending; bool timer_pending;
// The deadline timer. // The deadline timer.
@ -91,6 +94,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx* exec_ctx,
// Note: size of call data is different between client and server. // Note: size of call data is different between client and server.
memset(calld, 0, elem->filter->sizeof_call_data); memset(calld, 0, elem->filter->sizeof_call_data);
calld->call_stack = args->call_stack; calld->call_stack = args->call_stack;
gpr_mu_init(&calld->timer_mu);
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
@ -98,6 +102,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx* exec_ctx,
static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
const grpc_call_final_info* final_info, const grpc_call_final_info* final_info,
void* and_free_memory) { void* and_free_memory) {
base_call_data* calld = elem->call_data;
gpr_mu_destroy(&calld->timer_mu);
} }
// Timer callback. // Timer callback.
@ -105,13 +111,13 @@ static void timer_callback(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
grpc_call_element* elem = arg; grpc_call_element* elem = arg;
base_call_data* calld = elem->call_data; base_call_data* calld = elem->call_data;
gpr_mu_lock(&calld->timer_mu);
calld->timer_pending = false; calld->timer_pending = false;
gpr_mu_unlock(&calld->timer_mu);
if (error != GRPC_ERROR_CANCELLED) { if (error != GRPC_ERROR_CANCELLED) {
gpr_slice message = gpr_slice_from_static_string("Deadline Exceeded"); grpc_call_element_send_cancel(exec_ctx, elem);
grpc_call_element_send_cancel_with_message(
exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &message);
} }
GRPC_CALL_STACK_UNREF(exec_ctx, calld->call_stack, "deadline"); GRPC_CALL_STACK_UNREF(exec_ctx, calld->call_stack, "deadline_timer");
} }
// Starts the deadline timer. // Starts the deadline timer.
@ -122,20 +128,30 @@ static void start_timer_if_needed(grpc_exec_ctx *exec_ctx,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
// Take a reference to the call stack, to be owned by the timer. // Take a reference to the call stack, to be owned by the timer.
GRPC_CALL_STACK_REF(calld->call_stack, "deadline"); GRPC_CALL_STACK_REF(calld->call_stack, "deadline_timer");
gpr_mu_lock(&calld->timer_mu);
calld->timer_pending = true;
grpc_timer_init(exec_ctx, &calld->timer, deadline, timer_callback, elem, grpc_timer_init(exec_ctx, &calld->timer, deadline, timer_callback, elem,
gpr_now(GPR_CLOCK_MONOTONIC)); gpr_now(GPR_CLOCK_MONOTONIC));
calld->timer_pending = true; gpr_mu_unlock(&calld->timer_mu);
} }
} }
// Callback run when the call is complete. // Cancels the deadline timer.
static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
base_call_data* calld = arg; base_call_data* calld) {
gpr_mu_lock(&calld->timer_mu);
if (calld->timer_pending) { if (calld->timer_pending) {
grpc_timer_cancel(exec_ctx, &calld->timer); grpc_timer_cancel(exec_ctx, &calld->timer);
calld->timer_pending = false; calld->timer_pending = false;
} }
gpr_mu_unlock(&calld->timer_mu);
}
// Callback run when the call is complete.
static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
base_call_data* calld = arg;
cancel_timer_if_needed(exec_ctx, calld);
// Invoke the next callback. // Invoke the next callback.
calld->next_on_complete->cb(exec_ctx, calld->next_on_complete->cb_arg, error); calld->next_on_complete->cb(exec_ctx, calld->next_on_complete->cb_arg, error);
} }
@ -145,18 +161,24 @@ static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem, grpc_call_element* elem,
grpc_transport_stream_op* op) { grpc_transport_stream_op* op) {
base_call_data* calld = elem->call_data; base_call_data* calld = elem->call_data;
// If we're sending initial metadata, get the deadline from the metadata // If the call is cancelled or closed, cancel the timer.
// and start the timer if needed. if (op->cancel_error != GRPC_ERROR_NONE ||
if (op->send_initial_metadata != NULL) { op->close_error != GRPC_ERROR_NONE) {
start_timer_if_needed(exec_ctx, elem, cancel_timer_if_needed(exec_ctx, calld);
op->send_initial_metadata->deadline); } else {
} // If we're sending initial metadata, get the deadline from the metadata
// Make sure we know when the call is complete, so that we can cancel // and start the timer if needed.
// the timer. if (op->send_initial_metadata != NULL) {
if (op->recv_trailing_metadata != NULL) { start_timer_if_needed(exec_ctx, elem,
calld->next_on_complete = op->on_complete; op->send_initial_metadata->deadline);
grpc_closure_init(&calld->on_complete, on_complete, calld); }
op->on_complete = &calld->on_complete; // Make sure we know when the call is complete, so that we can cancel
// the timer.
if (op->recv_trailing_metadata != NULL) {
calld->next_on_complete = op->on_complete;
grpc_closure_init(&calld->on_complete, on_complete, calld);
op->on_complete = &calld->on_complete;
}
} }
// Chain to next filter. // Chain to next filter.
grpc_call_next_op(exec_ctx, elem, op); grpc_call_next_op(exec_ctx, elem, op);
@ -180,22 +202,31 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem, grpc_call_element* elem,
grpc_transport_stream_op* op) { grpc_transport_stream_op* op) {
server_call_data* calld = elem->call_data; server_call_data* calld = elem->call_data;
// If we're receiving initial metadata, we need to get the deadline // If the call is cancelled or closed, cancel the timer.
// from the recv_initial_metadata_ready callback. So we inject our if (op->cancel_error != GRPC_ERROR_NONE ||
// own callback into that hook. op->close_error != GRPC_ERROR_NONE) {
if (op->recv_initial_metadata_ready != NULL) { cancel_timer_if_needed(exec_ctx, &calld->base);
calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready; } else {
calld->recv_initial_metadata = op->recv_initial_metadata; // If we're receiving initial metadata, we need to get the deadline
grpc_closure_init(&calld->recv_initial_metadata_ready, // from the recv_initial_metadata_ready callback. So we inject our
recv_initial_metadata_ready, elem); // own callback into that hook.
op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready; if (op->recv_initial_metadata_ready != NULL) {
} calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
// Make sure we know when the call is complete, so that we can cancel calld->recv_initial_metadata = op->recv_initial_metadata;
// the timer. grpc_closure_init(&calld->recv_initial_metadata_ready,
if (op->send_trailing_metadata != NULL) { recv_initial_metadata_ready, elem);
calld->base.next_on_complete = op->on_complete; op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready;
grpc_closure_init(&calld->base.on_complete, on_complete, calld); }
op->on_complete = &calld->base.on_complete; // Make sure we know when the call is complete, so that we can cancel
// the timer.
// Note that we trigger this on recv_trailing_metadata, even though
// the client never sends trailing metadata, because this is the
// hook that tells us when the call is complete on the server side.
if (op->recv_trailing_metadata != NULL) {
calld->base.next_on_complete = op->on_complete;
grpc_closure_init(&calld->base.on_complete, on_complete, calld);
op->on_complete = &calld->base.on_complete;
}
} }
// Chain to next filter. // Chain to next filter.
grpc_call_next_op(exec_ctx, elem, op); grpc_call_next_op(exec_ctx, elem, op);

@ -220,6 +220,10 @@ void grpc_transport_stream_op_add_cancellation_with_message(
error = GRPC_ERROR_CREATE("Call cancelled"); error = GRPC_ERROR_CREATE("Call cancelled");
} }
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status); error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
// TODO(ctiller): We are intentionally setting close_error instead of
// cancel_error here. This is an ugly hack and should be replaced
// by a more general-purpose mechanism that allows us to control
// cancel/close behavior.
add_error(op, &op->close_error, error); add_error(op, &op->close_error, error);
} }

Loading…
Cancel
Save