From 8b976d0c244e516c2195c699bf396cf1dac2baa5 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 5 Feb 2015 21:41:23 -0800 Subject: [PATCH] Fixes --- src/core/channel/client_channel.c | 41 +++++++++++-------- src/core/surface/call.c | 11 ++++- src/core/surface/server.c | 9 ++-- test/core/end2end/tests/cancel_after_accept.c | 25 ++++++----- test/core/end2end/tests/cancel_after_invoke.c | 2 +- 5 files changed, 52 insertions(+), 36 deletions(-) diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 507b91b8a63..c2c23f51564 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -210,11 +210,30 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) { chand->waiting_child_count = new_count; } +static void send_up_cancelled_ops(grpc_call_element *elem) { + grpc_call_op finish_op; + channel_data *chand = elem->channel_data; + /* send up a synthesized status */ + finish_op.type = GRPC_RECV_METADATA; + finish_op.dir = GRPC_CALL_UP; + finish_op.flags = 0; + finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status); + finish_op.done_cb = do_nothing; + finish_op.user_data = NULL; + grpc_call_next_op(elem, &finish_op); + /* send up a finish */ + finish_op.type = GRPC_RECV_FINISH; + finish_op.dir = GRPC_CALL_UP; + finish_op.flags = 0; + finish_op.done_cb = do_nothing; + finish_op.user_data = NULL; + grpc_call_next_op(elem, &finish_op); +} + static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_call_element *child_elem; - grpc_call_op finish_op; gpr_mu_lock(&chand->mu); switch (calld->state) { @@ -225,27 +244,15 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) { return; /* early out */ case CALL_WAITING: remove_waiting_child(chand, calld); + gpr_mu_unlock(&chand->mu); + send_up_cancelled_ops(elem); calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data, GRPC_OP_ERROR); - /* fallthrough intended */ + return; /* early out */ case CALL_CREATED: calld->state = CALL_CANCELLED; gpr_mu_unlock(&chand->mu); - /* send up a synthesized status */ - finish_op.type = GRPC_RECV_METADATA; - finish_op.dir = GRPC_CALL_UP; - finish_op.flags = 0; - finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status); - finish_op.done_cb = do_nothing; - finish_op.user_data = NULL; - grpc_call_next_op(elem, &finish_op); - /* send up a finish */ - finish_op.type = GRPC_RECV_FINISH; - finish_op.dir = GRPC_CALL_UP; - finish_op.flags = 0; - finish_op.done_cb = do_nothing; - finish_op.user_data = NULL; - grpc_call_next_op(elem, &finish_op); + send_up_cancelled_ops(elem); return; /* early out */ case CALL_CANCELLED: gpr_mu_unlock(&chand->mu); diff --git a/src/core/surface/call.c b/src/core/surface/call.c index bcb7c877334..b3f272e0682 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -295,10 +295,19 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) { static void set_status_code(grpc_call *call, status_source source, gpr_uint32 status) { + int flush; + call->status[source].is_set = 1; call->status[source].code = status; - if (status != GRPC_OP_OK) { + if (call->is_client) { + flush = status == GRPC_STATUS_CANCELLED; + } else { + flush = status != GRPC_STATUS_OK; + } + + if (flush && !grpc_bbq_empty(&call->incoming_queue)) { + gpr_log(GPR_ERROR, "Flushing unread messages due to error status %d", status); grpc_bbq_flush(&call->incoming_queue); } } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index c0c524ad8de..ee0f96a5803 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -346,6 +346,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, static void channel_op(grpc_channel_element *elem, grpc_channel_element *from_elem, grpc_channel_op *op) { channel_data *chand = elem->channel_data; + grpc_server *server = chand->server; switch (op->type) { case GRPC_ACCEPT_CALL: @@ -356,11 +357,11 @@ static void channel_op(grpc_channel_element *elem, case GRPC_TRANSPORT_CLOSED: /* if the transport is closed for a server channel, we destroy the channel */ - gpr_mu_lock(&chand->server->mu); - server_ref(chand->server); + gpr_mu_lock(&server->mu); + server_ref(server); destroy_channel(chand); - gpr_mu_unlock(&chand->server->mu); - server_unref(chand->server); + gpr_mu_unlock(&server->mu); + server_unref(server); break; case GRPC_TRANSPORT_GOAWAY: gpr_slice_unref(op->data.goaway.message); diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c index 7e13021dc8e..eb26ff14f00 100644 --- a/test/core/end2end/tests/cancel_after_accept.c +++ b/test/core/end2end/tests/cancel_after_accept.c @@ -105,8 +105,7 @@ static void end_test(grpc_end2end_test_fixture *f) { /* Cancel after accept, no payload */ static void test_cancel_after_accept(grpc_end2end_test_config config, - cancellation_mode mode, int client_ops, - int server_ops) { + cancellation_mode mode) { grpc_op ops[6]; grpc_op *op; grpc_call *c; @@ -162,7 +161,7 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &response_payload_recv; op++; - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, client_ops, tag(1))); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, @@ -172,6 +171,9 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, cq_verify(v_server); op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &request_payload_recv; + op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; op++; @@ -181,10 +183,7 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = response_payload; op++; - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &request_payload_recv; - op++; - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, server_ops, tag(3))); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(3))); GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c)); @@ -204,24 +203,24 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, grpc_call_details_destroy(&call_details); grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(response_payload); + grpc_byte_buffer_destroy(request_payload_recv); grpc_byte_buffer_destroy(response_payload_recv); gpr_free(details); grpc_call_destroy(c); + grpc_call_destroy(s); cq_verifier_destroy(v_client); + cq_verifier_destroy(v_server); end_test(&f); config.tear_down_data(&f); } void grpc_end2end_tests(grpc_end2end_test_config config) { - unsigned i, j, k; + unsigned i; for (i = 0; i < GPR_ARRAY_SIZE(cancellation_modes); i++) { - for (j = 2; j <= 6; j++) { - for (k = 1; k <= 4; k++) { - test_cancel_after_accept(config, cancellation_modes[i], j, k); - } - } + test_cancel_after_accept(config, cancellation_modes[i]); } } diff --git a/test/core/end2end/tests/cancel_after_invoke.c b/test/core/end2end/tests/cancel_after_invoke.c index ace10f172d5..96a8186d15e 100644 --- a/test/core/end2end/tests/cancel_after_invoke.c +++ b/test/core/end2end/tests/cancel_after_invoke.c @@ -183,7 +183,7 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config, void grpc_end2end_tests(grpc_end2end_test_config config) { unsigned i, j; - for (j = 1; j < 6; j++) { + for (j = 2; j < 6; j++) { for (i = 0; i < GPR_ARRAY_SIZE(cancellation_modes); i++) { test_cancel_after_invoke(config, cancellation_modes[i], j); }