Change grpc_channel_filter init_call_elem() method to return grpc_error.

pull/7024/head
Mark D. Roth 9 years ago
parent acbf8c290a
commit 0badbe8b11
  1. 14
      src/core/ext/census/grpc_filter.c
  2. 6
      src/core/ext/client_config/client_channel.c
  3. 7
      src/core/ext/client_config/subchannel.c
  4. 6
      src/core/ext/load_reporting/load_reporting_filter.c
  5. 19
      src/core/lib/channel/channel_stack.c
  6. 18
      src/core/lib/channel/channel_stack.h
  7. 7
      src/core/lib/channel/compress_filter.c
  8. 13
      src/core/lib/channel/connected_channel.c
  9. 6
      src/core/lib/channel/http_client_filter.c
  10. 6
      src/core/lib/channel/http_server_filter.c
  11. 6
      src/core/lib/security/transport/client_auth_filter.c
  12. 7
      src/core/lib/security/transport/server_auth_filter.c
  13. 9
      src/core/lib/surface/call.c
  14. 7
      src/core/lib/surface/lame_client.c
  15. 6
      src/core/lib/surface/server.c
  16. 12
      test/core/channel/channel_stack_test.c
  17. 7
      test/core/end2end/tests/filter_causes_close.c

@ -124,13 +124,14 @@ static void server_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_next_op(exec_ctx, elem, op);
}
static void client_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error* client_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d));
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
return GRPC_ERROR_NONE;
}
static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
@ -142,15 +143,16 @@ static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
/* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
}
static void server_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error* server_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d));
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
/* TODO(hongyu): call census_tracing_start_op here. */
grpc_closure_init(&d->finish_recv, server_on_done_recv, elem);
return GRPC_ERROR_NONE;
}
static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,

@ -430,10 +430,12 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
}
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem,
args->call_stack);
return GRPC_ERROR_NONE;
}
/* Destructor for call_data */

@ -709,8 +709,11 @@ grpc_subchannel_call *grpc_connected_subchannel_create_call(
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
call->connection = con;
GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call,
NULL, NULL, callstk);
grpc_error* error = grpc_call_stack_init(exec_ctx, chanstk, 1,
subchannel_call_destroy, call,
NULL, NULL, callstk);
// FIXME: handle error (probably requires changing this function's API)
GPR_ASSERT(error == GRPC_ERROR_NONE);
grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent);
return call;
}

@ -56,10 +56,12 @@ static void invoke_lr_fn_locked(grpc_load_reporting_config *lrc,
}
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(call_data));
return GRPC_ERROR_NONE;
}
/* Destructor for call_data */

@ -157,12 +157,13 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
}
}
void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *channel_stack, int initial_refs,
grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context,
const void *transport_server_data,
grpc_call_stack *call_stack) {
grpc_error* grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy,
void *destroy_arg,
grpc_call_context_element *context,
const void *transport_server_data,
grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
size_t count = channel_stack->count;
@ -185,10 +186,14 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args);
grpc_error* error = call_elems[i].filter->init_call_elem(
exec_ctx, &call_elems[i], &args);
if (error != GRPC_ERROR_NONE)
return error;
user_data +=
ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
}
return GRPC_ERROR_NONE;
}
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,

@ -110,8 +110,9 @@ typedef struct {
on a client; if it is non-NULL, then it points to memory owned by the
transport and is on the server. Most filters want to ignore this
argument. */
void (*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args);
grpc_error* (*init_call_elem)(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args);
void (*set_pollset_or_pollset_set)(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_polling_entity *pollent);
@ -209,12 +210,13 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
/* Initialize a call stack given a channel stack. transport_server_data is
expected to be NULL on a client, or an opaque transport owned pointer on the
server. */
void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *channel_stack, int initial_refs,
grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context,
const void *transport_server_data,
grpc_call_stack *call_stack);
grpc_error* grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy,
void *destroy_arg,
grpc_call_context_element *context,
const void *transport_server_data,
grpc_call_stack *call_stack);
/* Set a pollset or a pollset_set for a call stack: must occur before the first
* op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,

@ -256,8 +256,9 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
@ -266,6 +267,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
calld->has_compression_algorithm = 0;
grpc_closure_init(&calld->got_slice, got_slice, elem);
grpc_closure_init(&calld->send_done, send_done, elem);
return GRPC_ERROR_NONE;
}
/* Destructor for call_data */

@ -81,16 +81,17 @@ static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int r;
r = grpc_transport_init_stream(
int r = grpc_transport_init_stream(
exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
&args->call_stack->refcount, args->server_transport_data);
GPR_ASSERT(r == 0);
return r == 0
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE("transport initialization failed");
}
static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,

@ -169,11 +169,13 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
calld->on_done_recv = NULL;
grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
return GRPC_ERROR_NONE;
}
/* Destructor for call_data */

@ -224,13 +224,15 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
memset(calld, 0, sizeof(*calld));
grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
return GRPC_ERROR_NONE;
}
/* Destructor for call_data */

@ -264,10 +264,12 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(*calld));
return GRPC_ERROR_NONE;
}
static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,

@ -199,8 +199,9 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
@ -222,6 +223,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
args->context[GRPC_CONTEXT_SECURITY].value = server_ctx;
args->context[GRPC_CONTEXT_SECURITY].destroy =
grpc_server_security_context_destroy;
return GRPC_ERROR_NONE;
}
/* Destructor for call_data */

@ -262,9 +262,12 @@ grpc_call *grpc_call_create(
call->send_deadline = send_deadline;
GRPC_CHANNEL_INTERNAL_REF(channel, "call");
/* initial refcount dropped by grpc_call_destroy */
grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call,
call->context, server_transport_data,
CALL_STACK_FROM_CALL(call));
grpc_error* error = grpc_call_stack_init(&exec_ctx, channel_stack, 1,
destroy_call, call, call->context,
server_transport_data,
CALL_STACK_FROM_CALL(call));
// FIXME: handle error (probably requires changing this function's API)
GPR_ASSERT(error == GRPC_ERROR_NONE);
if (cq != NULL) {
GPR_ASSERT(
pollset_set_alternative == NULL &&

@ -107,8 +107,11 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(op->disconnect_with_error);
}
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {}
static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
return GRPC_ERROR_NONE;
}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_stats *stats,

@ -848,8 +848,9 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
}
}
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
@ -861,6 +862,7 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
server_on_recv_initial_metadata, elem);
server_ref(chand->server);
return GRPC_ERROR_NONE;
}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,

@ -53,10 +53,12 @@ static void channel_init_func(grpc_exec_ctx *exec_ctx,
*(int *)(elem->channel_data) = 0;
}
static void call_init_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error* call_init_func(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
++*(int *)(elem->channel_data);
*(int *)(elem->call_data) = 0;
return GRPC_ERROR_NONE;
}
static void channel_destroy_func(grpc_exec_ctx *exec_ctx,
@ -132,8 +134,10 @@ static void test_create_channel_stack(void) {
GPR_ASSERT(*channel_data == 0);
call_stack = gpr_malloc(channel_stack->call_stack_size);
grpc_call_stack_init(&exec_ctx, channel_stack, 1, free_call, call_stack, NULL,
NULL, call_stack);
grpc_error* error = grpc_call_stack_init(&exec_ctx, channel_stack, 1,
free_call, call_stack, NULL, NULL,
call_stack);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(call_stack->count == 1);
call_elem = grpc_call_stack_element(call_stack, 0);
GPR_ASSERT(call_elem->filter == channel_elem->filter);

@ -233,8 +233,11 @@ static void start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_next_op(exec_ctx, elem, op);
}
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {}
static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
return GRPC_ERROR_NONE;
}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_stats *stats,

Loading…
Cancel
Save