diff --git a/src/core/surface/call.c b/src/core/surface/call.c index e2e8fe23a52..a6389799590 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -58,7 +58,7 @@ typedef struct { /* input buffers */ grpc_metadata_array initial_md_in; grpc_metadata_array trailing_md_in; - + size_t details_capacity; char *details; grpc_status_code status; @@ -76,8 +76,7 @@ typedef enum { SEND_NOTHING, SEND_INITIAL_METADATA, SEND_MESSAGE, - SEND_TRAILING_METADATA, - SEND_STATUS, + SEND_TRAILING_METADATA_AND_FINISH, SEND_FINISH } send_action; @@ -89,7 +88,7 @@ typedef struct { /* See reqinfo.set below for a description */ #define REQSET_EMPTY 255 -#define REQSET_DONE 254 +#define REQSET_DONE 254 /* The state of an ioreq */ typedef struct reqinfo { @@ -158,7 +157,7 @@ struct grpc_call { legacy_state *legacy_state; }; -#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) +#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) #define CALL_ELEM_FROM_CALL(call, idx) \ grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) @@ -191,6 +190,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, } if (call->is_client) { call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set = REQSET_DONE; + call->requests[GRPC_IOREQ_SEND_STATUS].set = REQSET_DONE; } grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); @@ -341,7 +341,8 @@ static void get_final_status(grpc_call *call, grpc_recv_status_args args) { gpr_slice details = call->status[i].details->slice; size_t len = GPR_SLICE_LENGTH(details); if (len + 1 > *args.details_capacity) { - *args.details_capacity = GPR_MAX(len + 1, *args.details_capacity * 3 / 2); + *args.details_capacity = + GPR_MAX(len + 1, *args.details_capacity * 3 / 2); *args.details = gpr_realloc(*args.details, *args.details_capacity); } memcpy(*args.details, GPR_SLICE_START_PTR(details), len); @@ -374,12 +375,10 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, (op == GRPC_IOREQ_SEND_MESSAGE || op == GRPC_IOREQ_RECV_MESSAGE) ? REQSET_EMPTY : REQSET_DONE; - if (master->complete_mask == master->need_mask || - status == GRPC_OP_ERROR) { + if (master->complete_mask == master->need_mask || status == GRPC_OP_ERROR) { if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) { get_final_status( - call, - call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status); + call, call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status); } for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { if (call->requests[i].set == op) { @@ -397,7 +396,8 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, } } -static void finish_send_op(grpc_call *call, grpc_ioreq_op op, grpc_op_error error) { +static void finish_send_op(grpc_call *call, grpc_ioreq_op op, + grpc_op_error error) { lock(call); finish_ioreq_op(call, op, error); call->sending = 0; @@ -434,30 +434,20 @@ static send_action choose_send_action(grpc_call *call) { case REQSET_DONE: break; } - switch (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set) { - case REQSET_EMPTY: - return SEND_NOTHING; - default: - finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK); - return SEND_TRAILING_METADATA; - case REQSET_DONE: - break; - } - switch (call->requests[GRPC_IOREQ_SEND_STATUS].set) { - case REQSET_EMPTY: - return SEND_NOTHING; - default: - finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK); - return SEND_STATUS; - case REQSET_DONE: - break; - } switch (call->requests[GRPC_IOREQ_SEND_CLOSE].set) { case REQSET_EMPTY: case REQSET_DONE: return SEND_NOTHING; default: - return SEND_FINISH; + if (call->is_client) { + return SEND_FINISH; + } else if (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set != + REQSET_EMPTY && + call->requests[GRPC_IOREQ_SEND_STATUS].set != REQSET_EMPTY) { + return SEND_TRAILING_METADATA_AND_FINISH; + } else { + return SEND_NOTHING; + } } } @@ -509,7 +499,8 @@ static void enact_send_action(grpc_call *call, send_action sa) { op.user_data = call; grpc_call_execute_op(call, &op); break; - case SEND_TRAILING_METADATA: + case SEND_TRAILING_METADATA_AND_FINISH: + /* send trailing metadata */ data = call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data; for (i = 0; i < data.send_metadata.count; i++) { const grpc_metadata *md = &data.send_metadata.metadata[i]; @@ -518,12 +509,7 @@ static void enact_send_action(grpc_call *call, send_action sa) { call->metadata_context, md->key, (const gpr_uint8 *)md->value, md->value_length)); } - lock(call); - call->sending = 0; - unlock(call); - grpc_call_internal_unref(call, 0); - break; - case SEND_STATUS: + /* send status */ /* TODO(ctiller): cache common status values */ data = call->requests[GRPC_IOREQ_SEND_CLOSE].data; gpr_ltoa(data.send_status.code, status_str); @@ -534,15 +520,15 @@ static void enact_send_action(grpc_call *call, send_action sa) { grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), grpc_mdstr_from_string(call->metadata_context, status_str))); if (data.send_status.details) { - send_metadata(call, - grpc_mdelem_from_metadata_strings( - call->metadata_context, - grpc_mdstr_ref( - grpc_channel_get_message_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, - data.send_status.details))); + send_metadata( + call, + grpc_mdelem_from_metadata_strings( + call->metadata_context, + grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)), + grpc_mdstr_from_string(call->metadata_context, + data.send_status.details))); } - break; + /* fallthrough: see choose_send_action for details */ case SEND_FINISH: op.type = GRPC_SEND_FINISH; op.dir = GRPC_CALL_DOWN; @@ -591,8 +577,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_TOO_MANY_OPERATIONS); } else if (requests[op].set == REQSET_DONE) { - return start_ioreq_error(call, have_ops, - GRPC_CALL_ERROR_ALREADY_INVOKED); + return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED); } have_ops |= 1 << op; data = reqs[i].data; @@ -755,11 +740,14 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata, lock(call); ls = get_legacy_state(call); - if (ls->md_out_count[ls->md_out_buffer] == ls->md_out_capacity[ls->md_out_buffer]) { + if (ls->md_out_count[ls->md_out_buffer] == + ls->md_out_capacity[ls->md_out_buffer]) { ls->md_out_capacity[ls->md_out_buffer] = - GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2, ls->md_out_capacity[ls->md_out_buffer] + 8); - ls->md_out[ls->md_out_buffer] = - gpr_realloc(ls->md_out[ls->md_out_buffer], sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]); + GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2, + ls->md_out_capacity[ls->md_out_buffer] + 8); + ls->md_out[ls->md_out_buffer] = gpr_realloc( + ls->md_out[ls->md_out_buffer], + sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]); } mdout = &ls->md_out[ls->md_out_buffer][ls->md_out_count[ls->md_out_buffer]++]; mdout->key = gpr_strdup(metadata->key); @@ -776,8 +764,8 @@ static void maybe_finish_legacy(grpc_call *call) { legacy_state *ls = get_legacy_state(call); if (ls->got_status) { grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL, - ls->status, ls->details, - ls->trailing_md_in.metadata, ls->trailing_md_in.count); + ls->status, ls->details, ls->trailing_md_in.metadata, + ls->trailing_md_in.count); } } @@ -800,7 +788,8 @@ static void finish_recv_metadata(grpc_call *call, grpc_op_error status, ls = get_legacy_state(call); if (status == GRPC_OP_OK) { grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, - ls->initial_md_in.count, ls->initial_md_in.metadata); + ls->initial_md_in.count, + ls->initial_md_in.metadata); } else { grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0, @@ -1000,11 +989,12 @@ grpc_call_error grpc_call_start_write_status(grpc_call *call, reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA; reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; - reqs[1].op = GRPC_IOREQ_SEND_CLOSE; + reqs[1].op = GRPC_IOREQ_SEND_STATUS; reqs[1].data.send_status.code = status; /* MEMLEAK */ reqs[1].data.send_status.details = gpr_strdup(details); - err = start_ioreq(call, reqs, 2, finish_finish, tag); + reqs[2].op = GRPC_IOREQ_SEND_CLOSE; + err = start_ioreq(call, reqs, 3, finish_finish, tag); unlock(call); return err; @@ -1077,7 +1067,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { gpr_uint32 status; void *user_data = grpc_mdelem_get_user_data(md, destroy_status); if (user_data) { - status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET; + status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET; } else { if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), GPR_SLICE_LENGTH(md->value->slice), @@ -1119,16 +1109,17 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { grpc_mdelem_unref(md); } else { if (!call->got_initial_metadata) { - dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set < GRPC_IOREQ_OP_COUNT + dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set < + GRPC_IOREQ_OP_COUNT ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA] .data.recv_metadata : &call->buffered_initial_metadata; } else { - dest = - call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set < GRPC_IOREQ_OP_COUNT - ? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA] - .data.recv_metadata - : &call->buffered_trailing_metadata; + dest = call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set < + GRPC_IOREQ_OP_COUNT + ? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA] + .data.recv_metadata + : &call->buffered_trailing_metadata; } if (dest->count == dest->capacity) { dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);