From 3f2c2214b718ee61ddfd52efe922e652535aa537 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 23 Apr 2015 07:56:33 -0700 Subject: [PATCH] Fiddling with an initial op --- src/core/channel/census_filter.c | 30 +++++++++++++--------- src/core/channel/channel_stack.c | 3 ++- src/core/channel/channel_stack.h | 4 ++- src/core/channel/child_channel.c | 6 ++--- src/core/channel/child_channel.h | 2 +- src/core/channel/client_channel.c | 3 ++- src/core/channel/connected_channel.c | 7 +++--- src/core/transport/chttp2_transport.c | 1 + src/core/transport/transport.h | 36 +++++++++++++-------------- 9 files changed, 52 insertions(+), 40 deletions(-) diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c index 3e0fc39fc9a..47461f7f2b3 100644 --- a/src/core/channel/census_filter.c +++ b/src/core/channel/census_filter.c @@ -82,15 +82,18 @@ static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb, call_da } } -static void client_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) { +static void client_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { call_data* calld = elem->call_data; channel_data* chand = elem->channel_data; - GPR_ASSERT(calld != NULL); - GPR_ASSERT(chand != NULL); - GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); if (op->send_ops) { extract_and_annotate_method_tag(op->send_ops, calld, chand); } +} + +static void client_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) { + call_data* calld = elem->call_data; + GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); + client_mutate_op(elem, op); grpc_call_next_op(elem, op); } @@ -104,12 +107,8 @@ static void server_on_done_recv(void *ptr, int success) { calld->on_done_recv(calld->recv_user_data, success); } -static void server_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) { +static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { call_data* calld = elem->call_data; - channel_data* chand = elem->channel_data; - GPR_ASSERT(calld != NULL); - GPR_ASSERT(chand != NULL); - GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); if (op->recv_ops) { /* substitute our callback for the op callback */ calld->recv_ops = op->recv_ops; @@ -118,7 +117,12 @@ static void server_start_transport_op(grpc_call_element* elem, grpc_transport_op op->on_done_recv = server_on_done_recv; op->recv_user_data = elem; } - /* Always pass control up or down the stack depending on op->dir */ +} + +static void server_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) { + call_data* calld = elem->call_data; + GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); + server_mutate_op(elem, op); grpc_call_next_op(elem, op); } @@ -136,12 +140,13 @@ static void channel_op(grpc_channel_element* elem, } static void client_init_call_elem(grpc_call_element* elem, - const void* server_transport_data) { + const void* server_transport_data, grpc_transport_op *initial_op) { call_data* d = elem->call_data; GPR_ASSERT(d != NULL); init_rpc_stats(&d->stats); d->start_ts = gpr_now(); d->op_id = census_tracing_start_op(); + if (initial_op) client_mutate_op(elem, initial_op); } static void client_destroy_call_elem(grpc_call_element* elem) { @@ -152,12 +157,13 @@ static void client_destroy_call_elem(grpc_call_element* elem) { } static void server_init_call_elem(grpc_call_element* elem, - const void* server_transport_data) { + const void* server_transport_data, grpc_transport_op *initial_op) { call_data* d = elem->call_data; GPR_ASSERT(d != NULL); init_rpc_stats(&d->stats); d->start_ts = gpr_now(); d->op_id = census_tracing_start_op(); + if (initial_op) server_mutate_op(elem, initial_op); } static void server_destroy_call_elem(grpc_call_element* elem) { diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index c121e270056..022100e8bd7 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -148,6 +148,7 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack) { void grpc_call_stack_init(grpc_channel_stack *channel_stack, const void *transport_server_data, + grpc_transport_op *initial_op, grpc_call_stack *call_stack) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); size_t count = channel_stack->count; @@ -165,7 +166,7 @@ void grpc_call_stack_init(grpc_channel_stack *channel_stack, call_elems[i].filter = channel_elems[i].filter; call_elems[i].channel_data = channel_elems[i].channel_data; call_elems[i].call_data = user_data; - call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data); + call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data, initial_op); user_data += ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data); } diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 75897ff6516..94b12639fc8 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -121,7 +121,8 @@ typedef struct { transport and is on the server. Most filters want to ignore this argument.*/ void (*init_call_elem)(grpc_call_element *elem, - const void *server_transport_data); + const void *server_transport_data, + grpc_transport_op *initial_op); /* Destroy per call data. The filter does not need to do any chaining */ void (*destroy_call_elem)(grpc_call_element *elem); @@ -200,6 +201,7 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack); server. */ void grpc_call_stack_init(grpc_channel_stack *channel_stack, const void *transport_server_data, + grpc_transport_op *initial_op, grpc_call_stack *call_stack); /* Destroy a call stack */ void grpc_call_stack_destroy(grpc_call_stack *stack); diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c index 244417384a7..817a2a8c70a 100644 --- a/src/core/channel/child_channel.c +++ b/src/core/channel/child_channel.c @@ -121,7 +121,7 @@ static void lb_channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void lb_init_call_elem(grpc_call_element *elem, - const void *server_transport_data) {} + const void *server_transport_data, grpc_transport_op *initial_op) {} /* Destructor for call_data */ static void lb_destroy_call_elem(grpc_call_element *elem) {} @@ -261,13 +261,13 @@ void grpc_child_channel_handle_op(grpc_child_channel *channel, } grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel, - grpc_call_element *parent) { + grpc_call_element *parent, grpc_transport_op *initial_op) { grpc_call_stack *stk = gpr_malloc((channel)->call_stack_size); grpc_call_element *lbelem; lb_call_data *lbcalld; lb_channel_data *lbchand; - grpc_call_stack_init(channel, NULL, stk); + grpc_call_stack_init(channel, NULL, initial_op, stk); lbelem = LINK_BACK_ELEM_FROM_CALL(stk); lbchand = lbelem->channel_data; lbcalld = lbelem->call_data; diff --git a/src/core/channel/child_channel.h b/src/core/channel/child_channel.h index 38695402ab0..264a8bbb826 100644 --- a/src/core/channel/child_channel.h +++ b/src/core/channel/child_channel.h @@ -57,7 +57,7 @@ void grpc_child_channel_destroy(grpc_child_channel *channel, int wait_for_callbacks); grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel, - grpc_call_element *parent); + grpc_call_element *parent, grpc_transport_op *initial_op); grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call); void grpc_child_call_destroy(grpc_child_call *call); diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 6ad50cb9448..e6b0f7bba8f 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -105,7 +105,8 @@ static int prepare_activate(grpc_call_element *elem, calld->state = CALL_ACTIVE; /* create a child call */ - calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem); + /* TODO(ctiller): pass the waiting op down here */ + calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem, NULL); return 1; } diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 9e2d92ffbc2..5a5a8499072 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -95,15 +95,16 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) { + const void *server_transport_data, + grpc_transport_op *initial_op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; int r; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - r = grpc_transport_init_stream(chand->transport, + r = grpc_transport_1chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), - server_transport_data); + server_transport_data, initial_op); GPR_ASSERT(r == 0); } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 9c2af560c13..acdc98b86b4 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -607,6 +607,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, lock(t); s->id = 0; } else { + /* already locked */ s->id = (gpr_uint32)(gpr_uintptr)server_data; t->incoming_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index d0007680e3a..a51e01d3c96 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -60,6 +60,23 @@ typedef enum grpc_stream_state { GRPC_STREAM_CLOSED } grpc_stream_state; +/* Transport op: a set of operations to perform on a transport */ +typedef struct grpc_transport_op { + grpc_stream_op_buffer *send_ops; + int is_last_send; + void (*on_done_send)(void *user_data, int success); + void *send_user_data; + + grpc_stream_op_buffer *recv_ops; + grpc_stream_state *recv_state; + void (*on_done_recv)(void *user_data, int success); + void *recv_user_data; + + grpc_pollset *bind_pollset; + + grpc_status_code cancel_with_status; +} grpc_transport_op; + /* Callbacks made from the transport to the upper layers of grpc. */ struct grpc_transport_callbacks { /* Initialize a new stream on behalf of the transport. @@ -98,7 +115,7 @@ size_t grpc_transport_stream_size(grpc_transport *transport); server_data - either NULL for a client initiated stream, or a pointer supplied from the accept_stream callback function */ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, - const void *server_data); + const void *server_data, grpc_transport_op *initial_op); /* Destroy transport data for a stream. @@ -113,23 +130,6 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, void grpc_transport_destroy_stream(grpc_transport *transport, grpc_stream *stream); -/* Transport op: a set of operations to perform on a transport */ -typedef struct grpc_transport_op { - grpc_stream_op_buffer *send_ops; - int is_last_send; - void (*on_done_send)(void *user_data, int success); - void *send_user_data; - - grpc_stream_op_buffer *recv_ops; - grpc_stream_state *recv_state; - void (*on_done_recv)(void *user_data, int success); - void *recv_user_data; - - grpc_pollset *bind_pollset; - - grpc_status_code cancel_with_status; -} grpc_transport_op; - void grpc_transport_op_finish_with_failure(grpc_transport_op *op); /* TODO(ctiller): remove this */