From 928fbc8ed0588f016fa2dd71517f532b25d6eb3c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 29 Jan 2015 15:06:42 -0800 Subject: [PATCH] Robust status overriding --- src/core/surface/call.c | 154 ++++++++++++++++++++++------------------ 1 file changed, 83 insertions(+), 71 deletions(-) diff --git a/src/core/surface/call.c b/src/core/surface/call.c index be624bfe9b0..6a1fa361de6 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -183,9 +183,7 @@ legacy_state *get_legacy_state(grpc_call *call) { return call->legacy_state; } -void grpc_call_internal_ref(grpc_call *c) { - gpr_ref(&c->internal_refcount); -} +void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); } static void destroy_call(grpc_call *c) { int i; @@ -213,12 +211,14 @@ void grpc_call_internal_unref(grpc_call *c) { } } -static void set_status_code(grpc_call *call, status_source source, gpr_uint32 status) { +static void set_status_code(grpc_call *call, status_source source, + gpr_uint32 status) { call->status[source].set = 1; call->status[source].code = status; } -static void set_status_details(grpc_call *call, status_source source, grpc_mdstr *status) { +static void set_status_details(grpc_call *call, status_source source, + grpc_mdstr *status) { if (call->status[source].details != NULL) { grpc_mdstr_unref(call->status[source].details); } @@ -285,7 +285,8 @@ static void unlock(grpc_call *call) { } } -static void get_final_status(grpc_call *call, grpc_status_code *code, const char **details) { +static void get_final_status(grpc_call *call, grpc_status_code *code, + const char **details) { int i; for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (call->status[i].set) { @@ -317,9 +318,11 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, if (master->complete_mask == master->need_mask || status == GRPC_OP_ERROR) { if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) { - get_final_status(call, - &call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->status, - &call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->details); + get_final_status( + call, + &call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->status, + &call->requests[GRPC_IOREQ_RECV_STATUS] + .data.recv_status->details); } for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { if (call->requests[i].master == master) { @@ -426,11 +429,10 @@ static void enact_send_action(grpc_call *call, send_action sa) { data = call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].data; for (i = 0; i < data.send_metadata.count; i++) { const grpc_metadata *md = &data.send_metadata.metadata[i]; - send_metadata( - call, - grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, - (const gpr_uint8 *)md->value, - md->value_length)); + send_metadata(call, + grpc_mdelem_from_string_and_buffer( + call->metadata_context, md->key, + (const gpr_uint8 *)md->value, md->value_length)); } op.type = GRPC_SEND_START; op.dir = GRPC_CALL_DOWN; @@ -454,11 +456,10 @@ static void enact_send_action(grpc_call *call, send_action sa) { data = call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data; for (i = 0; i < data.send_metadata.count; i++) { const grpc_metadata *md = &data.send_metadata.metadata[i]; - send_metadata( - call, - grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, - (const gpr_uint8 *)md->value, - md->value_length)); + send_metadata(call, + grpc_mdelem_from_string_and_buffer( + call->metadata_context, md->key, + (const gpr_uint8 *)md->value, md->value_length)); } lock(call); call->sending = 0; @@ -478,13 +479,13 @@ static void enact_send_action(grpc_call *call, send_action sa) { grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), grpc_mdstr_from_string(call->metadata_context, status_str))); if (data.send_close.details) { - send_metadata( - call, - grpc_mdelem_from_metadata_strings( - call->metadata_context, - grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, - data.send_close.details))); + send_metadata(call, + grpc_mdelem_from_metadata_strings( + call->metadata_context, + grpc_mdstr_ref( + grpc_channel_get_message_string(call->channel)), + grpc_mdstr_from_string(call->metadata_context, + data.send_close.details))); } } op.type = GRPC_SEND_FINISH; @@ -556,44 +557,46 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, for (i = 0; i < nreqs; i++) { op = reqs[i].op; switch (op) { - default: - break; - case GRPC_IOREQ_RECV_MESSAGES: - data.recv_messages->count = 0; - if (call->buffered_messages.count > 0 || call->read_closed) { - SWAP(grpc_byte_buffer_array, *data.recv_messages, - call->buffered_messages); - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK); - } else { - call->need_more_data = 1; - } - break; - case GRPC_IOREQ_SEND_MESSAGES: - call->write_index = 0; - break; - case GRPC_IOREQ_SEND_CLOSE: - if (requests[GRPC_IOREQ_SEND_MESSAGES].state == REQ_INITIAL) { - requests[GRPC_IOREQ_SEND_MESSAGES].state = REQ_DONE; - } - break; - case GRPC_IOREQ_RECV_INITIAL_METADATA: - data.recv_metadata->count = 0; - if (call->buffered_initial_metadata.count > 0) { - SWAP(grpc_metadata_array, *data.recv_metadata, call->buffered_initial_metadata); - } - if (call->got_initial_metadata) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); - } - break; - case GRPC_IOREQ_RECV_TRAILING_METADATA: - data.recv_metadata->count = 0; - if (call->buffered_trailing_metadata.count > 0) { - SWAP(grpc_metadata_array, *data.recv_metadata, call->buffered_trailing_metadata); - } - if (call->read_closed) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); - } - break; + default: + break; + case GRPC_IOREQ_RECV_MESSAGES: + data.recv_messages->count = 0; + if (call->buffered_messages.count > 0 || call->read_closed) { + SWAP(grpc_byte_buffer_array, *data.recv_messages, + call->buffered_messages); + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK); + } else { + call->need_more_data = 1; + } + break; + case GRPC_IOREQ_SEND_MESSAGES: + call->write_index = 0; + break; + case GRPC_IOREQ_SEND_CLOSE: + if (requests[GRPC_IOREQ_SEND_MESSAGES].state == REQ_INITIAL) { + requests[GRPC_IOREQ_SEND_MESSAGES].state = REQ_DONE; + } + break; + case GRPC_IOREQ_RECV_INITIAL_METADATA: + data.recv_metadata->count = 0; + if (call->buffered_initial_metadata.count > 0) { + SWAP(grpc_metadata_array, *data.recv_metadata, + call->buffered_initial_metadata); + } + if (call->got_initial_metadata) { + finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); + } + break; + case GRPC_IOREQ_RECV_TRAILING_METADATA: + data.recv_metadata->count = 0; + if (call->buffered_trailing_metadata.count > 0) { + SWAP(grpc_metadata_array, *data.recv_metadata, + call->buffered_trailing_metadata); + } + if (call->read_closed) { + finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); + } + break; } } @@ -698,6 +701,18 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata, return GRPC_CALL_OK; } +static void publish_failed_finished(grpc_call *call, grpc_status_code status, + const char *desc) { + grpc_status_code status_code; + const char *details; + set_status_code(call, STATUS_FROM_FAILED_OP, status); + set_status_details(call, STATUS_FROM_FAILED_OP, + grpc_mdstr_from_string(call->metadata_context, desc)); + get_final_status(call, &status_code, &details); + grpc_cq_end_finished(call->cq, get_legacy_state(call)->finished_tag, call, + do_nothing, NULL, status_code, details, NULL, 0); +} + static void finish_status(grpc_call *call, grpc_op_error status, void *tag) { legacy_state *ls; @@ -710,8 +725,7 @@ static void finish_status(grpc_call *call, grpc_op_error status, void *tag) { ls->status_in.status, ls->status_in.details, ls->trail_md_in.metadata, ls->trail_md_in.count); } else { - grpc_cq_end_finished(call->cq, tag, call, do_nothing, NULL, - GRPC_STATUS_UNKNOWN, "Read status failed", NULL, 0); + publish_failed_finished(call, GRPC_STATUS_UNKNOWN, "Read status failed"); } } @@ -756,9 +770,8 @@ static void finish_send_metadata(grpc_call *call, grpc_op_error status, ls = get_legacy_state(call); grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, do_nothing, NULL, 0, NULL); - grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL, - GRPC_STATUS_UNKNOWN, "Failed to read initial metadata", - NULL, 0); + publish_failed_finished(call, GRPC_STATUS_UNKNOWN, + "Failed to read initial metadata"); } unlock(call); } @@ -828,8 +841,7 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call, return err; } -void grpc_call_initial_metadata_complete( - grpc_call_element *surface_element) { +void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) { grpc_call *call = grpc_call_from_top_element(surface_element); lock(call); call->got_initial_metadata = 1;