From 2ea37fd2ce9046ccf2a0b89ba43c93d8fe80408a Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 24 Apr 2015 13:03:49 -0700 Subject: [PATCH] Bug fixing --- src/core/channel/channel_stack.h | 3 -- src/core/security/auth.c | 14 ++--- src/core/surface/call.c | 1 - src/core/transport/chttp2_transport.c | 65 +++++++++++++----------- src/core/transport/transport.c | 9 ++++ src/core/transport/transport.h | 3 ++ src/core/transport/transport_op_string.c | 4 ++ 7 files changed, 56 insertions(+), 43 deletions(-) diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 94b12639fc8..de0e4e45184 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -222,9 +222,6 @@ void grpc_call_log_op(char *file, int line, gpr_log_severity severity, grpc_call_element *elem, grpc_transport_op *op); void grpc_call_element_send_cancel(grpc_call_element *cur_elem); -void grpc_call_element_recv_status(grpc_call_element *cur_elem, - grpc_status_code status, - const char *message); extern int grpc_trace_channel; diff --git a/src/core/security/auth.c b/src/core/security/auth.c index 4dbc25675b9..b6a002d43c4 100644 --- a/src/core/security/auth.c +++ b/src/core/security/auth.c @@ -67,11 +67,6 @@ typedef struct { grpc_mdstr *status_key; } channel_data; -static void bubbleup_error(grpc_call_element *elem, const char *error_msg) { - grpc_call_element_recv_status(elem, GRPC_STATUS_UNAUTHENTICATED, error_msg); - grpc_call_element_send_cancel(elem); -} - static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems, size_t num_md, grpc_credentials_status status) { @@ -141,6 +136,7 @@ static void send_security_metadata(grpc_call_element *elem, grpc_transport_op *o static void on_host_checked(void *user_data, grpc_security_status status) { grpc_call_element *elem = (grpc_call_element *)user_data; call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; if (status == GRPC_SECURITY_OK) { send_security_metadata(elem, &calld->op); @@ -148,9 +144,9 @@ static void on_host_checked(void *user_data, grpc_security_status status) { char *error_msg; gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.", grpc_mdstr_as_c_string(calld->host)); - bubbleup_error(elem, error_msg); + grpc_transport_op_add_cancellation(&calld->op, GRPC_STATUS_UNAUTHENTICATED, grpc_mdstr_from_string(chand->md_ctx, error_msg)); gpr_free(error_msg); - grpc_transport_op_finish_with_failure(&calld->op); + grpc_call_next_op(elem, &calld->op); } } @@ -199,9 +195,9 @@ static void auth_start_transport_op(grpc_call_element *elem, gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.", call_host); - bubbleup_error(elem, error_msg); + grpc_transport_op_add_cancellation(&calld->op, GRPC_STATUS_UNAUTHENTICATED, grpc_mdstr_from_string(channeld->md_ctx, error_msg)); gpr_free(error_msg); - grpc_transport_op_finish_with_failure(&calld->op); + grpc_call_next_op(elem, &calld->op); } return; /* early exit */ } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 2f514465fc7..8eee67bb831 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -424,7 +424,6 @@ static void unlock(grpc_call *call) { memset(&op, 0, sizeof(op)); if (!call->receiving && - (call->write_state >= WRITE_STATE_STARTED || !call->is_client) && need_more_data(call)) { op.recv_ops = &call->recv_ops; op.recv_state = &call->recv_state; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 237def41aa6..7b50e285d0d 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -361,7 +361,8 @@ static void cancel_stream_id(transport *t, gpr_uint32 id, grpc_chttp2_error_code error_code, int send_rst); static void cancel_stream(transport *t, stream *s, grpc_status_code local_status, - grpc_chttp2_error_code error_code, int send_rst); + grpc_chttp2_error_code error_code, + grpc_mdstr *optional_message, int send_rst); static void finalize_cancellations(transport *t); static stream *lookup_stream(transport *t, gpr_uint32 id); static void remove_from_stream_map(transport *t, stream *s); @@ -1011,6 +1012,12 @@ static void maybe_start_some_streams(transport *t) { } static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { + if (op->cancel_with_status != GRPC_STATUS_OK) { + cancel_stream( + t, s, op->cancel_with_status, + grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), op->cancel_message, 1); + } + if (op->send_ops) { GPR_ASSERT(s->outgoing_sopb == NULL); s->send_done_closure.cb = op->on_done_send; @@ -1037,26 +1044,16 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { GPR_ASSERT(s->incoming_sopb == NULL); s->recv_done_closure.cb = op->on_done_recv; s->recv_done_closure.user_data = op->recv_user_data; - if (!s->cancelled) { - s->incoming_sopb = op->recv_ops; - s->incoming_sopb->nops = 0; - s->publish_state = op->recv_state; - maybe_finish_read(t, s); - maybe_join_window_updates(t, s); - } else { - schedule_cb(t, s->recv_done_closure, 0); - } + s->incoming_sopb = op->recv_ops; + s->incoming_sopb->nops = 0; + s->publish_state = op->recv_state; + maybe_finish_read(t, s); + maybe_join_window_updates(t, s); } if (op->bind_pollset) { add_to_pollset_locked(t, op->bind_pollset); } - - if (op->cancel_with_status != GRPC_STATUS_OK) { - cancel_stream( - t, s, op->cancel_with_status, - grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), 1); - } } static void perform_op(grpc_transport *gt, grpc_stream *gs, @@ -1123,6 +1120,7 @@ static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) { static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, grpc_status_code local_status, grpc_chttp2_error_code error_code, + grpc_mdstr *optional_message, int send_rst) { int had_outgoing; char buffer[GPR_LTOA_MIN_BUFSIZE]; @@ -1147,14 +1145,18 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, add_incoming_metadata( t, s, grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer)); - switch (local_status) { - case GRPC_STATUS_CANCELLED: - add_incoming_metadata( - t, s, grpc_mdelem_from_strings(t->metadata_context, - "grpc-message", "Cancelled")); - break; - default: - break; + if (!optional_message) { + switch (local_status) { + case GRPC_STATUS_CANCELLED: + add_incoming_metadata( + t, s, grpc_mdelem_from_strings(t->metadata_context, + "grpc-message", "Cancelled")); + break; + default: + break; + } + } else { + add_incoming_metadata(t, s, grpc_mdelem_from_metadata_strings(t->metadata_context, grpc_mdstr_from_string(t->metadata_context, "grpc-message"), grpc_mdstr_ref(optional_message))); } add_metadata_batch(t, s); maybe_finish_read(t, s); @@ -1165,24 +1167,27 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_rst_stream_create(id, error_code)); } + if (optional_message) { + grpc_mdstr_unref(optional_message); + } } static void cancel_stream_id(transport *t, gpr_uint32 id, grpc_status_code local_status, grpc_chttp2_error_code error_code, int send_rst) { cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code, - send_rst); + NULL, send_rst); } static void cancel_stream(transport *t, stream *s, grpc_status_code local_status, - grpc_chttp2_error_code error_code, int send_rst) { - cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst); + grpc_chttp2_error_code error_code, grpc_mdstr *optional_message, int send_rst) { + cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message, send_rst); } static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) { cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE, - GRPC_CHTTP2_INTERNAL_ERROR, 0); + GRPC_CHTTP2_INTERNAL_ERROR, NULL, 0); } static void end_all_the_calls(transport *t) { @@ -1285,7 +1290,7 @@ static int init_data_frame_parser(transport *t) { case GRPC_CHTTP2_STREAM_ERROR: cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status( GRPC_CHTTP2_INTERNAL_ERROR), - GRPC_CHTTP2_INTERNAL_ERROR, 1); + GRPC_CHTTP2_INTERNAL_ERROR, NULL, 1); return init_skip_frame(t, 0); case GRPC_CHTTP2_CONNECTION_ERROR: drop_connection(t); @@ -1598,7 +1603,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { if (!is_window_update_legal(st.window_update, s->outgoing_window)) { cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status( GRPC_CHTTP2_FLOW_CONTROL_ERROR), - GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1); + GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1); } else { s->outgoing_window += st.window_update; /* if this window update makes outgoing ops writable again, diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 987dd4c9180..cc9392177fd 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -94,3 +94,12 @@ void grpc_transport_op_finish_with_failure(grpc_transport_op *op) { op->on_done_recv(op->recv_user_data, 0); } } + +void grpc_transport_op_add_cancellation(grpc_transport_op *op, grpc_status_code status, grpc_mdstr *message) { + if (op->cancel_with_status == GRPC_STATUS_OK) { + op->cancel_with_status = status; + op->cancel_message = message; + } else if (message) { + grpc_mdstr_unref(message); + } +} diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 5036dfc2de2..7c4bed1863a 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -75,6 +75,7 @@ typedef struct grpc_transport_op { grpc_pollset *bind_pollset; grpc_status_code cancel_with_status; + grpc_mdstr *cancel_message; } grpc_transport_op; /* Callbacks made from the transport to the upper layers of grpc. */ @@ -134,6 +135,8 @@ void grpc_transport_destroy_stream(grpc_transport *transport, void grpc_transport_op_finish_with_failure(grpc_transport_op *op); +void grpc_transport_op_add_cancellation(grpc_transport_op *op, grpc_status_code status, grpc_mdstr *message); + /* TODO(ctiller): remove this */ void grpc_transport_add_to_pollset(grpc_transport *transport, grpc_pollset *pollset); diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index 54f501f8980..b9283b7abf0 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -139,6 +139,10 @@ char *grpc_transport_op_string(grpc_transport_op *op) { first = 0; gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status); gpr_strvec_add(&b, tmp); + if (op->cancel_message) { + gpr_asprintf(&tmp, ";msg='%s'", grpc_mdstr_as_c_string(op->cancel_message)); + gpr_strvec_add(&b, tmp); + } } out = gpr_strvec_flatten(&b, NULL);