pull/381/head
Craig Tiller 10 years ago
parent c230a7451d
commit 8b976d0c24
  1. 41
      src/core/channel/client_channel.c
  2. 11
      src/core/surface/call.c
  3. 9
      src/core/surface/server.c
  4. 25
      test/core/end2end/tests/cancel_after_accept.c
  5. 2
      test/core/end2end/tests/cancel_after_invoke.c

@ -210,11 +210,30 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
chand->waiting_child_count = new_count; chand->waiting_child_count = new_count;
} }
static void send_up_cancelled_ops(grpc_call_element *elem) {
grpc_call_op finish_op;
channel_data *chand = elem->channel_data;
/* send up a synthesized status */
finish_op.type = GRPC_RECV_METADATA;
finish_op.dir = GRPC_CALL_UP;
finish_op.flags = 0;
finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status);
finish_op.done_cb = do_nothing;
finish_op.user_data = NULL;
grpc_call_next_op(elem, &finish_op);
/* send up a finish */
finish_op.type = GRPC_RECV_FINISH;
finish_op.dir = GRPC_CALL_UP;
finish_op.flags = 0;
finish_op.done_cb = do_nothing;
finish_op.user_data = NULL;
grpc_call_next_op(elem, &finish_op);
}
static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) { static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
grpc_call_element *child_elem; grpc_call_element *child_elem;
grpc_call_op finish_op;
gpr_mu_lock(&chand->mu); gpr_mu_lock(&chand->mu);
switch (calld->state) { switch (calld->state) {
@ -225,27 +244,15 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
return; /* early out */ return; /* early out */
case CALL_WAITING: case CALL_WAITING:
remove_waiting_child(chand, calld); remove_waiting_child(chand, calld);
gpr_mu_unlock(&chand->mu);
send_up_cancelled_ops(elem);
calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data, calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data,
GRPC_OP_ERROR); GRPC_OP_ERROR);
/* fallthrough intended */ return; /* early out */
case CALL_CREATED: case CALL_CREATED:
calld->state = CALL_CANCELLED; calld->state = CALL_CANCELLED;
gpr_mu_unlock(&chand->mu); gpr_mu_unlock(&chand->mu);
/* send up a synthesized status */ send_up_cancelled_ops(elem);
finish_op.type = GRPC_RECV_METADATA;
finish_op.dir = GRPC_CALL_UP;
finish_op.flags = 0;
finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status);
finish_op.done_cb = do_nothing;
finish_op.user_data = NULL;
grpc_call_next_op(elem, &finish_op);
/* send up a finish */
finish_op.type = GRPC_RECV_FINISH;
finish_op.dir = GRPC_CALL_UP;
finish_op.flags = 0;
finish_op.done_cb = do_nothing;
finish_op.user_data = NULL;
grpc_call_next_op(elem, &finish_op);
return; /* early out */ return; /* early out */
case CALL_CANCELLED: case CALL_CANCELLED:
gpr_mu_unlock(&chand->mu); gpr_mu_unlock(&chand->mu);

@ -295,10 +295,19 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
static void set_status_code(grpc_call *call, status_source source, static void set_status_code(grpc_call *call, status_source source,
gpr_uint32 status) { gpr_uint32 status) {
int flush;
call->status[source].is_set = 1; call->status[source].is_set = 1;
call->status[source].code = status; call->status[source].code = status;
if (status != GRPC_OP_OK) { if (call->is_client) {
flush = status == GRPC_STATUS_CANCELLED;
} else {
flush = status != GRPC_STATUS_OK;
}
if (flush && !grpc_bbq_empty(&call->incoming_queue)) {
gpr_log(GPR_ERROR, "Flushing unread messages due to error status %d", status);
grpc_bbq_flush(&call->incoming_queue); grpc_bbq_flush(&call->incoming_queue);
} }
} }

@ -346,6 +346,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
static void channel_op(grpc_channel_element *elem, static void channel_op(grpc_channel_element *elem,
grpc_channel_element *from_elem, grpc_channel_op *op) { grpc_channel_element *from_elem, grpc_channel_op *op) {
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
grpc_server *server = chand->server;
switch (op->type) { switch (op->type) {
case GRPC_ACCEPT_CALL: case GRPC_ACCEPT_CALL:
@ -356,11 +357,11 @@ static void channel_op(grpc_channel_element *elem,
case GRPC_TRANSPORT_CLOSED: case GRPC_TRANSPORT_CLOSED:
/* if the transport is closed for a server channel, we destroy the /* if the transport is closed for a server channel, we destroy the
channel */ channel */
gpr_mu_lock(&chand->server->mu); gpr_mu_lock(&server->mu);
server_ref(chand->server); server_ref(server);
destroy_channel(chand); destroy_channel(chand);
gpr_mu_unlock(&chand->server->mu); gpr_mu_unlock(&server->mu);
server_unref(chand->server); server_unref(server);
break; break;
case GRPC_TRANSPORT_GOAWAY: case GRPC_TRANSPORT_GOAWAY:
gpr_slice_unref(op->data.goaway.message); gpr_slice_unref(op->data.goaway.message);

@ -105,8 +105,7 @@ static void end_test(grpc_end2end_test_fixture *f) {
/* Cancel after accept, no payload */ /* Cancel after accept, no payload */
static void test_cancel_after_accept(grpc_end2end_test_config config, static void test_cancel_after_accept(grpc_end2end_test_config config,
cancellation_mode mode, int client_ops, cancellation_mode mode) {
int server_ops) {
grpc_op ops[6]; grpc_op ops[6];
grpc_op *op; grpc_op *op;
grpc_call *c; grpc_call *c;
@ -162,7 +161,7 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
op->op = GRPC_OP_RECV_MESSAGE; op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &response_payload_recv; op->data.recv_message = &response_payload_recv;
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, client_ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details, &call_details,
@ -172,6 +171,9 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
cq_verify(v_server); cq_verify(v_server);
op = ops; op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled; op->data.recv_close_on_server.cancelled = &was_cancelled;
op++; op++;
@ -181,10 +183,7 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
op->op = GRPC_OP_SEND_MESSAGE; op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = response_payload; op->data.send_message = response_payload;
op++; op++;
op->op = GRPC_OP_RECV_MESSAGE; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(3)));
op->data.recv_message = &request_payload_recv;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, server_ops, tag(3)));
GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c)); GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c));
@ -204,24 +203,24 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
grpc_call_details_destroy(&call_details); grpc_call_details_destroy(&call_details);
grpc_byte_buffer_destroy(request_payload); grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(response_payload);
grpc_byte_buffer_destroy(request_payload_recv);
grpc_byte_buffer_destroy(response_payload_recv); grpc_byte_buffer_destroy(response_payload_recv);
gpr_free(details); gpr_free(details);
grpc_call_destroy(c); grpc_call_destroy(c);
grpc_call_destroy(s);
cq_verifier_destroy(v_client); cq_verifier_destroy(v_client);
cq_verifier_destroy(v_server);
end_test(&f); end_test(&f);
config.tear_down_data(&f); config.tear_down_data(&f);
} }
void grpc_end2end_tests(grpc_end2end_test_config config) { void grpc_end2end_tests(grpc_end2end_test_config config) {
unsigned i, j, k; unsigned i;
for (i = 0; i < GPR_ARRAY_SIZE(cancellation_modes); i++) { for (i = 0; i < GPR_ARRAY_SIZE(cancellation_modes); i++) {
for (j = 2; j <= 6; j++) { test_cancel_after_accept(config, cancellation_modes[i]);
for (k = 1; k <= 4; k++) {
test_cancel_after_accept(config, cancellation_modes[i], j, k);
}
}
} }
} }

@ -183,7 +183,7 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config,
void grpc_end2end_tests(grpc_end2end_test_config config) { void grpc_end2end_tests(grpc_end2end_test_config config) {
unsigned i, j; unsigned i, j;
for (j = 1; j < 6; j++) { for (j = 2; j < 6; j++) {
for (i = 0; i < GPR_ARRAY_SIZE(cancellation_modes); i++) { for (i = 0; i < GPR_ARRAY_SIZE(cancellation_modes); i++) {
test_cancel_after_invoke(config, cancellation_modes[i], j); test_cancel_after_invoke(config, cancellation_modes[i], j);
} }

Loading…
Cancel
Save