fixes to handle failed connections

1. added lock/unlock around cronet callbacks to avoid race
2. added on_failed condition check in addition to cancellation
3. changed return code when no cronet call is initiated in failed/cancelled case
pull/8059/head
Makarand Dharmapurikar 9 years ago
parent 1c8bbc4106
commit 99bdd206da
  1. 109
      src/core/ext/transport/cronet/transport/cronet_transport.c

@ -294,7 +294,7 @@ static void remove_from_storage(struct stream_obj *s,
/* /*
Cycle through ops and try to take next action. Break when either Cycle through ops and try to take next action. Break when either
an action with callback is taken, or no action is possible. an action with callback is taken, or no action is possible.
This can be executed from the Cronet network thread via cronet callback This can get executed from the Cronet network thread via cronet callback
or on the application supplied thread via the perform_stream_op function. or on the application supplied thread via the perform_stream_op function.
*/ */
static void execute_from_storage(stream_obj *s) { static void execute_from_storage(stream_obj *s) {
@ -329,6 +329,7 @@ static void execute_from_storage(stream_obj *s) {
static void on_failed(cronet_bidirectional_stream *stream, int net_error) { static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error); CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
cronet_bidirectional_stream_destroy(s->cbs); cronet_bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_FAILED] = true; s->state.state_callback_received[OP_FAILED] = true;
s->cbs = NULL; s->cbs = NULL;
@ -340,6 +341,7 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
gpr_free(s->state.ws.write_buffer); gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL; s->state.ws.write_buffer = NULL;
} }
gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(s);
} }
@ -349,6 +351,7 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
static void on_canceled(cronet_bidirectional_stream *stream) { static void on_canceled(cronet_bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream); CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
cronet_bidirectional_stream_destroy(s->cbs); cronet_bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_CANCELED] = true; s->state.state_callback_received[OP_CANCELED] = true;
s->cbs = NULL; s->cbs = NULL;
@ -360,6 +363,7 @@ static void on_canceled(cronet_bidirectional_stream *stream) {
gpr_free(s->state.ws.write_buffer); gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL; s->state.ws.write_buffer = NULL;
} }
gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(s);
} }
@ -369,9 +373,11 @@ static void on_canceled(cronet_bidirectional_stream *stream) {
static void on_succeeded(cronet_bidirectional_stream *stream) { static void on_succeeded(cronet_bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream); CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
cronet_bidirectional_stream_destroy(s->cbs); cronet_bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_SUCCEEDED] = true; s->state.state_callback_received[OP_SUCCEEDED] = true;
s->cbs = NULL; s->cbs = NULL;
gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(s);
} }
@ -381,6 +387,7 @@ static void on_succeeded(cronet_bidirectional_stream *stream) {
static void on_request_headers_sent(cronet_bidirectional_stream *stream) { static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream); CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true; s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true; s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
/* Free the memory allocated for headers */ /* Free the memory allocated for headers */
@ -388,6 +395,7 @@ static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
gpr_free(s->header_array.headers); gpr_free(s->header_array.headers);
s->header_array.headers = NULL; s->header_array.headers = NULL;
} }
gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(s);
} }
@ -401,6 +409,7 @@ static void on_response_headers_received(
CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
headers, negotiated_protocol); headers, negotiated_protocol);
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
memset(&s->state.rs.initial_metadata, 0, memset(&s->state.rs.initial_metadata, 0,
sizeof(s->state.rs.initial_metadata)); sizeof(s->state.rs.initial_metadata));
grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata); grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata);
@ -412,6 +421,7 @@ static void on_response_headers_received(
grpc_mdstr_from_string(headers->headers[i].value))); grpc_mdstr_from_string(headers->headers[i].value)));
} }
s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(s);
} }
@ -422,11 +432,13 @@ static void on_write_completed(cronet_bidirectional_stream *stream,
const char *data) { const char *data) {
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data); CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
gpr_mu_lock(&s->mu);
if (s->state.ws.write_buffer) { if (s->state.ws.write_buffer) {
gpr_free(s->state.ws.write_buffer); gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL; s->state.ws.write_buffer = NULL;
} }
s->state.state_callback_received[OP_SEND_MESSAGE] = true; s->state.state_callback_received[OP_SEND_MESSAGE] = true;
gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(s);
} }
@ -438,6 +450,7 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
count); count);
gpr_mu_lock(&s->mu);
s->state.state_callback_received[OP_RECV_MESSAGE] = true; s->state.state_callback_received[OP_RECV_MESSAGE] = true;
if (count > 0) { if (count > 0) {
s->state.rs.received_bytes += count; s->state.rs.received_bytes += count;
@ -448,11 +461,14 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
cronet_bidirectional_stream_read( cronet_bidirectional_stream_read(
s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes, s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
s->state.rs.remaining_bytes); s->state.rs.remaining_bytes);
gpr_mu_unlock(&s->mu);
} else { } else {
gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(s);
} }
} else { } else {
s->state.rs.read_stream_closed = true; s->state.rs.read_stream_closed = true;
gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(s);
} }
} }
@ -466,6 +482,7 @@ static void on_response_trailers_received(
CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
trailers); trailers);
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
memset(&s->state.rs.trailing_metadata, 0, memset(&s->state.rs.trailing_metadata, 0,
sizeof(s->state.rs.trailing_metadata)); sizeof(s->state.rs.trailing_metadata));
s->state.rs.trailing_metadata_valid = false; s->state.rs.trailing_metadata_valid = false;
@ -481,6 +498,7 @@ static void on_response_trailers_received(
s->state.rs.trailing_metadata_valid = true; s->state.rs.trailing_metadata_valid = true;
} }
s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true; s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(s);
} }
@ -757,14 +775,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state, op_can_be_run(stream_op, stream_state, &oas->state,
OP_RECV_INITIAL_METADATA)) { OP_RECV_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
if (!stream_state->state_op_done[OP_CANCEL_ERROR]) { if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
stream_state->state_callback_received[OP_FAILED]) {
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
GRPC_ERROR_CANCELLED, NULL);
} else {
grpc_chttp2_incoming_metadata_buffer_publish( grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata); &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata);
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
GRPC_ERROR_NONE, NULL); GRPC_ERROR_NONE, NULL);
} else {
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
GRPC_ERROR_CANCELLED, NULL);
} }
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK; result = ACTION_TAKEN_NO_CALLBACK;
@ -772,32 +791,39 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state, op_can_be_run(stream_op, stream_state, &oas->state,
OP_SEND_MESSAGE)) { OP_SEND_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas); CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
gpr_slice_buffer write_slice_buffer; if (stream_state->state_callback_received[OP_FAILED]) {
gpr_slice slice; result = NO_ACTION_POSSIBLE;
gpr_slice_buffer_init(&write_slice_buffer); CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
grpc_byte_stream_next(NULL, stream_op->send_message, &slice, } else {
stream_op->send_message->length, NULL); gpr_slice_buffer write_slice_buffer;
/* Check that compression flag is OFF. We don't support compression yet. */ gpr_slice slice;
if (stream_op->send_message->flags != 0) { gpr_slice_buffer_init(&write_slice_buffer);
gpr_log(GPR_ERROR, "Compression is not supported"); grpc_byte_stream_next(NULL, stream_op->send_message, &slice,
GPR_ASSERT(stream_op->send_message->flags == 0); stream_op->send_message->length, NULL);
} /* Check that compression flag is OFF. We don't support compression yet. */
gpr_slice_buffer_add(&write_slice_buffer, slice); if (stream_op->send_message->flags != 0) {
if (write_slice_buffer.count != 1) { gpr_log(GPR_ERROR, "Compression is not supported");
/* Empty request not handled yet */ GPR_ASSERT(stream_op->send_message->flags == 0);
gpr_log(GPR_ERROR, "Empty request is not supported"); }
GPR_ASSERT(write_slice_buffer.count == 1); gpr_slice_buffer_add(&write_slice_buffer, slice);
} if (write_slice_buffer.count != 1) {
if (write_slice_buffer.count > 0) { /* Empty request not handled yet */
size_t write_buffer_size; gpr_log(GPR_ERROR, "Empty request is not supported");
create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, GPR_ASSERT(write_slice_buffer.count == 1);
&write_buffer_size); }
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)", if (write_slice_buffer.count > 0) {
s->cbs, stream_state->ws.write_buffer); size_t write_buffer_size;
stream_state->state_callback_received[OP_SEND_MESSAGE] = false; create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, &write_buffer_size);
(int)write_buffer_size, false); CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)",
result = ACTION_TAKEN_WITH_CALLBACK; s->cbs, stream_state->ws.write_buffer);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
(int)write_buffer_size, false);
result = ACTION_TAKEN_WITH_CALLBACK;
} else {
result = NO_ACTION_POSSIBLE;
}
} }
stream_state->state_op_done[OP_SEND_MESSAGE] = true; stream_state->state_op_done[OP_SEND_MESSAGE] = true;
oas->state.state_op_done[OP_SEND_MESSAGE] = true; oas->state.state_op_done[OP_SEND_MESSAGE] = true;
@ -805,7 +831,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state, op_can_be_run(stream_op, stream_state, &oas->state,
OP_RECV_MESSAGE)) { OP_RECV_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) { if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
stream_state->state_callback_received[OP_FAILED]) {
CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
GRPC_ERROR_CANCELLED, NULL); GRPC_ERROR_CANCELLED, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true; stream_state->state_op_done[OP_RECV_MESSAGE] = true;
@ -861,8 +889,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
true; /* Indicates that at least one read request has been made */ true; /* Indicates that at least one read request has been made */
cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes); stream_state->rs.remaining_bytes);
result = ACTION_TAKEN_WITH_CALLBACK;
} else {
result = NO_ACTION_POSSIBLE;
} }
result = ACTION_TAKEN_WITH_CALLBACK;
} else if (stream_state->rs.remaining_bytes == 0) { } else if (stream_state->rs.remaining_bytes == 0) {
CRONET_LOG(GPR_DEBUG, "read operation complete"); CRONET_LOG(GPR_DEBUG, "read operation complete");
gpr_slice read_data_slice = gpr_slice read_data_slice =
@ -903,11 +933,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state, op_can_be_run(stream_op, stream_state, &oas->state,
OP_SEND_TRAILING_METADATA)) { OP_SEND_TRAILING_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs); if (stream_state->state_callback_received[OP_FAILED]) {
stream_state->state_callback_received[OP_SEND_MESSAGE] = false; result = NO_ACTION_POSSIBLE;
cronet_bidirectional_stream_write(s->cbs, "", 0, true); CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
} else {
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
cronet_bidirectional_stream_write(s->cbs, "", 0, true);
result = ACTION_TAKEN_WITH_CALLBACK;
}
stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
result = ACTION_TAKEN_WITH_CALLBACK;
} else if (stream_op->cancel_error && } else if (stream_op->cancel_error &&
op_can_be_run(stream_op, stream_state, &oas->state, op_can_be_run(stream_op, stream_state, &oas->state,
OP_CANCEL_ERROR)) { OP_CANCEL_ERROR)) {

Loading…
Cancel
Save