Get server status send working

pull/357/head
Craig Tiller 10 years ago
parent b8a318acd9
commit 1c1419011a
  1. 115
      src/core/surface/call.c

@ -76,8 +76,7 @@ typedef enum {
SEND_NOTHING, SEND_NOTHING,
SEND_INITIAL_METADATA, SEND_INITIAL_METADATA,
SEND_MESSAGE, SEND_MESSAGE,
SEND_TRAILING_METADATA, SEND_TRAILING_METADATA_AND_FINISH,
SEND_STATUS,
SEND_FINISH SEND_FINISH
} send_action; } send_action;
@ -89,7 +88,7 @@ typedef struct {
/* See reqinfo.set below for a description */ /* See reqinfo.set below for a description */
#define REQSET_EMPTY 255 #define REQSET_EMPTY 255
#define REQSET_DONE 254 #define REQSET_DONE 254
/* The state of an ioreq */ /* The state of an ioreq */
typedef struct reqinfo { typedef struct reqinfo {
@ -158,7 +157,7 @@ struct grpc_call {
legacy_state *legacy_state; legacy_state *legacy_state;
}; };
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1))
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
#define CALL_ELEM_FROM_CALL(call, idx) \ #define CALL_ELEM_FROM_CALL(call, idx) \
grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
@ -191,6 +190,7 @@ grpc_call *grpc_call_create(grpc_channel *channel,
} }
if (call->is_client) { if (call->is_client) {
call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set = REQSET_DONE; call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set = REQSET_DONE;
call->requests[GRPC_IOREQ_SEND_STATUS].set = REQSET_DONE;
} }
grpc_channel_internal_ref(channel); grpc_channel_internal_ref(channel);
call->metadata_context = grpc_channel_get_metadata_context(channel); call->metadata_context = grpc_channel_get_metadata_context(channel);
@ -341,7 +341,8 @@ static void get_final_status(grpc_call *call, grpc_recv_status_args args) {
gpr_slice details = call->status[i].details->slice; gpr_slice details = call->status[i].details->slice;
size_t len = GPR_SLICE_LENGTH(details); size_t len = GPR_SLICE_LENGTH(details);
if (len + 1 > *args.details_capacity) { if (len + 1 > *args.details_capacity) {
*args.details_capacity = GPR_MAX(len + 1, *args.details_capacity * 3 / 2); *args.details_capacity =
GPR_MAX(len + 1, *args.details_capacity * 3 / 2);
*args.details = gpr_realloc(*args.details, *args.details_capacity); *args.details = gpr_realloc(*args.details, *args.details_capacity);
} }
memcpy(*args.details, GPR_SLICE_START_PTR(details), len); memcpy(*args.details, GPR_SLICE_START_PTR(details), len);
@ -374,12 +375,10 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
(op == GRPC_IOREQ_SEND_MESSAGE || op == GRPC_IOREQ_RECV_MESSAGE) (op == GRPC_IOREQ_SEND_MESSAGE || op == GRPC_IOREQ_RECV_MESSAGE)
? REQSET_EMPTY ? REQSET_EMPTY
: REQSET_DONE; : REQSET_DONE;
if (master->complete_mask == master->need_mask || if (master->complete_mask == master->need_mask || status == GRPC_OP_ERROR) {
status == GRPC_OP_ERROR) {
if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) { if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) {
get_final_status( get_final_status(
call, call, call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status);
call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status);
} }
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
if (call->requests[i].set == op) { if (call->requests[i].set == op) {
@ -397,7 +396,8 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
} }
} }
static void finish_send_op(grpc_call *call, grpc_ioreq_op op, grpc_op_error error) { static void finish_send_op(grpc_call *call, grpc_ioreq_op op,
grpc_op_error error) {
lock(call); lock(call);
finish_ioreq_op(call, op, error); finish_ioreq_op(call, op, error);
call->sending = 0; call->sending = 0;
@ -434,30 +434,20 @@ static send_action choose_send_action(grpc_call *call) {
case REQSET_DONE: case REQSET_DONE:
break; break;
} }
switch (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set) {
case REQSET_EMPTY:
return SEND_NOTHING;
default:
finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK);
return SEND_TRAILING_METADATA;
case REQSET_DONE:
break;
}
switch (call->requests[GRPC_IOREQ_SEND_STATUS].set) {
case REQSET_EMPTY:
return SEND_NOTHING;
default:
finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK);
return SEND_STATUS;
case REQSET_DONE:
break;
}
switch (call->requests[GRPC_IOREQ_SEND_CLOSE].set) { switch (call->requests[GRPC_IOREQ_SEND_CLOSE].set) {
case REQSET_EMPTY: case REQSET_EMPTY:
case REQSET_DONE: case REQSET_DONE:
return SEND_NOTHING; return SEND_NOTHING;
default: default:
return SEND_FINISH; if (call->is_client) {
return SEND_FINISH;
} else if (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set !=
REQSET_EMPTY &&
call->requests[GRPC_IOREQ_SEND_STATUS].set != REQSET_EMPTY) {
return SEND_TRAILING_METADATA_AND_FINISH;
} else {
return SEND_NOTHING;
}
} }
} }
@ -509,7 +499,8 @@ static void enact_send_action(grpc_call *call, send_action sa) {
op.user_data = call; op.user_data = call;
grpc_call_execute_op(call, &op); grpc_call_execute_op(call, &op);
break; break;
case SEND_TRAILING_METADATA: case SEND_TRAILING_METADATA_AND_FINISH:
/* send trailing metadata */
data = call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data; data = call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data;
for (i = 0; i < data.send_metadata.count; i++) { for (i = 0; i < data.send_metadata.count; i++) {
const grpc_metadata *md = &data.send_metadata.metadata[i]; const grpc_metadata *md = &data.send_metadata.metadata[i];
@ -518,12 +509,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
call->metadata_context, md->key, call->metadata_context, md->key,
(const gpr_uint8 *)md->value, md->value_length)); (const gpr_uint8 *)md->value, md->value_length));
} }
lock(call); /* send status */
call->sending = 0;
unlock(call);
grpc_call_internal_unref(call, 0);
break;
case SEND_STATUS:
/* TODO(ctiller): cache common status values */ /* TODO(ctiller): cache common status values */
data = call->requests[GRPC_IOREQ_SEND_CLOSE].data; data = call->requests[GRPC_IOREQ_SEND_CLOSE].data;
gpr_ltoa(data.send_status.code, status_str); gpr_ltoa(data.send_status.code, status_str);
@ -534,15 +520,15 @@ static void enact_send_action(grpc_call *call, send_action sa) {
grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context, status_str))); grpc_mdstr_from_string(call->metadata_context, status_str)));
if (data.send_status.details) { if (data.send_status.details) {
send_metadata(call, send_metadata(
grpc_mdelem_from_metadata_strings( call,
call->metadata_context, grpc_mdelem_from_metadata_strings(
grpc_mdstr_ref( call->metadata_context,
grpc_channel_get_message_string(call->channel)), grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context, grpc_mdstr_from_string(call->metadata_context,
data.send_status.details))); data.send_status.details)));
} }
break; /* fallthrough: see choose_send_action for details */
case SEND_FINISH: case SEND_FINISH:
op.type = GRPC_SEND_FINISH; op.type = GRPC_SEND_FINISH;
op.dir = GRPC_CALL_DOWN; op.dir = GRPC_CALL_DOWN;
@ -591,8 +577,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
return start_ioreq_error(call, have_ops, return start_ioreq_error(call, have_ops,
GRPC_CALL_ERROR_TOO_MANY_OPERATIONS); GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
} else if (requests[op].set == REQSET_DONE) { } else if (requests[op].set == REQSET_DONE) {
return start_ioreq_error(call, have_ops, return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED);
GRPC_CALL_ERROR_ALREADY_INVOKED);
} }
have_ops |= 1 << op; have_ops |= 1 << op;
data = reqs[i].data; data = reqs[i].data;
@ -755,11 +740,14 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
lock(call); lock(call);
ls = get_legacy_state(call); ls = get_legacy_state(call);
if (ls->md_out_count[ls->md_out_buffer] == ls->md_out_capacity[ls->md_out_buffer]) { if (ls->md_out_count[ls->md_out_buffer] ==
ls->md_out_capacity[ls->md_out_buffer]) {
ls->md_out_capacity[ls->md_out_buffer] = ls->md_out_capacity[ls->md_out_buffer] =
GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2, ls->md_out_capacity[ls->md_out_buffer] + 8); GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2,
ls->md_out[ls->md_out_buffer] = ls->md_out_capacity[ls->md_out_buffer] + 8);
gpr_realloc(ls->md_out[ls->md_out_buffer], sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]); ls->md_out[ls->md_out_buffer] = gpr_realloc(
ls->md_out[ls->md_out_buffer],
sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]);
} }
mdout = &ls->md_out[ls->md_out_buffer][ls->md_out_count[ls->md_out_buffer]++]; mdout = &ls->md_out[ls->md_out_buffer][ls->md_out_count[ls->md_out_buffer]++];
mdout->key = gpr_strdup(metadata->key); mdout->key = gpr_strdup(metadata->key);
@ -776,8 +764,8 @@ static void maybe_finish_legacy(grpc_call *call) {
legacy_state *ls = get_legacy_state(call); legacy_state *ls = get_legacy_state(call);
if (ls->got_status) { if (ls->got_status) {
grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL, grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL,
ls->status, ls->details, ls->status, ls->details, ls->trailing_md_in.metadata,
ls->trailing_md_in.metadata, ls->trailing_md_in.count); ls->trailing_md_in.count);
} }
} }
@ -800,7 +788,8 @@ static void finish_recv_metadata(grpc_call *call, grpc_op_error status,
ls = get_legacy_state(call); ls = get_legacy_state(call);
if (status == GRPC_OP_OK) { if (status == GRPC_OP_OK) {
grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL,
ls->initial_md_in.count, ls->initial_md_in.metadata); ls->initial_md_in.count,
ls->initial_md_in.metadata);
} else { } else {
grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0, grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0,
@ -1000,11 +989,12 @@ grpc_call_error grpc_call_start_write_status(grpc_call *call,
reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA; reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA;
reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
reqs[1].op = GRPC_IOREQ_SEND_CLOSE; reqs[1].op = GRPC_IOREQ_SEND_STATUS;
reqs[1].data.send_status.code = status; reqs[1].data.send_status.code = status;
/* MEMLEAK */ /* MEMLEAK */
reqs[1].data.send_status.details = gpr_strdup(details); reqs[1].data.send_status.details = gpr_strdup(details);
err = start_ioreq(call, reqs, 2, finish_finish, tag); reqs[2].op = GRPC_IOREQ_SEND_CLOSE;
err = start_ioreq(call, reqs, 3, finish_finish, tag);
unlock(call); unlock(call);
return err; return err;
@ -1077,7 +1067,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
gpr_uint32 status; gpr_uint32 status;
void *user_data = grpc_mdelem_get_user_data(md, destroy_status); void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
if (user_data) { if (user_data) {
status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET; status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET;
} else { } else {
if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
GPR_SLICE_LENGTH(md->value->slice), GPR_SLICE_LENGTH(md->value->slice),
@ -1119,16 +1109,17 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
grpc_mdelem_unref(md); grpc_mdelem_unref(md);
} else { } else {
if (!call->got_initial_metadata) { if (!call->got_initial_metadata) {
dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set < GRPC_IOREQ_OP_COUNT dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set <
GRPC_IOREQ_OP_COUNT
? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA] ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
.data.recv_metadata .data.recv_metadata
: &call->buffered_initial_metadata; : &call->buffered_initial_metadata;
} else { } else {
dest = dest = call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set <
call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set < GRPC_IOREQ_OP_COUNT GRPC_IOREQ_OP_COUNT
? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA] ? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA]
.data.recv_metadata .data.recv_metadata
: &call->buffered_trailing_metadata; : &call->buffered_trailing_metadata;
} }
if (dest->count == dest->capacity) { if (dest->count == dest->capacity) {
dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);

Loading…
Cancel
Save