|
|
|
@ -295,31 +295,6 @@ static void null_and_maybe_free_read_buffer(stream_obj* s) { |
|
|
|
|
s->state.rs.read_buffer = nullptr; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void maybe_flush_read(stream_obj* s) { |
|
|
|
|
/* To enter flush read state (discarding all the buffered messages in
|
|
|
|
|
* transport layer), two conditions must be satisfied: 1) non-zero grpc status |
|
|
|
|
* has been received, and 2) an op requesting the status code |
|
|
|
|
* (RECV_TRAILING_METADATA) is issued by the user. (See |
|
|
|
|
* doc/status_ordering.md) */ |
|
|
|
|
/* Whenever the evaluation of any of the two condition is changed, we check
|
|
|
|
|
* whether we should enter the flush read state. */ |
|
|
|
|
if (s->state.pending_recv_trailing_metadata && s->state.fail_state) { |
|
|
|
|
if (!s->state.flush_read && !s->state.rs.read_stream_closed) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "%p: Flush read", s); |
|
|
|
|
s->state.flush_read = true; |
|
|
|
|
null_and_maybe_free_read_buffer(s); |
|
|
|
|
s->state.rs.read_buffer = |
|
|
|
|
static_cast<char*>(gpr_malloc(GRPC_FLUSH_READ_SIZE)); |
|
|
|
|
if (!s->state.pending_read_from_cronet) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); |
|
|
|
|
bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, |
|
|
|
|
GRPC_FLUSH_READ_SIZE); |
|
|
|
|
s->state.pending_read_from_cronet = true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void read_grpc_header(stream_obj* s) { |
|
|
|
|
s->state.rs.read_buffer = s->state.rs.grpc_header_bytes; |
|
|
|
|
s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; |
|
|
|
@ -359,7 +334,6 @@ static void add_to_storage(struct stream_obj* s, |
|
|
|
|
} |
|
|
|
|
if (op->recv_trailing_metadata) { |
|
|
|
|
s->state.pending_recv_trailing_metadata = true; |
|
|
|
|
maybe_flush_read(s); |
|
|
|
|
} |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op, |
|
|
|
|
storage->num_pending_ops); |
|
|
|
@ -569,11 +543,10 @@ static void on_response_headers_received( |
|
|
|
|
for (size_t i = 0; i < headers->count; i++) { |
|
|
|
|
if (0 == strcmp("grpc-status", headers->headers[i].key)) { |
|
|
|
|
on_response_trailers_received(stream, headers); |
|
|
|
|
/* Do an extra read for a trailer-only stream with grpc_status = 0
|
|
|
|
|
to trigger on_succeeded() callback */ |
|
|
|
|
if (0 == strcmp(headers->headers[i].value, "0")) { |
|
|
|
|
read_grpc_header(s); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Do an extra read for a trailer-only stream to trigger on_succeeded()
|
|
|
|
|
* callback */ |
|
|
|
|
read_grpc_header(s); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -670,13 +643,6 @@ static void on_response_trailers_received( |
|
|
|
|
if (trailers->count > 0) { |
|
|
|
|
s->state.rs.trailing_metadata_valid = true; |
|
|
|
|
} |
|
|
|
|
for (size_t i = 0; i < trailers->count; i++) { |
|
|
|
|
if (0 == strcmp(trailers->headers[i].key, "grpc-status") && |
|
|
|
|
0 != strcmp(trailers->headers[i].value, "0")) { |
|
|
|
|
s->state.fail_state = true; |
|
|
|
|
maybe_flush_read(s); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true; |
|
|
|
|
/* Send a EOS when server terminates the stream (testServerFinishesRequest) to
|
|
|
|
|
* trigger on_succeeded */ |
|
|
|
|