Pass path into init_call_elem() for use in looking up method config.

pull/8303/head
Mark D. Roth 8 years ago
parent bec804b77f
commit aa850a7b48
  1. 52
      src/core/ext/client_config/client_channel.c
  2. 4
      src/core/ext/client_config/subchannel.c
  3. 3
      src/core/ext/client_config/subchannel.h
  4. 3
      src/core/lib/channel/channel_stack.c
  5. 3
      src/core/lib/channel/channel_stack.h
  6. 20
      src/core/lib/channel/deadline_filter.c
  7. 5
      src/core/lib/channel/deadline_filter.h
  8. 8
      src/core/lib/surface/call.c

@@ -421,6 +421,9 @@ typedef struct client_channel_call_data {
grpc_deadline_state deadline_state;
gpr_timespec deadline;
// Request path.
grpc_mdstr *path;
grpc_error *cancel_error;
/** either 0 for no call, 1 for cancelled, or a pointer to a
@@ -433,9 +436,6 @@ typedef struct client_channel_call_data {
grpc_connected_subchannel *connected_subchannel;
grpc_polling_entity *pollent;
grpc_mdstr *path;
grpc_method_config *method_config;
grpc_transport_stream_op **waiting_ops;
size_t waiting_ops_count;
size_t waiting_ops_capacity;
@@ -511,9 +511,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_call_element *elem = arg;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
call_data *calld = arg;
gpr_mu_lock(&calld->mu);
GPR_ASSERT(calld->creation_phase ==
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
@@ -528,18 +526,11 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_CREATE_REFERENCING(
"Cancelled before creating subchannel", &error, 1));
} else {
/* Get method config. */
// FIXME: need to actually use the config data!
// FIXME: think about refcounting vs. atomicity here
if (chand->method_config_table != NULL) {
calld->method_config = grpc_method_config_table_get_method_config(
chand->method_config_table, calld->path);
}
/* Create call on subchannel. */
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
&subchannel_call);
exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
calld->deadline, &subchannel_call);
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
subchannel_call = CANCELLED_CALL;
@@ -745,15 +736,8 @@ retry:
if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
calld->connected_subchannel == NULL &&
op->send_initial_metadata != NULL) {
for (grpc_linked_mdelem *mdelem = op->send_initial_metadata->list.head;
mdelem != NULL; mdelem = mdelem->next) {
if (mdelem->md->key == GRPC_MDSTR_PATH) {
calld->path = GRPC_MDSTR_REF(mdelem->md->value);
break;
}
}
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
grpc_closure_init(&calld->next_step, subchannel_ready, elem);
grpc_closure_init(&calld->next_step, subchannel_ready, calld);
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata,
op->send_initial_metadata_flags,
@@ -768,8 +752,8 @@ retry:
calld->connected_subchannel != NULL) {
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
&subchannel_call);
exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
calld->deadline, &subchannel_call);
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
@@ -790,15 +774,22 @@ retry:
static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_deadline_state_init(exec_ctx, elem, args);
gpr_mu_lock(&chand->mu);
grpc_method_config_table *method_config_table =
chand->method_config_table == NULL
? NULL
: grpc_method_config_table_ref(chand->method_config_table);
gpr_mu_unlock(&chand->mu);
grpc_deadline_state_init(exec_ctx, elem, args, method_config_table);
grpc_method_config_table_unref(chand->method_config_table);
calld->deadline = args->deadline;
calld->path = GRPC_MDSTR_REF(args->path);
calld->cancel_error = GRPC_ERROR_NONE;
gpr_atm_rel_store(&calld->subchannel_call, 0);
gpr_mu_init(&calld->mu);
calld->connected_subchannel = NULL;
calld->path = NULL;
calld->method_config = NULL;
calld->waiting_ops = NULL;
calld->waiting_ops_count = 0;
calld->waiting_ops_capacity = 0;
@@ -815,11 +806,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
void *and_free_memory) {
call_data *calld = elem->call_data;
grpc_deadline_state_destroy(exec_ctx, elem);
GRPC_MDSTR_UNREF(calld->path);
GRPC_ERROR_UNREF(calld->cancel_error);
// FIXME: remove
if (calld->path != NULL) GRPC_MDSTR_UNREF(calld->path);
grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) {
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");

@@ -700,7 +700,7 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
grpc_polling_entity *pollent, gpr_timespec deadline,
grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline,
grpc_subchannel_call **call) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
*call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
@@ -708,7 +708,7 @@ grpc_error *grpc_connected_subchannel_create_call(
(*call)->connection = con; // Ref is added below.
grpc_error *error =
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call,
NULL, NULL, deadline, callstk);
NULL, NULL, path, deadline, callstk);
if (error != GRPC_ERROR_NONE) {
const char *error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string);

@@ -38,6 +38,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
@@ -110,7 +111,7 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
/** construct a subchannel call */
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
grpc_polling_entity *pollent, gpr_timespec deadline,
grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline,
grpc_subchannel_call **subchannel_call);
/** process a transport level op */

@@ -162,7 +162,7 @@ grpc_error *grpc_call_stack_init(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context, const void *transport_server_data,
gpr_timespec deadline, grpc_call_stack *call_stack) {
grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
size_t count = channel_stack->count;
@@ -183,6 +183,7 @@ grpc_error *grpc_call_stack_init(
args.call_stack = call_stack;
args.server_transport_data = transport_server_data;
args.context = context;
args.path = path;
args.deadline = deadline;
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;

@@ -74,6 +74,7 @@ typedef struct {
grpc_call_stack *call_stack;
const void *server_transport_data;
grpc_call_context_element *context;
grpc_mdstr *path;
gpr_timespec deadline;
} grpc_call_element_args;
@@ -225,7 +226,7 @@ grpc_error *grpc_call_stack_init(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context, const void *transport_server_data,
gpr_timespec deadline, grpc_call_stack *call_stack);
grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack);
/* Set a pollset or a pollset_set for a call stack: must occur before the first
* op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,

@@ -123,15 +123,28 @@ static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
}
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_call_element_args* args) {
grpc_call_element_args* args,
grpc_method_config_table* method_config_table) {
grpc_deadline_state* deadline_state = elem->call_data;
memset(deadline_state, 0, sizeof(*deadline_state));
deadline_state->call_stack = args->call_stack;
gpr_mu_init(&deadline_state->timer_mu);
// Deadline will always be infinite on servers, so the timer will only be
// set on clients with a finite deadline.
const gpr_timespec deadline =
gpr_timespec deadline =
gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
if (method_config_table != NULL) {
grpc_method_config* method_config =
grpc_method_config_table_get_method_config(method_config_table,
args->path);
if (method_config != NULL) {
gpr_timespec* per_method_deadline =
grpc_method_config_get_timeout(method_config);
if (per_method_deadline != NULL) {
deadline = gpr_time_min(deadline, *per_method_deadline);
}
}
}
if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
// When the deadline passes, we indicate the failure by sending down
// an op with cancel_error set. However, we can't send down any ops
@@ -209,7 +222,8 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element_args* args) {
// Note: size of call data is different between client and server.
memset(elem->call_data, 0, elem->filter->sizeof_call_data);
grpc_deadline_state_init(exec_ctx, elem, args);
grpc_deadline_state_init(exec_ctx, elem, args,
NULL /* method_config_table */);
return GRPC_ERROR_NONE;
}

@@ -35,6 +35,8 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/ext/client_config/method_config.h"
// State used for filters that enforce call deadlines.
// Must be the first field in the filter's call_data.
typedef struct grpc_deadline_state {
@@ -63,7 +65,8 @@ typedef struct grpc_deadline_state {
// caller's responsibility to chain to the next filter if necessary
// after the function returns.
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_call_element_args* args);
grpc_call_element_args* args,
grpc_method_config_table* method_config_table);
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem);
void grpc_deadline_state_client_start_transport_stream_op(

@@ -242,10 +242,14 @@ grpc_call *grpc_call_create(
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
call->is_client = server_transport_data == NULL;
grpc_mdstr *path = NULL;
if (call->is_client) {
GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT);
for (i = 0; i < add_initial_metadata_count; i++) {
call->send_extra_metadata[i].md = add_initial_metadata[i];
if (add_initial_metadata[i]->key == GRPC_MDSTR_PATH) {
path = GRPC_MDSTR_REF(add_initial_metadata[i]->value);
}
}
call->send_extra_metadata_count = (int)add_initial_metadata_count;
} else {
@@ -307,7 +311,7 @@ grpc_call *grpc_call_create(
/* initial refcount dropped by grpc_call_destroy */
grpc_error *error = grpc_call_stack_init(
&exec_ctx, channel_stack, 1, destroy_call, call, call->context,
server_transport_data, send_deadline, CALL_STACK_FROM_CALL(call));
server_transport_data, path, send_deadline, CALL_STACK_FROM_CALL(call));
if (error != GRPC_ERROR_NONE) {
grpc_status_code status;
const char *error_str;
@@ -332,6 +336,8 @@ grpc_call *grpc_call_create(
&exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
}
if (path != NULL) GRPC_MDSTR_UNREF(path);
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_call_create", 0);
return call;

Loading…
Cancel
Save