Merge branch 'one-pass' of github.com:ctiller/grpc into one-pass

pull/1369/head
Craig Tiller 10 years ago
commit 0b20751b50
  1. 30
      src/core/channel/census_filter.c
  2. 3
      src/core/channel/channel_stack.c
  3. 4
      src/core/channel/channel_stack.h
  4. 6
      src/core/channel/child_channel.c
  5. 2
      src/core/channel/child_channel.h
  6. 3
      src/core/channel/client_channel.c
  7. 7
      src/core/channel/connected_channel.c
  8. 1
      src/core/transport/chttp2_transport.c
  9. 36
      src/core/transport/transport.h

@ -82,15 +82,18 @@ static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb, call_da
}
}
static void client_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) {
static void client_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
GPR_ASSERT(calld != NULL);
GPR_ASSERT(chand != NULL);
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
if (op->send_ops) {
extract_and_annotate_method_tag(op->send_ops, calld, chand);
}
}
static void client_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) {
call_data* calld = elem->call_data;
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
client_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
@ -104,12 +107,8 @@ static void server_on_done_recv(void *ptr, int success) {
calld->on_done_recv(calld->recv_user_data, success);
}
static void server_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) {
static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
GPR_ASSERT(calld != NULL);
GPR_ASSERT(chand != NULL);
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
if (op->recv_ops) {
/* substitute our callback for the op callback */
calld->recv_ops = op->recv_ops;
@ -118,7 +117,12 @@ static void server_start_transport_op(grpc_call_element* elem, grpc_transport_op
op->on_done_recv = server_on_done_recv;
op->recv_user_data = elem;
}
/* Always pass control up or down the stack depending on op->dir */
}
static void server_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) {
call_data* calld = elem->call_data;
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
server_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
@ -136,12 +140,13 @@ static void channel_op(grpc_channel_element* elem,
}
static void client_init_call_elem(grpc_call_element* elem,
const void* server_transport_data) {
const void* server_transport_data, grpc_transport_op *initial_op) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
init_rpc_stats(&d->stats);
d->start_ts = gpr_now();
d->op_id = census_tracing_start_op();
if (initial_op) client_mutate_op(elem, initial_op);
}
static void client_destroy_call_elem(grpc_call_element* elem) {
@ -152,12 +157,13 @@ static void client_destroy_call_elem(grpc_call_element* elem) {
}
static void server_init_call_elem(grpc_call_element* elem,
const void* server_transport_data) {
const void* server_transport_data, grpc_transport_op *initial_op) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
init_rpc_stats(&d->stats);
d->start_ts = gpr_now();
d->op_id = census_tracing_start_op();
if (initial_op) server_mutate_op(elem, initial_op);
}
static void server_destroy_call_elem(grpc_call_element* elem) {

@ -148,6 +148,7 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack) {
void grpc_call_stack_init(grpc_channel_stack *channel_stack,
const void *transport_server_data,
grpc_transport_op *initial_op,
grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
size_t count = channel_stack->count;
@ -165,7 +166,7 @@ void grpc_call_stack_init(grpc_channel_stack *channel_stack,
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data);
call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data, initial_op);
user_data +=
ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
}

@ -121,7 +121,8 @@ typedef struct {
transport and is on the server. Most filters want to ignore this
argument.*/
void (*init_call_elem)(grpc_call_element *elem,
const void *server_transport_data);
const void *server_transport_data,
grpc_transport_op *initial_op);
/* Destroy per call data.
The filter does not need to do any chaining */
void (*destroy_call_elem)(grpc_call_element *elem);
@ -200,6 +201,7 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack);
server. */
void grpc_call_stack_init(grpc_channel_stack *channel_stack,
const void *transport_server_data,
grpc_transport_op *initial_op,
grpc_call_stack *call_stack);
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_call_stack *stack);

@ -121,7 +121,7 @@ static void lb_channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void lb_init_call_elem(grpc_call_element *elem,
const void *server_transport_data) {}
const void *server_transport_data, grpc_transport_op *initial_op) {}
/* Destructor for call_data */
static void lb_destroy_call_elem(grpc_call_element *elem) {}
@ -261,13 +261,13 @@ void grpc_child_channel_handle_op(grpc_child_channel *channel,
}
grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
grpc_call_element *parent) {
grpc_call_element *parent, grpc_transport_op *initial_op) {
grpc_call_stack *stk = gpr_malloc((channel)->call_stack_size);
grpc_call_element *lbelem;
lb_call_data *lbcalld;
lb_channel_data *lbchand;
grpc_call_stack_init(channel, NULL, stk);
grpc_call_stack_init(channel, NULL, initial_op, stk);
lbelem = LINK_BACK_ELEM_FROM_CALL(stk);
lbchand = lbelem->channel_data;
lbcalld = lbelem->call_data;

@ -57,7 +57,7 @@ void grpc_child_channel_destroy(grpc_child_channel *channel,
int wait_for_callbacks);
grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
grpc_call_element *parent);
grpc_call_element *parent, grpc_transport_op *initial_op);
grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call);
void grpc_child_call_destroy(grpc_child_call *call);

@ -105,7 +105,8 @@ static int prepare_activate(grpc_call_element *elem,
calld->state = CALL_ACTIVE;
/* create a child call */
calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem);
/* TODO(ctiller): pass the waiting op down here */
calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem, NULL);
return 1;
}

@ -95,15 +95,16 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data) {
const void *server_transport_data,
grpc_transport_op *initial_op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int r;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
r = grpc_transport_init_stream(chand->transport,
r = grpc_transport_1chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
server_transport_data);
server_transport_data, initial_op);
GPR_ASSERT(r == 0);
}

@ -607,6 +607,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
lock(t);
s->id = 0;
} else {
/* already locked */
s->id = (gpr_uint32)(gpr_uintptr)server_data;
t->incoming_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);

@ -60,6 +60,23 @@ typedef enum grpc_stream_state {
GRPC_STREAM_CLOSED
} grpc_stream_state;
/* Transport op: a set of operations to perform on a transport */
typedef struct grpc_transport_op {
grpc_stream_op_buffer *send_ops;
int is_last_send;
void (*on_done_send)(void *user_data, int success);
void *send_user_data;
grpc_stream_op_buffer *recv_ops;
grpc_stream_state *recv_state;
void (*on_done_recv)(void *user_data, int success);
void *recv_user_data;
grpc_pollset *bind_pollset;
grpc_status_code cancel_with_status;
} grpc_transport_op;
/* Callbacks made from the transport to the upper layers of grpc. */
struct grpc_transport_callbacks {
/* Initialize a new stream on behalf of the transport.
@ -98,7 +115,7 @@ size_t grpc_transport_stream_size(grpc_transport *transport);
server_data - either NULL for a client initiated stream, or a pointer
supplied from the accept_stream callback function */
int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
const void *server_data);
const void *server_data, grpc_transport_op *initial_op);
/* Destroy transport data for a stream.
@ -113,23 +130,6 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
void grpc_transport_destroy_stream(grpc_transport *transport,
grpc_stream *stream);
/* Transport op: a set of operations to perform on a transport */
typedef struct grpc_transport_op {
grpc_stream_op_buffer *send_ops;
int is_last_send;
void (*on_done_send)(void *user_data, int success);
void *send_user_data;
grpc_stream_op_buffer *recv_ops;
grpc_stream_state *recv_state;
void (*on_done_recv)(void *user_data, int success);
void *recv_user_data;
grpc_pollset *bind_pollset;
grpc_status_code cancel_with_status;
} grpc_transport_op;
void grpc_transport_op_finish_with_failure(grpc_transport_op *op);
/* TODO(ctiller): remove this */

Loading…
Cancel
Save