Robust status overriding

pull/357/head
Craig Tiller 10 years ago
parent b4d3eb0520
commit 6875272b86
  1. 81
      src/core/surface/call.c

@ -89,6 +89,19 @@ typedef struct reqinfo {
gpr_uint32 complete_mask; gpr_uint32 complete_mask;
} reqinfo; } reqinfo;
typedef enum {
STATUS_FROM_API_OVERRIDE = 0,
STATUS_FROM_WIRE,
STATUS_FROM_FAILED_OP,
STATUS_SOURCE_COUNT
} status_source;
typedef struct {
gpr_uint8 set;
grpc_status_code code;
grpc_mdstr *details;
} received_status;
struct grpc_call { struct grpc_call {
grpc_completion_queue *cq; grpc_completion_queue *cq;
grpc_channel *channel; grpc_channel *channel;
@ -113,8 +126,7 @@ struct grpc_call {
grpc_metadata_array buffered_trailing_metadata; grpc_metadata_array buffered_trailing_metadata;
size_t write_index; size_t write_index;
grpc_status_code status_code; received_status status[STATUS_SOURCE_COUNT];
grpc_mdstr *status_details;
grpc_alarm alarm; grpc_alarm alarm;
@ -176,11 +188,14 @@ void grpc_call_internal_ref(grpc_call *c) {
} }
static void destroy_call(grpc_call *c) { static void destroy_call(grpc_call *c) {
int i;
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
grpc_channel_internal_unref(c->channel); grpc_channel_internal_unref(c->channel);
gpr_mu_destroy(&c->mu); gpr_mu_destroy(&c->mu);
if (c->status_details) { for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
grpc_mdstr_unref(c->status_details); if (c->status[i].details) {
grpc_mdstr_unref(c->status[i].details);
}
} }
if (c->legacy_state) { if (c->legacy_state) {
gpr_free(c->legacy_state->md_out); gpr_free(c->legacy_state->md_out);
@ -198,6 +213,18 @@ void grpc_call_internal_unref(grpc_call *c) {
} }
} }
static void set_status_code(grpc_call *call, status_source source, gpr_uint32 status) {
call->status[source].set = 1;
call->status[source].code = status;
}
static void set_status_details(grpc_call *call, status_source source, grpc_mdstr *status) {
if (call->status[source].details != NULL) {
grpc_mdstr_unref(call->status[source].details);
}
call->status[source].details = status;
}
static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) { static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) {
if (call->cq) return GRPC_CALL_ERROR_ALREADY_INVOKED; if (call->cq) return GRPC_CALL_ERROR_ALREADY_INVOKED;
call->cq = cq; call->cq = cq;
@ -258,6 +285,19 @@ static void unlock(grpc_call *call) {
} }
} }
static void get_final_status(grpc_call *call, grpc_status_code *code, const char **details) {
int i;
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (call->status[i].set) {
*code = call->status[i].code;
*details = grpc_mdstr_as_c_string(call->status[i].details);
return;
}
}
*code = GRPC_STATUS_UNKNOWN;
*details = NULL;
}
static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
grpc_op_error status) { grpc_op_error status) {
reqinfo *master = call->requests[op].master; reqinfo *master = call->requests[op].master;
@ -277,12 +317,9 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
if (master->complete_mask == master->need_mask || if (master->complete_mask == master->need_mask ||
status == GRPC_OP_ERROR) { status == GRPC_OP_ERROR) {
if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) { if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) {
call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->status = get_final_status(call,
call->status_code; &call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->status,
call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->details = &call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->details);
call->status_details
? grpc_mdstr_as_c_string(call->status_details)
: NULL;
} }
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
if (call->requests[i].master == master) { if (call->requests[i].master == master) {
@ -600,20 +637,6 @@ void grpc_call_destroy(grpc_call *c) {
grpc_call_internal_unref(c); grpc_call_internal_unref(c);
} }
static void maybe_set_status_code(grpc_call *call, gpr_uint32 status) {
if (call->got_status_code) return;
call->status_code = status;
call->got_status_code = 1;
}
static void maybe_set_status_details(grpc_call *call, grpc_mdstr *status) {
if (call->status_details != NULL) {
grpc_mdstr_unref(status);
return;
}
call->status_details = status;
}
grpc_call_error grpc_call_cancel(grpc_call *c) { grpc_call_error grpc_call_cancel(grpc_call *c) {
grpc_call_element *elem; grpc_call_element *elem;
grpc_call_op op; grpc_call_op op;
@ -637,10 +660,8 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
description ? grpc_mdstr_from_string(c->metadata_context, description) description ? grpc_mdstr_from_string(c->metadata_context, description)
: NULL; : NULL;
lock(c); lock(c);
maybe_set_status_code(c, status); set_status_code(c, STATUS_FROM_API_OVERRIDE, status);
if (details) { set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
maybe_set_status_details(c, details);
}
unlock(c); unlock(c);
return grpc_call_cancel(c); return grpc_call_cancel(c);
} }
@ -1024,10 +1045,10 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
lock(call); lock(call);
if (key == grpc_channel_get_status_string(call->channel)) { if (key == grpc_channel_get_status_string(call->channel)) {
maybe_set_status_code(call, decode_status(md)); set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
grpc_mdelem_unref(md); grpc_mdelem_unref(md);
} else if (key == grpc_channel_get_message_string(call->channel)) { } else if (key == grpc_channel_get_message_string(call->channel)) {
maybe_set_status_details(call, grpc_mdstr_ref(md->value)); set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
grpc_mdelem_unref(md); grpc_mdelem_unref(md);
} else { } else {
if (!call->got_initial_metadata) { if (!call->got_initial_metadata) {

Loading…
Cancel
Save