Handle cancel correctly

pull/11584/head
Muxi Yan 8 years ago
parent 716f7afa21
commit a1d79d9763
  1. 47
      src/core/ext/transport/cronet/transport/cronet_transport.c

@ -766,20 +766,47 @@ static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op,
bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
stream_state->state_callback_received[OP_FAILED];
if (is_canceled_or_failed) {
if (op_id == OP_SEND_INITIAL_METADATA) result = false;
if (op_id == OP_SEND_MESSAGE) result = false;
if (op_id == OP_SEND_TRAILING_METADATA) result = false;
if (op_id == OP_CANCEL_ERROR) result = false;
if (op_id == OP_SEND_INITIAL_METADATA) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
if (op_id == OP_SEND_MESSAGE) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
if (op_id == OP_SEND_TRAILING_METADATA) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
if (op_id == OP_CANCEL_ERROR) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
/* already executed */
if (op_id == OP_RECV_INITIAL_METADATA &&
stream_state->state_op_done[OP_RECV_INITIAL_METADATA])
stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
if (op_id == OP_RECV_MESSAGE &&
stream_state->state_op_done[OP_RECV_MESSAGE])
stream_state->state_op_done[OP_RECV_MESSAGE]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
if (op_id == OP_RECV_TRAILING_METADATA &&
stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
/* If cancelled, we need to wait for the cancel callback (if call is already
* started) */
if (op_id == OP_ON_COMPLETE &&
!(stream_state->state_callback_received[OP_FAILED] ||
stream_state->state_callback_received[OP_CANCELED] ||
!stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
} else if (op_id == OP_SEND_INITIAL_METADATA) {
/* already executed */
if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
@ -868,7 +895,7 @@ static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op,
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
} else if (curr_op->recv_message &&
!stream_state->state_op_done[OP_RECV_MESSAGE]) {
!op_state->state_op_done[OP_RECV_MESSAGE]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
} else if (curr_op->cancel_stream &&
@ -1067,6 +1094,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->state_callback_received[OP_FAILED]) {
CRONET_LOG(GPR_DEBUG, "Stream failed.");
@ -1074,6 +1102,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
@ -1214,8 +1243,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
} else if (stream_op->cancel_stream &&
op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
if (s->cbs) {
CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
bidirectional_stream_cancel(s->cbs);
result = ACTION_TAKEN_WITH_CALLBACK;
} else {

Loading…
Cancel
Save