From aa850a7b48ac78f9b86e036b448519ec579c440b Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 26 Sep 2016 13:38:02 -0700 Subject: [PATCH] Pass path into init_call_elem() for use in looking up method config. --- src/core/ext/client_config/client_channel.c | 52 ++++++++------------- src/core/ext/client_config/subchannel.c | 4 +- src/core/ext/client_config/subchannel.h | 3 +- src/core/lib/channel/channel_stack.c | 3 +- src/core/lib/channel/channel_stack.h | 3 +- src/core/lib/channel/deadline_filter.c | 20 ++++++-- src/core/lib/channel/deadline_filter.h | 5 +- src/core/lib/surface/call.c | 8 +++- 8 files changed, 56 insertions(+), 42 deletions(-) diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 87e3a1a63a6..e61d2534806 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -421,6 +421,9 @@ typedef struct client_channel_call_data { grpc_deadline_state deadline_state; gpr_timespec deadline; + // Request path. + grpc_mdstr *path; + grpc_error *cancel_error; /** either 0 for no call, 1 for cancelled, or a pointer to a @@ -433,9 +436,6 @@ typedef struct client_channel_call_data { grpc_connected_subchannel *connected_subchannel; grpc_polling_entity *pollent; - grpc_mdstr *path; - grpc_method_config *method_config; - grpc_transport_stream_op **waiting_ops; size_t waiting_ops_count; size_t waiting_ops_capacity; @@ -511,9 +511,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_call_element *elem = arg; - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; + call_data *calld = arg; gpr_mu_lock(&calld->mu); GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); @@ -528,18 +526,11 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_REFERENCING( "Cancelled before creating subchannel", &error, 1)); } else { - /* Get method config. */ -// FIXME: need to actually use the config data! -// FIXME: think about refcounting vs. atomicity here - if (chand->method_config_table != NULL) { - calld->method_config = grpc_method_config_table_get_method_config( - chand->method_config_table, calld->path); - } /* Create call on subchannel. */ grpc_subchannel_call *subchannel_call = NULL; grpc_error *new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline, - &subchannel_call); + exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, + calld->deadline, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); subchannel_call = CANCELLED_CALL; @@ -745,15 +736,8 @@ retry: if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && calld->connected_subchannel == NULL && op->send_initial_metadata != NULL) { - for (grpc_linked_mdelem *mdelem = op->send_initial_metadata->list.head; - mdelem != NULL; mdelem = mdelem->next) { - if (mdelem->md->key == GRPC_MDSTR_PATH) { - calld->path = GRPC_MDSTR_REF(mdelem->md->value); - break; - } - } calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; - grpc_closure_init(&calld->next_step, subchannel_ready, elem); + grpc_closure_init(&calld->next_step, subchannel_ready, calld); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, op->send_initial_metadata_flags, @@ -768,8 +752,8 @@ retry: calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; grpc_error *error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline, - &subchannel_call); + exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, + calld->deadline, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); @@ -790,15 +774,22 @@ retry: static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { + channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - grpc_deadline_state_init(exec_ctx, elem, args); + gpr_mu_lock(&chand->mu); + grpc_method_config_table *method_config_table = + chand->method_config_table == NULL + ? NULL + : grpc_method_config_table_ref(chand->method_config_table); + gpr_mu_unlock(&chand->mu); + grpc_deadline_state_init(exec_ctx, elem, args, method_config_table); + grpc_method_config_table_unref(chand->method_config_table); calld->deadline = args->deadline; + calld->path = GRPC_MDSTR_REF(args->path); calld->cancel_error = GRPC_ERROR_NONE; gpr_atm_rel_store(&calld->subchannel_call, 0); gpr_mu_init(&calld->mu); calld->connected_subchannel = NULL; - calld->path = NULL; - calld->method_config = NULL; calld->waiting_ops = NULL; calld->waiting_ops_count = 0; calld->waiting_ops_capacity = 0; @@ -815,11 +806,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, void *and_free_memory) { call_data *calld = elem->call_data; grpc_deadline_state_destroy(exec_ctx, elem); + GRPC_MDSTR_UNREF(calld->path); GRPC_ERROR_UNREF(calld->cancel_error); - -// FIXME: remove - if (calld->path != NULL) GRPC_MDSTR_UNREF(calld->path); - grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index f2b860807f3..6674dbbe68f 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -700,7 +700,7 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_polling_entity *pollent, gpr_timespec deadline, + grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline, grpc_subchannel_call **call) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); @@ -708,7 +708,7 @@ grpc_error *grpc_connected_subchannel_create_call( (*call)->connection = con; // Ref is added below. grpc_error *error = grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call, - NULL, NULL, deadline, callstk); + NULL, NULL, path, deadline, callstk); if (error != GRPC_ERROR_NONE) { const char *error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h index 33306210712..f8de26dfd51 100644 --- a/src/core/ext/client_config/subchannel.h +++ b/src/core/ext/client_config/subchannel.h @@ -38,6 +38,7 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/metadata.h" /** A (sub-)channel that knows how to connect to exactly one target address. Provides a target for load balancing. */ @@ -110,7 +111,7 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, /** construct a subchannel call */ grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, - grpc_polling_entity *pollent, gpr_timespec deadline, + grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline, grpc_subchannel_call **subchannel_call); /** process a transport level op */ diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 57d34d9e9ab..205496f2f26 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -162,7 +162,7 @@ grpc_error *grpc_call_stack_init( grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, grpc_call_context_element *context, const void *transport_server_data, - gpr_timespec deadline, grpc_call_stack *call_stack) { + grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); grpc_call_element_args args; size_t count = channel_stack->count; @@ -183,6 +183,7 @@ grpc_error *grpc_call_stack_init( args.call_stack = call_stack; args.server_transport_data = transport_server_data; args.context = context; + args.path = path; args.deadline = deadline; call_elems[i].filter = channel_elems[i].filter; call_elems[i].channel_data = channel_elems[i].channel_data; diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 1cfe2885d81..5b46cd32a3a 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -74,6 +74,7 @@ typedef struct { grpc_call_stack *call_stack; const void *server_transport_data; grpc_call_context_element *context; + grpc_mdstr *path; gpr_timespec deadline; } grpc_call_element_args; @@ -225,7 +226,7 @@ grpc_error *grpc_call_stack_init( grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, grpc_call_context_element *context, const void *transport_server_data, - gpr_timespec deadline, grpc_call_stack *call_stack); + grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack); /* Set a pollset or a pollset_set for a call stack: must occur before the first * op is started */ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 079b98a2f85..89dc8dd6e8c 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -123,15 +123,28 @@ static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, } void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_call_element_args* args) { + grpc_call_element_args* args, + grpc_method_config_table* method_config_table) { grpc_deadline_state* deadline_state = elem->call_data; memset(deadline_state, 0, sizeof(*deadline_state)); deadline_state->call_stack = args->call_stack; gpr_mu_init(&deadline_state->timer_mu); // Deadline will always be infinite on servers, so the timer will only be // set on clients with a finite deadline. - const gpr_timespec deadline = + gpr_timespec deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); + if (method_config_table != NULL) { + grpc_method_config* method_config = + grpc_method_config_table_get_method_config(method_config_table, + args->path); + if (method_config != NULL) { + gpr_timespec* per_method_deadline = + grpc_method_config_get_timeout(method_config); + if (per_method_deadline != NULL) { + deadline = gpr_time_min(deadline, *per_method_deadline); + } + } + } if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { // When the deadline passes, we indicate the failure by sending down // an op with cancel_error set. However, we can't send down any ops @@ -209,7 +222,8 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element_args* args) { // Note: size of call data is different between client and server. memset(elem->call_data, 0, elem->filter->sizeof_call_data); - grpc_deadline_state_init(exec_ctx, elem, args); + grpc_deadline_state_init(exec_ctx, elem, args, + NULL /* method_config_table */); return GRPC_ERROR_NONE; } diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h index 685df877617..2fcee0b9ef0 100644 --- a/src/core/lib/channel/deadline_filter.h +++ b/src/core/lib/channel/deadline_filter.h @@ -35,6 +35,8 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/ext/client_config/method_config.h" + // State used for filters that enforce call deadlines. // Must be the first field in the filter's call_data. typedef struct grpc_deadline_state { @@ -63,7 +65,8 @@ typedef struct grpc_deadline_state { // caller's responsibility to chain to the next filter if necessary // after the function returns. void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_call_element_args* args); + grpc_call_element_args* args, + grpc_method_config_table* method_config_table); void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, grpc_call_element* elem); void grpc_deadline_state_client_start_transport_stream_op( diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 5690bcab1e4..0af72547e37 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -242,10 +242,14 @@ grpc_call *grpc_call_create( /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); call->is_client = server_transport_data == NULL; + grpc_mdstr *path = NULL; if (call->is_client) { GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT); for (i = 0; i < add_initial_metadata_count; i++) { call->send_extra_metadata[i].md = add_initial_metadata[i]; + if (add_initial_metadata[i]->key == GRPC_MDSTR_PATH) { + path = GRPC_MDSTR_REF(add_initial_metadata[i]->value); + } } call->send_extra_metadata_count = (int)add_initial_metadata_count; } else { @@ -307,7 +311,7 @@ grpc_call *grpc_call_create( /* initial refcount dropped by grpc_call_destroy */ grpc_error *error = grpc_call_stack_init( &exec_ctx, channel_stack, 1, destroy_call, call, call->context, - server_transport_data, send_deadline, CALL_STACK_FROM_CALL(call)); + server_transport_data, path, send_deadline, CALL_STACK_FROM_CALL(call)); if (error != GRPC_ERROR_NONE) { grpc_status_code status; const char *error_str; @@ -332,6 +336,8 @@ grpc_call *grpc_call_create( &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); } + if (path != NULL) GRPC_MDSTR_UNREF(path); + grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_call_create", 0); return call;