diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 61e012578e0..bbae37d7af8 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -45,6 +45,7 @@
 #include "src/core/ext/client_config/subchannel.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/channel/deadline_filter.h"
 #include "src/core/lib/iomgr/iomgr.h"
 #include "src/core/lib/iomgr/polling_entity.h"
 #include "src/core/lib/profiling/timers.h"
@@ -377,6 +378,15 @@ typedef enum {
    for initial metadata before trying to create a call object,
    and handling cancellation gracefully. */
 typedef struct client_channel_call_data {
+  // State for handling deadlines.
+  // The code in deadline_filter.c requires this to be the first field.
+  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
+  // and this struct both independently store a pointer to the call
+  // stack and each has its own mutex.  If/when we have time, find a way
+  // to avoid this without breaking either abstraction.
+  grpc_deadline_state deadline_state;
+
   /** either 0 for no call, 1 for cancelled, or a pointer to a
       grpc_subchannel_call */
   gpr_atm subchannel_call;
@@ -465,7 +475,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
     gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
     fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING(
                                      "Failed to create subchannel", &error, 1));
-  } else if (1 == gpr_atm_acq_load(&calld->subchannel_call)) {
+  } else if (GET_CALL(calld) == CANCELLED_CALL) {
     /* already cancelled before subchannel became ready */
     fail_locked(exec_ctx, calld,
                 GRPC_ERROR_CREATE_REFERENCING(
@@ -511,7 +521,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                             grpc_metadata_batch *initial_metadata,
                             uint32_t initial_metadata_flags,
                             grpc_connected_subchannel **connected_subchannel,
-                            grpc_closure *on_ready);
+                            grpc_closure *on_ready, grpc_error *error);
 
 static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
                              grpc_error *error) {
@@ -522,7 +532,8 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
     grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL);
   } else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
                              cpa->initial_metadata_flags,
-                             cpa->connected_subchannel, cpa->on_ready)) {
+                             cpa->connected_subchannel, cpa->on_ready,
+                             GRPC_ERROR_NONE)) {
     grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
   }
   gpr_free(cpa);
@@ -532,7 +543,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                             grpc_metadata_batch *initial_metadata,
                             uint32_t initial_metadata_flags,
                             grpc_connected_subchannel **connected_subchannel,
-                            grpc_closure *on_ready) {
+                            grpc_closure *on_ready, grpc_error *error) {
   GPR_TIMER_BEGIN("pick_subchannel", 0);
 
   channel_data *chand = elem->channel_data;
@@ -554,7 +565,8 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
       if (cpa->connected_subchannel == connected_subchannel) {
         cpa->connected_subchannel = NULL;
         grpc_exec_ctx_sched(exec_ctx, cpa->on_ready,
-                            GRPC_ERROR_CREATE("Pick cancelled"), NULL);
+                            GRPC_ERROR_CREATE_REFERENCING(
+                                "Pick cancelled", &error, 1), NULL);
       }
     }
     gpr_mu_unlock(&chand->mu);
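The hunks above thread a grpc_error through pick_subchannel() so that a cancelled pick reports the original cancellation cause (via GRPC_ERROR_CREATE_REFERENCING) instead of a bare "Pick cancelled" error. A minimal standalone sketch of that referencing pattern, using a toy refcounted error type rather than the real grpc_error API (all names here are illustrative):

    #include <stdio.h>
    #include <stdlib.h>

    /* Toy stand-in for grpc_error: refcounted, with an optional cause. */
    typedef struct toy_error {
      const char* msg;
      struct toy_error* cause; /* owned reference, may be NULL */
      int refs;
    } toy_error;

    static toy_error* toy_error_ref(toy_error* e) {
      if (e != NULL) ++e->refs;
      return e;
    }

    static void toy_error_unref(toy_error* e) {
      /* Dropping the last ref also drops our ref on the cause chain. */
      while (e != NULL && --e->refs == 0) {
        toy_error* cause = e->cause;
        free(e);
        e = cause;
      }
    }

    /* Analogous to GRPC_ERROR_CREATE_REFERENCING(msg, &err, 1). */
    static toy_error* toy_error_create_referencing(const char* msg,
                                                   toy_error* cause) {
      toy_error* e = malloc(sizeof(*e));
      e->msg = msg;
      e->cause = toy_error_ref(cause);
      e->refs = 1;
      return e;
    }

    static void print_chain(const toy_error* e) {
      for (; e != NULL; e = e->cause) {
        printf("%s%s", e->msg, e->cause != NULL ? ": " : "\n");
      }
    }

    int main(void) {
      toy_error* cancel = toy_error_create_referencing("Cancelled", NULL);
      /* The pick failure carries the original cancellation as its cause. */
      toy_error* pick = toy_error_create_referencing("Pick cancelled", cancel);
      toy_error_unref(cancel); /* pick still holds its own reference */
      print_chain(pick);       /* prints: Pick cancelled: Cancelled */
      toy_error_unref(pick);
      return 0;
    }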
@@ -609,6 +621,7 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
                                          grpc_transport_stream_op *op) {
   call_data *calld = elem->call_data;
   GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+  grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
   /* try to (atomically) get the call */
   grpc_subchannel_call *call = GET_CALL(calld);
   GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0);
@@ -645,20 +658,23 @@ retry:
   if (op->cancel_error != GRPC_ERROR_NONE) {
     if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
                          (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
       goto retry;
     } else {
       switch (calld->creation_phase) {
         case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
           fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error));
           break;
         case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
           pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel,
-                          NULL);
+                          NULL, GRPC_ERROR_REF(op->cancel_error));
           break;
       }
       gpr_mu_unlock(&calld->mu);
-      grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
-                                                   GRPC_ERROR_CANCELLED);
+      grpc_transport_stream_op_finish_with_failure(
+          exec_ctx, op, GRPC_ERROR_REF(op->cancel_error));
       GPR_TIMER_END("cc_start_transport_stream_op", 0);
       return;
     }
@@ -672,7 +688,8 @@ retry:
     GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
     if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata,
                         op->send_initial_metadata_flags,
-                        &calld->connected_subchannel, &calld->next_step)) {
+                        &calld->connected_subchannel, &calld->next_step,
+                        GRPC_ERROR_NONE)) {
       calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
       GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
     }
@@ -705,6 +722,11 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
                                      grpc_call_element *elem,
                                      grpc_call_element_args *args) {
   call_data *calld = elem->call_data;
+  grpc_deadline_state_init(&calld->deadline_state, args->call_stack);
+
   gpr_atm_rel_store(&calld->subchannel_call, 0);
   gpr_mu_init(&calld->mu);
   calld->connected_subchannel = NULL;
@@ -723,6 +745,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
                                  const grpc_call_final_info *final_info,
                                  void *and_free_memory) {
   call_data *calld = elem->call_data;
+  grpc_deadline_state_destroy(exec_ctx, &calld->deadline_state);
   grpc_subchannel_call *call = GET_CALL(calld);
   if (call != NULL && call != CANCELLED_CALL) {
     GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
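client_channel's call_data now embeds grpc_deadline_state as its first member, so the shared code in deadline_filter.c (next file) can treat elem->call_data as a grpc_deadline_state*. A standalone sketch, with toy types in place of the gRPC ones, of why that "must be the first field" layout requirement makes the cast valid:

    #include <assert.h>
    #include <stddef.h>
    #include <stdio.h>

    typedef struct shared_state { int timer_pending; } shared_state;

    typedef struct filter_call_data {
      shared_state deadline_state; /* must be first */
      int other_field;
    } filter_call_data;

    static void shared_code(void* call_data) {
      /* Valid only because deadline_state sits at offset 0. */
      shared_state* s = call_data;
      s->timer_pending = 1;
    }

    int main(void) {
      /* Guard for the layout assumption the comment above states. */
      assert(offsetof(filter_call_data, deadline_state) == 0);
      filter_call_data calld = {{0}, 42};
      shared_code(&calld);
      printf("timer_pending=%d other=%d\n",
             calld.deadline_state.timer_pending, calld.other_field);
      return 0;
    }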
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
index 9dff1c4d636..e4f132624fe 100644
--- a/src/core/lib/channel/deadline_filter.c
+++ b/src/core/lib/channel/deadline_filter.c
@@ -40,131 +40,107 @@
 #include "src/core/lib/iomgr/timer.h"
 
-// Used for both client and server filters.
-typedef struct channel_data {
-} channel_data;
-
-// Call data used for both client and server filter.
-typedef struct base_call_data {
-  // We take a reference to the call stack for the timer callback.
-  grpc_call_stack* call_stack;
-  // Guards access to timer_pending and timer.
-  gpr_mu timer_mu;
-  // True if the timer callback is currently pending.
-  bool timer_pending;
-  // The deadline timer.
-  grpc_timer timer;
-  // Closure to invoke when the call is complete.
-  // We use this to cancel the timer.
-  grpc_closure on_complete;
-  // The original on_complete closure, which we chain to after our own
-  // closure is invoked.
-  grpc_closure* next_on_complete;
-} base_call_data;
-
-// Additional call data used only for the server filter.
-typedef struct server_call_data {
-  base_call_data base;  // Must be first.
-  // The closure for receiving initial metadata.
-  grpc_closure recv_initial_metadata_ready;
-  // Received initial metadata batch.
-  grpc_metadata_batch* recv_initial_metadata;
-  // The original recv_initial_metadata_ready closure, which we chain to
-  // after our own closure is invoked.
-  grpc_closure* next_recv_initial_metadata_ready;
-} server_call_data;
-
-// Constructor for channel_data. Used for both client and server filters.
-static void init_channel_elem(grpc_exec_ctx* exec_ctx,
-                              grpc_channel_element* elem,
-                              grpc_channel_element_args* args) {
-  GPR_ASSERT(!args->is_last);
-}
-
-// Destructor for channel_data. Used for both client and server filters.
-static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
-                                 grpc_channel_element* elem) {
-}
-
-// Constructor for call_data. Used for both client and server filters.
-static grpc_error *init_call_elem(grpc_exec_ctx* exec_ctx,
-                                  grpc_call_element* elem,
-                                  grpc_call_element_args* args) {
-  base_call_data* calld = elem->call_data;
-  // Note: size of call data is different between client and server.
-  memset(calld, 0, elem->filter->sizeof_call_data);
-  calld->call_stack = args->call_stack;
-  gpr_mu_init(&calld->timer_mu);
-  return GRPC_ERROR_NONE;
-}
-
-// Destructor for call_data. Used for both client and server filters.
-static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
-                              const grpc_call_final_info* final_info,
-                              void* and_free_memory) {
-  base_call_data* calld = elem->call_data;
-  gpr_mu_destroy(&calld->timer_mu);
-}
+//
+// grpc_deadline_state
+//
 
 // Timer callback.
 static void timer_callback(grpc_exec_ctx *exec_ctx, void *arg,
                            grpc_error *error) {
   grpc_call_element* elem = arg;
-  base_call_data* calld = elem->call_data;
-  gpr_mu_lock(&calld->timer_mu);
-  calld->timer_pending = false;
-  gpr_mu_unlock(&calld->timer_mu);
+  grpc_deadline_state* deadline_state = elem->call_data;
+  gpr_mu_lock(&deadline_state->timer_mu);
+  deadline_state->timer_pending = false;
+  gpr_mu_unlock(&deadline_state->timer_mu);
   if (error != GRPC_ERROR_CANCELLED) {
-    grpc_call_element_send_cancel(exec_ctx, elem);
+    // TODO(roth): Change grpc_call_element_send_cancel_with_message() to
+    // not also close the call, so that we can use it here instead of
+    // constructing the op manually.
+    grpc_transport_stream_op op;
+    memset(&op, 0, sizeof(op));
+    op.cancel_error = grpc_error_set_int(
+        GRPC_ERROR_CREATE("Deadline Exceeded"), GRPC_ERROR_INT_GRPC_STATUS,
+        GRPC_STATUS_DEADLINE_EXCEEDED);
+    elem->filter->start_transport_stream_op(exec_ctx, elem, &op);
   }
-  GRPC_CALL_STACK_UNREF(exec_ctx, calld->call_stack, "deadline_timer");
+  GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack,
+                        "deadline_timer");
 }
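timer_callback() and cancel_timer_if_needed() (below) can race: the call may complete at the same moment the deadline fires. The timer_pending flag, guarded by timer_mu, ensures the timer is cancelled at most once and only while it is still pending. A standalone sketch of that protocol with toy types (pthread mutex in place of gpr_mu, no real grpc_timer):

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>

    typedef struct {
      pthread_mutex_t mu;
      bool timer_pending;
    } deadline_state;

    /* Runs when the deadline fires (analogous to timer_callback). */
    static void on_timer_fired(deadline_state* s, bool was_cancelled) {
      pthread_mutex_lock(&s->mu);
      s->timer_pending = false;
      pthread_mutex_unlock(&s->mu);
      if (!was_cancelled) {
        printf("deadline exceeded: cancel the call\n");
      }
    }

    /* Runs when the call completes first
       (analogous to cancel_timer_if_needed). */
    static void cancel_if_needed(deadline_state* s) {
      pthread_mutex_lock(&s->mu);
      if (s->timer_pending) {
        s->timer_pending = false;
        printf("call finished in time: timer cancelled\n");
      }
      pthread_mutex_unlock(&s->mu);
    }

    int main(void) {
      deadline_state s = {PTHREAD_MUTEX_INITIALIZER, true};
      cancel_if_needed(&s);     /* call completed before the deadline */
      on_timer_fired(&s, true); /* timer then reports cancellation */
      return 0;
    }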
 // Starts the deadline timer.
 static void start_timer_if_needed(grpc_exec_ctx *exec_ctx,
                                   grpc_call_element* elem,
                                   gpr_timespec deadline) {
-  base_call_data* calld = elem->call_data;
+  grpc_deadline_state* deadline_state = elem->call_data;
   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
   if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
     // Take a reference to the call stack, to be owned by the timer.
-    GRPC_CALL_STACK_REF(calld->call_stack, "deadline_timer");
-    gpr_mu_lock(&calld->timer_mu);
-    calld->timer_pending = true;
-    grpc_timer_init(exec_ctx, &calld->timer, deadline, timer_callback, elem,
-                    gpr_now(GPR_CLOCK_MONOTONIC));
-    gpr_mu_unlock(&calld->timer_mu);
+    GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
+    gpr_mu_lock(&deadline_state->timer_mu);
+    deadline_state->timer_pending = true;
+    grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback,
+                    elem, gpr_now(GPR_CLOCK_MONOTONIC));
+    gpr_mu_unlock(&deadline_state->timer_mu);
   }
 }
 
 // Cancels the deadline timer.
 static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
-                                   base_call_data* calld) {
-  gpr_mu_lock(&calld->timer_mu);
-  if (calld->timer_pending) {
-    grpc_timer_cancel(exec_ctx, &calld->timer);
-    calld->timer_pending = false;
+                                   grpc_deadline_state* deadline_state) {
+  gpr_mu_lock(&deadline_state->timer_mu);
+  if (deadline_state->timer_pending) {
+    grpc_timer_cancel(exec_ctx, &deadline_state->timer);
+    deadline_state->timer_pending = false;
   }
-  gpr_mu_unlock(&calld->timer_mu);
+  gpr_mu_unlock(&deadline_state->timer_mu);
 }
 
 // Callback run when the call is complete.
 static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
-  base_call_data* calld = arg;
-  cancel_timer_if_needed(exec_ctx, calld);
+  grpc_deadline_state* deadline_state = arg;
+  cancel_timer_if_needed(exec_ctx, deadline_state);
   // Invoke the next callback.
-  calld->next_on_complete->cb(exec_ctx, calld->next_on_complete->cb_arg, error);
+  deadline_state->next_on_complete->cb(
+      exec_ctx, deadline_state->next_on_complete->cb_arg, error);
 }
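inject_on_complete_cb() (introduced just below) wraps the op's on_complete closure so that the deadline code runs first and then chains to the original callback. A minimal standalone sketch of that interception pattern, with a toy closure type in place of grpc_closure:

    #include <stdio.h>

    typedef struct closure {
      void (*cb)(void* arg, int error);
      void* cb_arg;
    } closure;

    typedef struct {
      closure on_complete;       /* our injected closure */
      closure* next_on_complete; /* the original, invoked afterwards */
    } deadline_state;

    static void our_on_complete(void* arg, int error) {
      deadline_state* s = arg;
      printf("cancel deadline timer here\n");
      /* Chain to the saved original closure. */
      s->next_on_complete->cb(s->next_on_complete->cb_arg, error);
    }

    /* op_on_complete stands in for &op->on_complete. */
    static void inject(deadline_state* s, closure** op_on_complete) {
      s->next_on_complete = *op_on_complete;
      s->on_complete.cb = our_on_complete;
      s->on_complete.cb_arg = s;
      *op_on_complete = &s->on_complete;
    }

    static void user_cb(void* arg, int error) {
      printf("user callback sees error=%d\n", error);
    }

    int main(void) {
      closure user = {user_cb, NULL};
      closure* op_on_complete = &user;
      deadline_state s;
      inject(&s, &op_on_complete);
      /* The transport completes the op: our closure runs, then chains. */
      op_on_complete->cb(op_on_complete->cb_arg, 0);
      return 0;
    }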
-// Method for starting a call op for client filter.
-static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
-                                             grpc_call_element* elem,
-                                             grpc_transport_stream_op* op) {
-  base_call_data* calld = elem->call_data;
-  // If the call is cancelled or closed, cancel the timer.
+// Inject our own on_complete callback into op.
+static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
+                                  grpc_transport_stream_op* op) {
+  deadline_state->next_on_complete = op->on_complete;
+  grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state);
+  op->on_complete = &deadline_state->on_complete;
+}
+
+void grpc_deadline_state_init(grpc_deadline_state* deadline_state,
+                              grpc_call_stack* call_stack) {
+  memset(deadline_state, 0, sizeof(*deadline_state));
+  deadline_state->call_stack = call_stack;
+  gpr_mu_init(&deadline_state->timer_mu);
+}
+
+void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
+                                 grpc_deadline_state* deadline_state) {
+  cancel_timer_if_needed(exec_ctx, deadline_state);
+  gpr_mu_destroy(&deadline_state->timer_mu);
+}
+
+void grpc_deadline_state_client_start_transport_stream_op(
+    grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+    grpc_transport_stream_op* op) {
+  grpc_deadline_state* deadline_state = elem->call_data;
   if (op->cancel_error != GRPC_ERROR_NONE ||
       op->close_error != GRPC_ERROR_NONE) {
-    cancel_timer_if_needed(exec_ctx, calld);
+    cancel_timer_if_needed(exec_ctx, deadline_state);
   } else {
     // If we're sending initial metadata, get the deadline from the metadata
     // and start the timer if needed.
@@ -175,11 +151,77 @@ static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
     // Make sure we know when the call is complete, so that we can cancel
     // the timer.
     if (op->recv_trailing_metadata != NULL) {
-      calld->next_on_complete = op->on_complete;
-      grpc_closure_init(&calld->on_complete, on_complete, calld);
-      op->on_complete = &calld->on_complete;
+      inject_on_complete_cb(deadline_state, op);
     }
   }
+}
+
+//
+// filter code
+//
+
+// Used for both client and server filters.
+typedef struct channel_data {
+} channel_data;
+
+// Constructor for channel_data. Used for both client and server filters.
+static void init_channel_elem(grpc_exec_ctx* exec_ctx,
+                              grpc_channel_element* elem,
+                              grpc_channel_element_args* args) {
+  GPR_ASSERT(!args->is_last);
+}
+
+// Destructor for channel_data. Used for both client and server filters.
+static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
+                                 grpc_channel_element* elem) {
+}
+
+// Call data used for both client and server filters.
+typedef struct base_call_data {
+  grpc_deadline_state deadline_state;
+} base_call_data;
+
+// Additional call data used only for the server filter.
+typedef struct server_call_data {
+  base_call_data base;  // Must be first.
+  // The closure for receiving initial metadata.
+  grpc_closure recv_initial_metadata_ready;
+  // Received initial metadata batch.
+  grpc_metadata_batch* recv_initial_metadata;
+  // The original recv_initial_metadata_ready closure, which we chain to
+  // after our own closure is invoked.
+  grpc_closure* next_recv_initial_metadata_ready;
+} server_call_data;
+
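The base_call_data/server_call_data split means the client and server filters share one constructor while allocating different amounts of call data; the filter definition's sizeof_call_data tells the shared code how much to zero (the "size of call data is different" note in the constructor below). A standalone sketch of that arrangement with toy types:

    #include <stdio.h>
    #include <string.h>

    typedef struct { size_t sizeof_call_data; } filter;

    typedef struct { int deadline_state; } base_call_data;
    typedef struct {
      base_call_data base; /* must be first */
      int server_only_field;
    } server_call_data;

    static void init_call_elem(const filter* f, void* call_data) {
      /* Size differs between client and server;
         zero exactly what we were given. */
      memset(call_data, 0, f->sizeof_call_data);
    }

    int main(void) {
      filter client_filter = {sizeof(base_call_data)};
      filter server_filter = {sizeof(server_call_data)};
      base_call_data c;
      server_call_data s;
      init_call_elem(&client_filter, &c);
      init_call_elem(&server_filter, &s);
      printf("client=%zu server=%zu bytes\n", client_filter.sizeof_call_data,
             server_filter.sizeof_call_data);
      return 0;
    }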
+// Constructor for call_data. Used for both client and server filters.
+static grpc_error *init_call_elem(grpc_exec_ctx* exec_ctx,
+                                  grpc_call_element* elem,
+                                  grpc_call_element_args* args) {
+  base_call_data* calld = elem->call_data;
+  // Note: size of call data is different between client and server.
+  memset(calld, 0, elem->filter->sizeof_call_data);
+  grpc_deadline_state_init(&calld->deadline_state, args->call_stack);
+  return GRPC_ERROR_NONE;
+}
+
+// Destructor for call_data. Used for both client and server filters.
+static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+                              const grpc_call_final_info* final_info,
+                              void* and_free_memory) {
+  base_call_data* calld = elem->call_data;
+  grpc_deadline_state_destroy(exec_ctx, &calld->deadline_state);
+}
+
+// Method for starting a call op for the client filter.
+static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
+                                             grpc_call_element* elem,
+                                             grpc_transport_stream_op* op) {
+  grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
   // Chain to next filter.
   grpc_call_next_op(exec_ctx, elem, op);
 }
@@ -201,11 +243,11 @@ static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg,
 static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
                                              grpc_call_element* elem,
                                              grpc_transport_stream_op* op) {
   server_call_data* calld = elem->call_data;
-  // If the call is cancelled or closed, cancel the timer.
   if (op->cancel_error != GRPC_ERROR_NONE ||
       op->close_error != GRPC_ERROR_NONE) {
-    cancel_timer_if_needed(exec_ctx, &calld->base);
+    cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
   } else {
     // If we're receiving initial metadata, we need to get the deadline
     // from the recv_initial_metadata_ready callback.  So we inject our
@@ -223,9 +265,7 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
     // the client never sends trailing metadata, because this is the
     // hook that tells us when the call is complete on the server side.
     if (op->recv_trailing_metadata != NULL) {
-      calld->base.next_on_complete = op->on_complete;
-      grpc_closure_init(&calld->base.on_complete, on_complete, calld);
-      op->on_complete = &calld->base.on_complete;
+      inject_on_complete_cb(&calld->base.deadline_state, op);
     }
   }
   // Chain to next filter.
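Taken together, the new API is meant to be composed by a filter roughly as follows. This is a non-compilable usage sketch assembled from the calls appearing in this patch; the my_* names are illustrative, not part of the patch. Note the header's requirement (next file) that grpc_deadline_state be the first field, and that chaining to the next filter remains the caller's job:

    typedef struct my_call_data {
      grpc_deadline_state deadline_state; /* must be the first field */
      /* ... filter-specific state ... */
    } my_call_data;

    static grpc_error* my_init_call_elem(grpc_exec_ctx* exec_ctx,
                                         grpc_call_element* elem,
                                         grpc_call_element_args* args) {
      my_call_data* calld = elem->call_data;
      grpc_deadline_state_init(&calld->deadline_state, args->call_stack);
      return GRPC_ERROR_NONE;
    }

    static void my_destroy_call_elem(grpc_exec_ctx* exec_ctx,
                                     grpc_call_element* elem,
                                     const grpc_call_final_info* final_info,
                                     void* and_free_memory) {
      my_call_data* calld = elem->call_data;
      grpc_deadline_state_destroy(exec_ctx, &calld->deadline_state);
    }

    static void my_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
                                             grpc_call_element* elem,
                                             grpc_transport_stream_op* op) {
      /* Starts/cancels the deadline timer as appropriate for this op. */
      grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
      /* Chaining is the caller's responsibility. */
      grpc_call_next_op(exec_ctx, elem, op);
    }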
diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h
index 323cb4e10cf..793b76f9b68 100644
--- a/src/core/lib/channel/deadline_filter.h
+++ b/src/core/lib/channel/deadline_filter.h
@@ -33,7 +33,48 @@
 #define GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H
 
 #include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/iomgr/timer.h"
 
+// State used for filters that enforce call deadlines.
+// Should be the first field in the filter's call_data.
+typedef struct grpc_deadline_state {
+  // We take a reference to the call stack for the timer callback.
+  grpc_call_stack* call_stack;
+  // Guards access to timer_pending and timer.
+  gpr_mu timer_mu;
+  // True if the timer callback is currently pending.
+  bool timer_pending;
+  // The deadline timer.
+  grpc_timer timer;
+  // Closure to invoke when the call is complete.
+  // We use this to cancel the timer.
+  grpc_closure on_complete;
+  // The original on_complete closure, which we chain to after our own
+  // closure is invoked.
+  grpc_closure* next_on_complete;
+} grpc_deadline_state;
+
+void grpc_deadline_state_init(grpc_deadline_state* deadline_state,
+                              grpc_call_stack* call_stack);
+void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
+                                 grpc_deadline_state* deadline_state);
+
+// To be used in a filter's start_transport_stream_op() method to
+// enforce call deadlines.
+// It is the caller's responsibility to chain to the next filter if
+// necessary after this function returns.
+// REQUIRES: The first field in elem->call_data is a grpc_deadline_state.
+void grpc_deadline_state_client_start_transport_stream_op(
+    grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+    grpc_transport_stream_op* op);
+
+// Deadline filters for direct client channels and server channels.
+// Note: Deadlines for non-direct client channels are handled by the
+// client_channel filter.
 extern const grpc_channel_filter grpc_client_deadline_filter;
 extern const grpc_channel_filter grpc_server_deadline_filter;
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 4a3c03b915b..158dca09647 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -98,9 +98,6 @@ static bool maybe_add_http_filter(grpc_channel_stack_builder *builder,
 }
 
 static void register_builtin_channel_init() {
-  grpc_channel_init_register_stage(
-      GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
-      prepend_filter, (void *)&grpc_client_deadline_filter);
   grpc_channel_init_register_stage(
       GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
      prepend_filter, (void *)&grpc_client_deadline_filter);
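With deadline handling now embedded in client_channel, the subchannel stack no longer needs its own deadline filter, hence the GRPC_CLIENT_SUBCHANNEL registration removed above. The cancellation that the deadline timer now drives goes through client_channel's lock-free call-pointer protocol (0 = no call yet, 1 = CANCELLED_CALL, anything else = a subchannel call pointer). A standalone sketch of that protocol, using C11 atomics in place of gpr_atm (names are illustrative):

    #include <stdatomic.h>
    #include <stdint.h>
    #include <stdio.h>

    #define CANCELLED_CALL ((uintptr_t)1)

    static void cancel_call(atomic_uintptr_t* call_atm) {
      for (;;) {
        uintptr_t call = atomic_load(call_atm);
        if (call == CANCELLED_CALL) {
          printf("already cancelled\n");
          return;
        }
        if (call != 0) {
          printf("subchannel call %p exists: cancel it directly\n",
                 (void*)call);
          return;
        }
        /* No call yet: try to publish the cancelled marker. */
        uintptr_t expected = 0;
        if (atomic_compare_exchange_strong(call_atm, &expected,
                                           CANCELLED_CALL)) {
          printf("cancelled before the subchannel call was created\n");
          return;
        }
        /* Lost the race with call creation; re-read and retry,
           as the goto retry loop in cc_start_transport_stream_op does. */
      }
    }

    int main(void) {
      atomic_uintptr_t subchannel_call = 0;
      cancel_call(&subchannel_call); /* publishes CANCELLED_CALL */
      cancel_call(&subchannel_call); /* sees the marker, no-op */
      return 0;
    }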