Robust status overriding

pull/357/head
Craig Tiller 10 years ago
parent 6875272b86
commit 928fbc8ed0
  1. 154
      src/core/surface/call.c

@ -183,9 +183,7 @@ legacy_state *get_legacy_state(grpc_call *call) {
return call->legacy_state; return call->legacy_state;
} }
void grpc_call_internal_ref(grpc_call *c) { void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
gpr_ref(&c->internal_refcount);
}
static void destroy_call(grpc_call *c) { static void destroy_call(grpc_call *c) {
int i; int i;
@ -213,12 +211,14 @@ void grpc_call_internal_unref(grpc_call *c) {
} }
} }
static void set_status_code(grpc_call *call, status_source source, gpr_uint32 status) { static void set_status_code(grpc_call *call, status_source source,
gpr_uint32 status) {
call->status[source].set = 1; call->status[source].set = 1;
call->status[source].code = status; call->status[source].code = status;
} }
static void set_status_details(grpc_call *call, status_source source, grpc_mdstr *status) { static void set_status_details(grpc_call *call, status_source source,
grpc_mdstr *status) {
if (call->status[source].details != NULL) { if (call->status[source].details != NULL) {
grpc_mdstr_unref(call->status[source].details); grpc_mdstr_unref(call->status[source].details);
} }
@ -285,7 +285,8 @@ static void unlock(grpc_call *call) {
} }
} }
static void get_final_status(grpc_call *call, grpc_status_code *code, const char **details) { static void get_final_status(grpc_call *call, grpc_status_code *code,
const char **details) {
int i; int i;
for (i = 0; i < STATUS_SOURCE_COUNT; i++) { for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (call->status[i].set) { if (call->status[i].set) {
@ -317,9 +318,11 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
if (master->complete_mask == master->need_mask || if (master->complete_mask == master->need_mask ||
status == GRPC_OP_ERROR) { status == GRPC_OP_ERROR) {
if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) { if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) {
get_final_status(call, get_final_status(
&call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->status, call,
&call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->details); &call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->status,
&call->requests[GRPC_IOREQ_RECV_STATUS]
.data.recv_status->details);
} }
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
if (call->requests[i].master == master) { if (call->requests[i].master == master) {
@ -426,11 +429,10 @@ static void enact_send_action(grpc_call *call, send_action sa) {
data = call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].data; data = call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].data;
for (i = 0; i < data.send_metadata.count; i++) { for (i = 0; i < data.send_metadata.count; i++) {
const grpc_metadata *md = &data.send_metadata.metadata[i]; const grpc_metadata *md = &data.send_metadata.metadata[i];
send_metadata( send_metadata(call,
call, grpc_mdelem_from_string_and_buffer(
grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, call->metadata_context, md->key,
(const gpr_uint8 *)md->value, (const gpr_uint8 *)md->value, md->value_length));
md->value_length));
} }
op.type = GRPC_SEND_START; op.type = GRPC_SEND_START;
op.dir = GRPC_CALL_DOWN; op.dir = GRPC_CALL_DOWN;
@ -454,11 +456,10 @@ static void enact_send_action(grpc_call *call, send_action sa) {
data = call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data; data = call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data;
for (i = 0; i < data.send_metadata.count; i++) { for (i = 0; i < data.send_metadata.count; i++) {
const grpc_metadata *md = &data.send_metadata.metadata[i]; const grpc_metadata *md = &data.send_metadata.metadata[i];
send_metadata( send_metadata(call,
call, grpc_mdelem_from_string_and_buffer(
grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, call->metadata_context, md->key,
(const gpr_uint8 *)md->value, (const gpr_uint8 *)md->value, md->value_length));
md->value_length));
} }
lock(call); lock(call);
call->sending = 0; call->sending = 0;
@ -478,13 +479,13 @@ static void enact_send_action(grpc_call *call, send_action sa) {
grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context, status_str))); grpc_mdstr_from_string(call->metadata_context, status_str)));
if (data.send_close.details) { if (data.send_close.details) {
send_metadata( send_metadata(call,
call, grpc_mdelem_from_metadata_strings(
grpc_mdelem_from_metadata_strings( call->metadata_context,
call->metadata_context, grpc_mdstr_ref(
grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)), grpc_channel_get_message_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context, grpc_mdstr_from_string(call->metadata_context,
data.send_close.details))); data.send_close.details)));
} }
} }
op.type = GRPC_SEND_FINISH; op.type = GRPC_SEND_FINISH;
@ -556,44 +557,46 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
for (i = 0; i < nreqs; i++) { for (i = 0; i < nreqs; i++) {
op = reqs[i].op; op = reqs[i].op;
switch (op) { switch (op) {
default: default:
break; break;
case GRPC_IOREQ_RECV_MESSAGES: case GRPC_IOREQ_RECV_MESSAGES:
data.recv_messages->count = 0; data.recv_messages->count = 0;
if (call->buffered_messages.count > 0 || call->read_closed) { if (call->buffered_messages.count > 0 || call->read_closed) {
SWAP(grpc_byte_buffer_array, *data.recv_messages, SWAP(grpc_byte_buffer_array, *data.recv_messages,
call->buffered_messages); call->buffered_messages);
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK); finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
} else { } else {
call->need_more_data = 1; call->need_more_data = 1;
} }
break; break;
case GRPC_IOREQ_SEND_MESSAGES: case GRPC_IOREQ_SEND_MESSAGES:
call->write_index = 0; call->write_index = 0;
break; break;
case GRPC_IOREQ_SEND_CLOSE: case GRPC_IOREQ_SEND_CLOSE:
if (requests[GRPC_IOREQ_SEND_MESSAGES].state == REQ_INITIAL) { if (requests[GRPC_IOREQ_SEND_MESSAGES].state == REQ_INITIAL) {
requests[GRPC_IOREQ_SEND_MESSAGES].state = REQ_DONE; requests[GRPC_IOREQ_SEND_MESSAGES].state = REQ_DONE;
} }
break; break;
case GRPC_IOREQ_RECV_INITIAL_METADATA: case GRPC_IOREQ_RECV_INITIAL_METADATA:
data.recv_metadata->count = 0; data.recv_metadata->count = 0;
if (call->buffered_initial_metadata.count > 0) { if (call->buffered_initial_metadata.count > 0) {
SWAP(grpc_metadata_array, *data.recv_metadata, call->buffered_initial_metadata); SWAP(grpc_metadata_array, *data.recv_metadata,
} call->buffered_initial_metadata);
if (call->got_initial_metadata) { }
finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); if (call->got_initial_metadata) {
} finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
break; }
case GRPC_IOREQ_RECV_TRAILING_METADATA: break;
data.recv_metadata->count = 0; case GRPC_IOREQ_RECV_TRAILING_METADATA:
if (call->buffered_trailing_metadata.count > 0) { data.recv_metadata->count = 0;
SWAP(grpc_metadata_array, *data.recv_metadata, call->buffered_trailing_metadata); if (call->buffered_trailing_metadata.count > 0) {
} SWAP(grpc_metadata_array, *data.recv_metadata,
if (call->read_closed) { call->buffered_trailing_metadata);
finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); }
} if (call->read_closed) {
break; finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
}
break;
} }
} }
@ -698,6 +701,18 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
return GRPC_CALL_OK; return GRPC_CALL_OK;
} }
static void publish_failed_finished(grpc_call *call, grpc_status_code status,
const char *desc) {
grpc_status_code status_code;
const char *details;
set_status_code(call, STATUS_FROM_FAILED_OP, status);
set_status_details(call, STATUS_FROM_FAILED_OP,
grpc_mdstr_from_string(call->metadata_context, desc));
get_final_status(call, &status_code, &details);
grpc_cq_end_finished(call->cq, get_legacy_state(call)->finished_tag, call,
do_nothing, NULL, status_code, details, NULL, 0);
}
static void finish_status(grpc_call *call, grpc_op_error status, void *tag) { static void finish_status(grpc_call *call, grpc_op_error status, void *tag) {
legacy_state *ls; legacy_state *ls;
@ -710,8 +725,7 @@ static void finish_status(grpc_call *call, grpc_op_error status, void *tag) {
ls->status_in.status, ls->status_in.details, ls->status_in.status, ls->status_in.details,
ls->trail_md_in.metadata, ls->trail_md_in.count); ls->trail_md_in.metadata, ls->trail_md_in.count);
} else { } else {
grpc_cq_end_finished(call->cq, tag, call, do_nothing, NULL, publish_failed_finished(call, GRPC_STATUS_UNKNOWN, "Read status failed");
GRPC_STATUS_UNKNOWN, "Read status failed", NULL, 0);
} }
} }
@ -756,9 +770,8 @@ static void finish_send_metadata(grpc_call *call, grpc_op_error status,
ls = get_legacy_state(call); ls = get_legacy_state(call);
grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call,
do_nothing, NULL, 0, NULL); do_nothing, NULL, 0, NULL);
grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL, publish_failed_finished(call, GRPC_STATUS_UNKNOWN,
GRPC_STATUS_UNKNOWN, "Failed to read initial metadata", "Failed to read initial metadata");
NULL, 0);
} }
unlock(call); unlock(call);
} }
@ -828,8 +841,7 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call,
return err; return err;
} }
void grpc_call_initial_metadata_complete( void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) {
grpc_call_element *surface_element) {
grpc_call *call = grpc_call_from_top_element(surface_element); grpc_call *call = grpc_call_from_top_element(surface_element);
lock(call); lock(call);
call->got_initial_metadata = 1; call->got_initial_metadata = 1;

Loading…
Cancel
Save