diff --git a/src/core/surface/call.c b/src/core/surface/call.c index dfee93eb7d8..da966c874ac 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -77,7 +77,7 @@ typedef struct { These structures are manipulated in sets, where a set is a set of operations begin with the same call to start_ioreq and the various public and private api's that call it. Each set has a master reqinfo - in which we set a few additional fields. */ + in which we set a few additional fields - see reqinfo_master. */ typedef struct { /* User supplied parameters */ grpc_ioreq_data data; @@ -93,25 +93,51 @@ typedef struct { } reqinfo; typedef struct { + /* Overall status of the operation: starts OK, may degrade to + non-OK */ grpc_op_error status; + /* Completion function to call at the end of the operation */ grpc_ioreq_completion_func on_complete; void *user_data; + /* a bit mask of which request ops are needed (1 << opid) */ gpr_uint32 need_mask; + /* a bit mask of which request ops are now completed */ gpr_uint32 complete_mask; } reqinfo_master; +/* Status data for a request can come from several sources; this + enumerates them all, and acts as a priority sorting for which + status to return to the application - earlier entries override + later ones */ typedef enum { + /* Status came from the application layer overriding whatever + the wire says */ STATUS_FROM_API_OVERRIDE = 0, + /* Status came from 'the wire' - or somewhere below the surface + layer */ STATUS_FROM_WIRE, STATUS_SOURCE_COUNT } status_source; typedef struct { - gpr_uint8 set; + gpr_uint8 is_set; grpc_status_code code; grpc_mdstr *details; } received_status; +/* How far through the GRPC stream have we read? */ +typedef enum { + /* We are still waiting for initial metadata to complete */ + READ_STATE_INITIAL, + /* We have gotten initial metadata, and are reading either + messages or trailing metadata */ + READ_STATE_GOT_INITIAL_METADATA, + /* The stream is closed for reading */ + READ_STATE_READ_CLOSED, + /* The stream is closed for reading & writing */ + READ_STATE_STREAM_CLOSED +} read_state; + struct grpc_call { grpc_completion_queue *cq; grpc_channel *channel; @@ -120,10 +146,8 @@ struct grpc_call { gpr_mu mu; gpr_uint8 is_client; - gpr_uint8 got_initial_metadata; + read_state read_state; gpr_uint8 have_alarm; - gpr_uint8 read_closed; - gpr_uint8 stream_closed; gpr_uint8 sending; gpr_uint8 num_completed_requests; gpr_uint8 need_more_data; @@ -229,7 +253,7 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) { static void set_status_code(grpc_call *call, status_source source, gpr_uint32 status) { - call->status[source].set = 1; + call->status[source].is_set = 1; call->status[source].code = status; } @@ -308,7 +332,7 @@ static void unlock(grpc_call *call) { static void get_final_status(grpc_call *call, grpc_recv_status_args args) { int i; for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].set) { + if (call->status[i].is_set) { *args.code = call->status[i].code; if (!args.details) return; if (call->status[i].details) { @@ -581,14 +605,14 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, *data.recv_message = grpc_bbq_pop(&call->incoming_queue); if (*data.recv_message) { finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); - if (call->stream_closed && grpc_bbq_empty(&call->incoming_queue)) { + if (call->read_state == READ_STATE_STREAM_CLOSED && grpc_bbq_empty(&call->incoming_queue)) { finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK); } } else { /* no message: either end of stream or we need more bytes */ - if (call->read_closed) { + if (call->read_state >= READ_STATE_READ_CLOSED) { finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); - if (call->stream_closed) { + if (call->read_state == READ_STATE_STREAM_CLOSED) { /* stream closed AND we've drained all messages: signal to the application */ finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK); } @@ -598,12 +622,12 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, } break; case GRPC_IOREQ_RECV_STATUS: - if (call->read_closed) { + if (call->read_state >= READ_STATE_READ_CLOSED) { finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); } break; case GRPC_IOREQ_RECV_CLOSE: - if (call->stream_closed) { + if (call->read_state == READ_STATE_STREAM_CLOSED) { finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK); } break; @@ -611,7 +635,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, if (requests[GRPC_IOREQ_SEND_MESSAGE].set == REQSET_EMPTY) { requests[GRPC_IOREQ_SEND_MESSAGE].set = REQSET_DONE; } - if (call->stream_closed) { + if (call->read_state == READ_STATE_STREAM_CLOSED) { finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_ERROR); } break; @@ -619,7 +643,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, case GRPC_IOREQ_SEND_INITIAL_METADATA: case GRPC_IOREQ_SEND_TRAILING_METADATA: case GRPC_IOREQ_SEND_STATUS: - if (call->stream_closed) { + if (call->read_state == READ_STATE_STREAM_CLOSED) { finish_ioreq_op(call, op, GRPC_OP_ERROR); } break; @@ -629,11 +653,8 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, SWAP(grpc_metadata_array, *data.recv_metadata, call->buffered_initial_metadata); } - if (call->got_initial_metadata) { + if (call->read_state >= READ_STATE_GOT_INITIAL_METADATA) { finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); - } else if (call->stream_closed) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, - GRPC_OP_ERROR); } break; case GRPC_IOREQ_RECV_TRAILING_METADATA: @@ -642,7 +663,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, SWAP(grpc_metadata_array, *data.recv_metadata, call->buffered_trailing_metadata); } - if (call->read_closed) { + if (call->read_state >= READ_STATE_READ_CLOSED) { finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); } break; @@ -683,7 +704,7 @@ void grpc_call_destroy(grpc_call *c) { grpc_alarm_cancel(&c->alarm); c->have_alarm = 0; } - cancel = !c->stream_closed; + cancel = c->read_state != READ_STATE_STREAM_CLOSED; unlock(c); if (cancel) grpc_call_cancel(c); grpc_call_internal_unref(c, 1); @@ -754,7 +775,7 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { } static void mark_read_closed(grpc_call *call) { - call->read_closed = 1; + call->read_state = READ_STATE_READ_CLOSED; finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); @@ -764,7 +785,7 @@ static void mark_read_closed(grpc_call *call) { void grpc_call_read_closed(grpc_call_element *elem) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); lock(call); - GPR_ASSERT(!call->read_closed); + GPR_ASSERT(call->read_state < READ_STATE_READ_CLOSED); mark_read_closed(call); unlock(call); } @@ -772,11 +793,11 @@ void grpc_call_read_closed(grpc_call_element *elem) { void grpc_call_stream_closed(grpc_call_element *elem) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); lock(call); - GPR_ASSERT(!call->stream_closed); - if (!call->read_closed) { + GPR_ASSERT(call->read_state < READ_STATE_STREAM_CLOSED); + if (call->read_state < READ_STATE_READ_CLOSED) { mark_read_closed(call); } - call->stream_closed = 1; + call->read_state = READ_STATE_STREAM_CLOSED; if (grpc_bbq_empty(&call->incoming_queue)) { finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK); } @@ -835,7 +856,7 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); grpc_mdelem_unref(md); } else { - if (!call->got_initial_metadata) { + if (call->read_state < READ_STATE_GOT_INITIAL_METADATA) { dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set < GRPC_IOREQ_OP_COUNT ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA] @@ -1079,7 +1100,9 @@ grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call, void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) { grpc_call *call = grpc_call_from_top_element(surface_element); lock(call); - call->got_initial_metadata = 1; + if (call->read_state < READ_STATE_GOT_INITIAL_METADATA) { + call->read_state = READ_STATE_GOT_INITIAL_METADATA; + } finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); unlock(call); }