Merge branch 'async-api' of github.com:ctiller/grpc into async-api-new

pull/381/head
Craig Tiller 10 years ago
commit d6124991d7
  1. 63
      src/core/surface/call.c

@ -76,7 +76,7 @@ typedef struct {
/* Completion function to call at the end of the operation */ /* Completion function to call at the end of the operation */
grpc_ioreq_completion_func on_complete; grpc_ioreq_completion_func on_complete;
void *user_data; void *user_data;
/* a bit mask of which request ops are needed (1 << opid) */ /* a bit mask of which request ops are needed (1u << opid) */
gpr_uint32 need_mask; gpr_uint32 need_mask;
/* a bit mask of which request ops are now completed */ /* a bit mask of which request ops are now completed */
gpr_uint32 complete_mask; gpr_uint32 complete_mask;
@ -128,30 +128,79 @@ struct grpc_call {
/* TODO(ctiller): share with cq if possible? */ /* TODO(ctiller): share with cq if possible? */
gpr_mu mu; gpr_mu mu;
gpr_uint8 is_client; /* how far through the stream have we read? */
read_state read_state; read_state read_state;
/* how far through the stream have we written? */
write_state write_state; write_state write_state;
/* client or server call */
gpr_uint8 is_client;
/* is the alarm set */
gpr_uint8 have_alarm; gpr_uint8 have_alarm;
/* are we currently performing a send operation */
gpr_uint8 sending; gpr_uint8 sending;
/* pairs with completed_requests */
gpr_uint8 num_completed_requests; gpr_uint8 num_completed_requests;
/* flag that we need to request more data */
gpr_uint8 need_more_data; gpr_uint8 need_more_data;
/* Active ioreqs.
request_set and request_data contain one element per active ioreq
operation.
request_set[op] is an integer specifying a set of operations to which
the request belongs:
- if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending
completion, and the integer represents to which group of operations
the ioreq belongs. Each group is represented by one master, and the
integer in request_set is an index into masters to find the master
data.
- if it is REQSET_EMPTY, the ioreq op is inactive and available to be
started
- finally, if request_set[op] is REQSET_DONE, then the operation is
complete and unavailable to be started again
request_data[op] is the request data as supplied by the initiator of
a request, and is valid iff request_set[op] <= GRPC_IOREQ_OP_COUNT.
The set fields are as per the request type specified by op.
Finally, one element of masters[op] is set per active _group_ of ioreq
operations. It describes work left outstanding, result status, and
what work to perform upon operation completion. As one ioreq of each
op type can be active at once, by convention we choose the first element
of a the group to be the master. This allows constant time allocation
and a strong upper bound of a count of masters to be calculated. */
gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT]; gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT];
grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT]; grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT];
reqinfo_master masters[GRPC_IOREQ_OP_COUNT]; reqinfo_master masters[GRPC_IOREQ_OP_COUNT];
/* Dynamic array of ioreq's that have completed: the count of
elements is queued in num_completed_requests.
This list is built up under lock(), and flushed entirely during
unlock().
We know the upper bound of the number of elements as we can only
have one ioreq of each type active at once. */
completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
/* Incoming buffer of messages */
grpc_byte_buffer_queue incoming_queue; grpc_byte_buffer_queue incoming_queue;
/* Buffered read metadata waiting to be returned to the application.
Element 0 is initial metadata, element 1 is trailing metadata. */
grpc_metadata_array buffered_metadata[2]; grpc_metadata_array buffered_metadata[2];
/* All metadata received - unreffed at once at the end of the call */
grpc_mdelem **owned_metadata; grpc_mdelem **owned_metadata;
size_t owned_metadata_count; size_t owned_metadata_count;
size_t owned_metadata_capacity; size_t owned_metadata_capacity;
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT]; received_status status[STATUS_SOURCE_COUNT];
/* Deadline alarm - if have_alarm is non-zero */
grpc_alarm alarm; grpc_alarm alarm;
/* Call refcount - to keep the call alive during asynchronous operations */
gpr_refcount internal_refcount; gpr_refcount internal_refcount;
/* Data that the legacy api needs to track. To be deleted at some point
soon */
legacy_state *legacy_state; legacy_state *legacy_state;
}; };
@ -280,7 +329,7 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
reqinfo_master *master; reqinfo_master *master;
if (set >= GRPC_IOREQ_OP_COUNT) return 0; if (set >= GRPC_IOREQ_OP_COUNT) return 0;
master = &call->masters[set]; master = &call->masters[set];
return (master->complete_mask & (1 << op)) == 0; return (master->complete_mask & (1u << op)) == 0;
} }
static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
@ -380,7 +429,7 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
size_t i; size_t i;
/* ioreq is live: we need to do something */ /* ioreq is live: we need to do something */
master = &call->masters[master_set]; master = &call->masters[master_set];
master->complete_mask |= 1 << op; master->complete_mask |= 1u << op;
if (status != GRPC_OP_OK) { if (status != GRPC_OP_OK) {
master->status = status; master->status = status;
master->complete_mask = master->need_mask; master->complete_mask = master->need_mask;
@ -585,7 +634,7 @@ static grpc_call_error start_ioreq_error(grpc_call *call,
grpc_call_error ret) { grpc_call_error ret) {
size_t i; size_t i;
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
if (mutated_ops & (1 << i)) { if (mutated_ops & (1u << i)) {
call->request_set[i] = REQSET_EMPTY; call->request_set[i] = REQSET_EMPTY;
} }
} }
@ -672,7 +721,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
} else if (call->request_set[op] == REQSET_DONE) { } else if (call->request_set[op] == REQSET_DONE) {
return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED); return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED);
} }
have_ops |= 1 << op; have_ops |= 1u << op;
data = reqs[i].data; data = reqs[i].data;
call->request_data[op] = data; call->request_data[op] = data;
@ -686,7 +735,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
master->on_complete = completion; master->on_complete = completion;
master->user_data = user_data; master->user_data = user_data;
if (have_ops & (1 << GRPC_IOREQ_RECV_MESSAGE)) { if (have_ops & (1u << GRPC_IOREQ_RECV_MESSAGE)) {
call->need_more_data = 1; call->need_more_data = 1;
} }

Loading…
Cancel
Save