Merge pull request #10009 from muxi/fix-flush-read

Fix flush read
pull/9734/merge
Muxi Yan 8 years ago committed by GitHub
commit c5591f5143
  1. 130
      src/core/ext/transport/cronet/transport/cronet_transport.c

@ -54,6 +54,7 @@
#include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
#define GRPC_HEADER_SIZE_IN_BYTES 5
#define GRPC_FLUSH_READ_SIZE 4096
#define CRONET_LOG(...) \
do { \
@ -151,11 +152,17 @@ struct write_state {
struct op_state {
bool state_op_done[OP_NUM_OPS];
bool state_callback_received[OP_NUM_OPS];
/* A non-zero gRPC status code has been seen */
bool fail_state;
/* Transport is discarding all buffered messages */
bool flush_read;
bool flush_cronet_when_ready;
bool pending_write_for_trailer;
bool unprocessed_send_message;
bool pending_send_message;
/* User requested RECV_TRAILING_METADATA */
bool pending_recv_trailing_metadata;
/* Cronet has not issued a callback of a bidirectional read */
bool pending_read_from_cronet;
grpc_error *cancel_error;
/* data structure for storing data coming from server */
struct read_state rs;
@ -248,11 +255,35 @@ static const char *op_id_string(enum e_op_id i) {
return "UNKNOWN";
}
static void free_read_buffer(stream_obj *s) {
static void null_and_maybe_free_read_buffer(stream_obj *s) {
if (s->state.rs.read_buffer &&
s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
gpr_free(s->state.rs.read_buffer);
s->state.rs.read_buffer = NULL;
}
s->state.rs.read_buffer = NULL;
}
static void maybe_flush_read(stream_obj *s) {
/* To enter flush read state (discarding all the buffered messages in
* transport layer), two conditions must be satisfied: 1) non-zero grpc status
* has been received, and 2) an op requesting the status code
* (RECV_TRAILING_METADATA) is issued by the user. (See
* doc/status_ordering.md) */
/* Whenever the evaluation of any of the two condition is changed, we check
* whether we should enter the flush read state. */
if (s->state.pending_recv_trailing_metadata && s->state.fail_state) {
if (!s->state.flush_read) {
CRONET_LOG(GPR_DEBUG, "%p: Flush read", s);
s->state.flush_read = true;
null_and_maybe_free_read_buffer(s);
s->state.rs.read_buffer = gpr_malloc(GRPC_FLUSH_READ_SIZE);
if (!s->state.pending_read_from_cronet) {
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
GRPC_FLUSH_READ_SIZE);
s->state.pending_read_from_cronet = true;
}
}
}
}
@ -279,7 +310,11 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
storage->head = new_op;
storage->num_pending_ops++;
if (op->send_message) {
s->state.unprocessed_send_message = true;
s->state.pending_send_message = true;
}
if (op->recv_trailing_metadata) {
s->state.pending_recv_trailing_metadata = true;
maybe_flush_read(s);
}
CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
storage->num_pending_ops);
@ -367,7 +402,7 @@ static void on_failed(bidirectional_stream *stream, int net_error) {
gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL;
}
free_read_buffer(s);
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@ -390,7 +425,7 @@ static void on_canceled(bidirectional_stream *stream) {
gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL;
}
free_read_buffer(s);
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@ -405,7 +440,7 @@ static void on_succeeded(bidirectional_stream *stream) {
bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_SUCCEEDED] = true;
s->cbs = NULL;
free_read_buffer(s);
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@ -473,6 +508,7 @@ static void on_response_headers_received(
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
s->state.rs.remaining_bytes);
s->state.pending_read_from_cronet = true;
}
gpr_mu_unlock(&s->mu);
grpc_exec_ctx_finish(&exec_ctx);
@ -504,10 +540,13 @@ static void on_read_completed(bidirectional_stream *stream, char *data,
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
count);
gpr_mu_lock(&s->mu);
s->state.pending_read_from_cronet = false;
s->state.state_callback_received[OP_RECV_MESSAGE] = true;
if (count > 0 && s->state.flush_read) {
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096);
bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
GRPC_FLUSH_READ_SIZE);
s->state.pending_read_from_cronet = true;
gpr_mu_unlock(&s->mu);
} else if (count > 0) {
s->state.rs.received_bytes += count;
@ -518,16 +557,14 @@ static void on_read_completed(bidirectional_stream *stream, char *data,
bidirectional_stream_read(
s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
s->state.rs.remaining_bytes);
s->state.pending_read_from_cronet = true;
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
} else {
if (s->state.flush_read) {
gpr_free(s->state.rs.read_buffer);
s->state.rs.read_buffer = NULL;
}
null_and_maybe_free_read_buffer(s);
s->state.rs.read_stream_closed = true;
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
@ -564,6 +601,7 @@ static void on_response_trailers_received(
if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
0 != strcmp(trailers->headers[i].value, "0")) {
s->state.fail_state = true;
maybe_flush_read(s);
}
}
s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
@ -778,7 +816,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
result = false;
/* we haven't sent message yet */
else if (stream_state->unprocessed_send_message &&
else if (stream_state->pending_send_message &&
!stream_state->state_op_done[OP_SEND_MESSAGE])
result = false;
/* we haven't got on_write_completed for the send yet */
@ -900,7 +938,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
} else if (stream_op->send_message &&
op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
stream_state->unprocessed_send_message = false;
stream_state->pending_send_message = false;
if (stream_state->state_callback_received[OP_FAILED]) {
result = NO_ACTION_POSSIBLE;
CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
@ -1009,6 +1047,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->flush_read) {
CRONET_LOG(GPR_DEBUG, "flush read");
grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.length_field_received == false) {
if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
stream_state->rs.remaining_bytes == 0) {
@ -1029,6 +1074,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
true; /* Indicates that at least one read request has been made */
bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes);
stream_state->pending_read_from_cronet = true;
result = ACTION_TAKEN_WITH_CALLBACK;
} else {
stream_state->rs.remaining_bytes = 0;
@ -1047,11 +1093,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
stream_state->rs.received_bytes = 0;
stream_state->rs.length_field_received = false;
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
stream_state->state_op_done[OP_READ_REQ_MADE] =
true; /* Indicates that at least one read request has been made */
bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes);
stream_state->pending_read_from_cronet = true;
result = ACTION_TAKEN_NO_CALLBACK;
}
} else if (stream_state->rs.remaining_bytes == 0) {
@ -1064,6 +1112,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
true; /* Indicates that at least one read request has been made */
bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes);
stream_state->pending_read_from_cronet = true;
result = ACTION_TAKEN_WITH_CALLBACK;
} else {
result = NO_ACTION_POSSIBLE;
@ -1075,7 +1124,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
uint8_t *dst_p = GRPC_SLICE_START_PTR(read_data_slice);
memcpy(dst_p, stream_state->rs.read_buffer,
(size_t)stream_state->rs.length_field);
free_read_buffer(s);
null_and_maybe_free_read_buffer(s);
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
read_data_slice);
@ -1096,6 +1145,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes);
stream_state->pending_read_from_cronet = true;
result = ACTION_TAKEN_NO_CALLBACK;
}
} else if (stream_op->recv_trailing_metadata &&
@ -1153,15 +1203,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
make a note */
if (stream_op->recv_message)
stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
} else if (stream_state->fail_state && !stream_state->flush_read) {
CRONET_LOG(GPR_DEBUG, "running: %p flush read", oas);
if (stream_state->rs.read_buffer &&
stream_state->rs.read_buffer != stream_state->rs.grpc_header_bytes) {
gpr_free(stream_state->rs.read_buffer);
stream_state->rs.read_buffer = NULL;
}
stream_state->rs.read_buffer = gpr_malloc(4096);
stream_state->flush_read = true;
} else {
result = NO_ACTION_POSSIBLE;
}
@ -1190,7 +1231,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->state.fail_state = s->state.flush_read = false;
s->state.cancel_error = NULL;
s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false;
s->state.unprocessed_send_message = false;
s->state.pending_send_message = false;
s->state.pending_recv_trailing_metadata = false;
s->state.pending_read_from_cronet = false;
s->curr_gs = gs;
s->curr_ct = (grpc_cronet_transport *)gt;
@ -1209,37 +1252,30 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_transport_stream_op *op) {
CRONET_LOG(GPR_DEBUG, "perform_stream_op");
stream_obj *s = (stream_obj *)gs;
add_to_storage(s, op);
if (op->send_initial_metadata &&
header_has_authority(op->send_initial_metadata->list.head)) {
/* Cronet does not support :authority header field. We cancel the call when
this field is present in metadata */
bidirectional_stream_header_array header_array;
bidirectional_stream_header *header;
bidirectional_stream cbs;
CRONET_LOG(GPR_DEBUG,
":authority header is provided but not supported;"
" cancel operations");
/* Notify application that operation is cancelled by forging trailers */
header_array.count = 1;
header_array.capacity = 1;
header_array.headers = gpr_malloc(sizeof(bidirectional_stream_header));
header = (bidirectional_stream_header *)header_array.headers;
header->key = "grpc-status";
header->value = "1"; /* Return status GRPC_STATUS_CANCELLED */
cbs.annotation = (void *)s;
s->state.state_op_done[OP_CANCEL_ERROR] = true;
on_response_trailers_received(&cbs, &header_array);
gpr_free(header_array.headers);
} else {
execute_from_storage(s);
this field is present in metadata */
if (op->recv_initial_metadata_ready) {
grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready,
GRPC_ERROR_CANCELLED);
}
if (op->recv_message_ready) {
grpc_closure_sched(exec_ctx, op->recv_message_ready,
GRPC_ERROR_CANCELLED);
}
grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED);
return;
}
stream_obj *s = (stream_obj *)gs;
add_to_storage(s, op);
execute_from_storage(s);
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, void *and_free_memory) {
stream_obj *s = (stream_obj *)gs;
null_and_maybe_free_read_buffer(s);
GRPC_ERROR_UNREF(s->state.cancel_error);
}

Loading…
Cancel
Save