Add some documentation, simplify state

pull/357/head
Craig Tiller 10 years ago
parent c18c56e40c
commit daceea8670
src/core/surface/call.c

@@ -77,7 +77,7 @@ typedef struct {
    These structures are manipulated in sets, where a set is a set of
    operations begin with the same call to start_ioreq and the various
    public and private api's that call it. Each set has a master reqinfo
-   in which we set a few additional fields. */
+   in which we set a few additional fields - see reqinfo_master. */
 typedef struct {
   /* User supplied parameters */
   grpc_ioreq_data data;
@@ -93,25 +93,51 @@ typedef struct {
 } reqinfo;
 
 typedef struct {
+  /* Overall status of the operation: starts OK, may degrade to
+     non-OK */
   grpc_op_error status;
+  /* Completion function to call at the end of the operation */
   grpc_ioreq_completion_func on_complete;
   void *user_data;
+  /* a bit mask of which request ops are needed (1 << opid) */
   gpr_uint32 need_mask;
+  /* a bit mask of which request ops are now completed */
   gpr_uint32 complete_mask;
 } reqinfo_master;
 
+/* Status data for a request can come from several sources; this
+   enumerates them all, and acts as a priority sorting for which
+   status to return to the application - earlier entries override
+   later ones */
 typedef enum {
+  /* Status came from the application layer overriding whatever
+     the wire says */
   STATUS_FROM_API_OVERRIDE = 0,
+  /* Status came from 'the wire' - or somewhere below the surface
+     layer */
   STATUS_FROM_WIRE,
   STATUS_SOURCE_COUNT
 } status_source;
 
 typedef struct {
-  gpr_uint8 set;
+  gpr_uint8 is_set;
   grpc_status_code code;
   grpc_mdstr *details;
 } received_status;
 
+/* How far through the GRPC stream have we read? */
+typedef enum {
+  /* We are still waiting for initial metadata to complete */
+  READ_STATE_INITIAL,
+  /* We have gotten initial metadata, and are reading either
+     messages or trailing metadata */
+  READ_STATE_GOT_INITIAL_METADATA,
+  /* The stream is closed for reading */
+  READ_STATE_READ_CLOSED,
+  /* The stream is closed for reading & writing */
+  READ_STATE_STREAM_CLOSED
+} read_state;
+
 struct grpc_call {
   grpc_completion_queue *cq;
   grpc_channel *channel;
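The newly documented need_mask/complete_mask pair encodes a whole request set in two words: start_ioreq sets bit (1 << opid) in need_mask for each requested op, and the set is finished once every needed bit also appears in complete_mask. A minimal standalone sketch of that pattern, with hypothetical op ids standing in for the real grpc_ioreq_op values:

#include <stdint.h>
#include <stdio.h>

/* Hypothetical op ids - stand-ins, not the actual grpc_ioreq_op enum. */
enum { OP_SEND_MESSAGE, OP_RECV_MESSAGE, OP_RECV_STATUS };

int main(void) {
  /* start_ioreq-style setup: one bit per requested op. */
  uint32_t need_mask = (1u << OP_SEND_MESSAGE) | (1u << OP_RECV_STATUS);
  uint32_t complete_mask = 0;

  /* finish_ioreq_op-style completion: set the bit as each op finishes;
     the whole set is done once every needed bit is also complete. */
  complete_mask |= 1u << OP_SEND_MESSAGE;
  printf("set done? %d\n", (complete_mask & need_mask) == need_mask); /* 0 */

  complete_mask |= 1u << OP_RECV_STATUS;
  printf("set done? %d\n", (complete_mask & need_mask) == need_mask); /* 1 */
  return 0;
}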
@@ -120,10 +146,8 @@ struct grpc_call {
   gpr_mu mu;
 
   gpr_uint8 is_client;
-  gpr_uint8 got_initial_metadata;
+  read_state read_state;
   gpr_uint8 have_alarm;
-  gpr_uint8 read_closed;
-  gpr_uint8 stream_closed;
   gpr_uint8 sending;
   gpr_uint8 num_completed_requests;
   gpr_uint8 need_more_data;
@@ -229,7 +253,7 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
 }
 
 static void set_status_code(grpc_call *call, status_source source,
                             gpr_uint32 status) {
-  call->status[source].set = 1;
+  call->status[source].is_set = 1;
   call->status[source].code = status;
 }
@@ -308,7 +332,7 @@ static void unlock(grpc_call *call) {
 static void get_final_status(grpc_call *call, grpc_recv_status_args args) {
   int i;
   for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
-    if (call->status[i].set) {
+    if (call->status[i].is_set) {
       *args.code = call->status[i].code;
       if (!args.details) return;
       if (call->status[i].details) {
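The loop above works because status_source doubles as a priority order: scanning from index 0 returns the first source that is set, so STATUS_FROM_API_OVERRIDE beats STATUS_FROM_WIRE. A self-contained illustration of the same idea, with simplified types and plain ints in place of grpc_status_code:

#include <stdio.h>

/* Same shape as the call.c declarations, codes reduced to plain ints. */
typedef enum {
  STATUS_FROM_API_OVERRIDE = 0, /* highest priority */
  STATUS_FROM_WIRE,
  STATUS_SOURCE_COUNT
} status_source;

typedef struct {
  unsigned char is_set;
  int code;
} received_status;

/* Ascending scan: the first set entry wins, so declaration order
   is priority order. */
static int final_code(const received_status *status) {
  int i;
  for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
    if (status[i].is_set) return status[i].code;
  }
  return 0; /* nothing set: treat as OK */
}

int main(void) {
  received_status status[STATUS_SOURCE_COUNT] = {{0, 0}, {0, 0}};

  status[STATUS_FROM_WIRE].is_set = 1;
  status[STATUS_FROM_WIRE].code = 14; /* e.g. UNAVAILABLE from the wire */
  printf("%d\n", final_code(status)); /* 14 */

  status[STATUS_FROM_API_OVERRIDE].is_set = 1;
  status[STATUS_FROM_API_OVERRIDE].code = 1; /* e.g. CANCELLED from the app */
  printf("%d\n", final_code(status)); /* 1: the override wins */
  return 0;
}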
@@ -581,14 +605,14 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
        *data.recv_message = grpc_bbq_pop(&call->incoming_queue);
        if (*data.recv_message) {
          finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
-         if (call->stream_closed && grpc_bbq_empty(&call->incoming_queue)) {
+         if (call->read_state == READ_STATE_STREAM_CLOSED && grpc_bbq_empty(&call->incoming_queue)) {
            finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
          }
        } else {
          /* no message: either end of stream or we need more bytes */
-         if (call->read_closed) {
+         if (call->read_state >= READ_STATE_READ_CLOSED) {
            finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
-           if (call->stream_closed) {
+           if (call->read_state == READ_STATE_STREAM_CLOSED) {
              /* stream closed AND we've drained all messages: signal to the application */
              finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
            }
@@ -598,12 +622,12 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
        }
        break;
      case GRPC_IOREQ_RECV_STATUS:
-       if (call->read_closed) {
+       if (call->read_state >= READ_STATE_READ_CLOSED) {
          finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
        }
        break;
      case GRPC_IOREQ_RECV_CLOSE:
-       if (call->stream_closed) {
+       if (call->read_state == READ_STATE_STREAM_CLOSED) {
          finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
        }
        break;
@@ -611,7 +635,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
        if (requests[GRPC_IOREQ_SEND_MESSAGE].set == REQSET_EMPTY) {
          requests[GRPC_IOREQ_SEND_MESSAGE].set = REQSET_DONE;
        }
-       if (call->stream_closed) {
+       if (call->read_state == READ_STATE_STREAM_CLOSED) {
          finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_ERROR);
        }
        break;
@@ -619,7 +643,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
      case GRPC_IOREQ_SEND_INITIAL_METADATA:
      case GRPC_IOREQ_SEND_TRAILING_METADATA:
      case GRPC_IOREQ_SEND_STATUS:
-       if (call->stream_closed) {
+       if (call->read_state == READ_STATE_STREAM_CLOSED) {
          finish_ioreq_op(call, op, GRPC_OP_ERROR);
        }
        break;
@@ -629,11 +653,8 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
          SWAP(grpc_metadata_array, *data.recv_metadata,
               call->buffered_initial_metadata);
        }
-       if (call->got_initial_metadata) {
+       if (call->read_state >= READ_STATE_GOT_INITIAL_METADATA) {
          finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
-       } else if (call->stream_closed) {
-         finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA,
-                         GRPC_OP_ERROR);
        }
        break;
      case GRPC_IOREQ_RECV_TRAILING_METADATA:
@@ -642,7 +663,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
        SWAP(grpc_metadata_array, *data.recv_metadata,
             call->buffered_trailing_metadata);
      }
-     if (call->read_closed) {
+     if (call->read_state >= READ_STATE_READ_CLOSED) {
        finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
      }
      break;
@@ -683,7 +704,7 @@ void grpc_call_destroy(grpc_call *c) {
    grpc_alarm_cancel(&c->alarm);
    c->have_alarm = 0;
  }
- cancel = !c->stream_closed;
+ cancel = c->read_state != READ_STATE_STREAM_CLOSED;
  unlock(c);
  if (cancel) grpc_call_cancel(c);
  grpc_call_internal_unref(c, 1);
@@ -754,7 +775,7 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
 }
 
 static void mark_read_closed(grpc_call *call) {
-  call->read_closed = 1;
+  call->read_state = READ_STATE_READ_CLOSED;
   finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
   finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
   finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
@@ -764,7 +785,7 @@ static void mark_read_closed(grpc_call *call) {
 void grpc_call_read_closed(grpc_call_element *elem) {
   grpc_call *call = CALL_FROM_TOP_ELEM(elem);
   lock(call);
-  GPR_ASSERT(!call->read_closed);
+  GPR_ASSERT(call->read_state < READ_STATE_READ_CLOSED);
   mark_read_closed(call);
   unlock(call);
 }
@@ -772,11 +793,11 @@ void grpc_call_read_closed(grpc_call_element *elem) {
 void grpc_call_stream_closed(grpc_call_element *elem) {
   grpc_call *call = CALL_FROM_TOP_ELEM(elem);
   lock(call);
-  GPR_ASSERT(!call->stream_closed);
-  if (!call->read_closed) {
+  GPR_ASSERT(call->read_state < READ_STATE_STREAM_CLOSED);
+  if (call->read_state < READ_STATE_READ_CLOSED) {
     mark_read_closed(call);
   }
-  call->stream_closed = 1;
+  call->read_state = READ_STATE_STREAM_CLOSED;
   if (grpc_bbq_empty(&call->incoming_queue)) {
     finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
   }
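Collapsing the three booleans into one read_state works because the states form a one-way progression in declaration order, so ordered comparisons replace flag checks: read_state >= READ_STATE_READ_CLOSED is the old read_closed, and equality with READ_STATE_STREAM_CLOSED is the old stream_closed. A small standalone sketch of the transitions the functions above perform, with asserts standing in for the real locking and ioreq bookkeeping:

#include <assert.h>

/* Mirrors the new enum: declaration order doubles as progression
   order, so "at least this far along" is a plain ordered compare. */
typedef enum {
  READ_STATE_INITIAL,
  READ_STATE_GOT_INITIAL_METADATA,
  READ_STATE_READ_CLOSED,
  READ_STATE_STREAM_CLOSED
} read_state;

int main(void) {
  read_state s = READ_STATE_INITIAL;

  /* grpc_call_initial_metadata_complete-style guard: never move backwards. */
  if (s < READ_STATE_GOT_INITIAL_METADATA) s = READ_STATE_GOT_INITIAL_METADATA;

  /* grpc_call_stream_closed: close reads first if they are still open,
     then mark the whole stream closed. */
  if (s < READ_STATE_READ_CLOSED) s = READ_STATE_READ_CLOSED;
  s = READ_STATE_STREAM_CLOSED;

  /* One state now answers both of the old boolean questions. */
  assert(s >= READ_STATE_READ_CLOSED);   /* was: call->read_closed */
  assert(s == READ_STATE_STREAM_CLOSED); /* was: call->stream_closed */
  return 0;
}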
@@ -835,7 +856,7 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
     set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
     grpc_mdelem_unref(md);
   } else {
-    if (!call->got_initial_metadata) {
+    if (call->read_state < READ_STATE_GOT_INITIAL_METADATA) {
       dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set <
                      GRPC_IOREQ_OP_COUNT
                  ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
@@ -1079,7 +1100,9 @@ grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call,
 void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) {
   grpc_call *call = grpc_call_from_top_element(surface_element);
   lock(call);
-  call->got_initial_metadata = 1;
+  if (call->read_state < READ_STATE_GOT_INITIAL_METADATA) {
+    call->read_state = READ_STATE_GOT_INITIAL_METADATA;
+  }
   finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
   unlock(call);
 }
