|
|
@ -412,7 +412,7 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data, |
|
|
|
s->state.rs.received_bytes += count; |
|
|
|
s->state.rs.received_bytes += count; |
|
|
|
s->state.rs.remaining_bytes -= count; |
|
|
|
s->state.rs.remaining_bytes -= count; |
|
|
|
if (s->state.rs.remaining_bytes > 0) { |
|
|
|
if (s->state.rs.remaining_bytes > 0) { |
|
|
|
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read"); |
|
|
|
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); |
|
|
|
s->state.state_op_done[OP_READ_REQ_MADE] = true; |
|
|
|
s->state.state_op_done[OP_READ_REQ_MADE] = true; |
|
|
|
cronet_bidirectional_stream_read( |
|
|
|
cronet_bidirectional_stream_read( |
|
|
|
s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes, |
|
|
|
s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes, |
|
|
@ -602,6 +602,9 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, |
|
|
|
/* we haven't received trailers yet. */ |
|
|
|
/* we haven't received trailers yet. */ |
|
|
|
else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA]) |
|
|
|
else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA]) |
|
|
|
result = false; |
|
|
|
result = false; |
|
|
|
|
|
|
|
/* we haven't received on_succeeded yet. */ |
|
|
|
|
|
|
|
else if (!stream_state->state_callback_received[OP_SUCCEEDED]) |
|
|
|
|
|
|
|
result = false; |
|
|
|
} else if (op_id == OP_SEND_TRAILING_METADATA) { |
|
|
|
} else if (op_id == OP_SEND_TRAILING_METADATA) { |
|
|
|
/* already executed */ |
|
|
|
/* already executed */ |
|
|
|
if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false; |
|
|
|
if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false; |
|
|
@ -699,16 +702,17 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) { |
|
|
|
/* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
|
|
|
|
/* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
|
|
|
|
* on_failed */ |
|
|
|
* on_failed */ |
|
|
|
GPR_ASSERT(s->cbs == NULL); |
|
|
|
GPR_ASSERT(s->cbs == NULL); |
|
|
|
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_create"); |
|
|
|
|
|
|
|
s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, |
|
|
|
s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, |
|
|
|
&cronet_callbacks); |
|
|
|
&cronet_callbacks); |
|
|
|
|
|
|
|
CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs); |
|
|
|
char *url; |
|
|
|
char *url; |
|
|
|
s->header_array.headers = NULL; |
|
|
|
s->header_array.headers = NULL; |
|
|
|
convert_metadata_to_cronet_headers( |
|
|
|
convert_metadata_to_cronet_headers( |
|
|
|
stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url, |
|
|
|
stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url, |
|
|
|
&s->header_array.headers, &s->header_array.count); |
|
|
|
&s->header_array.headers, &s->header_array.count); |
|
|
|
s->header_array.capacity = s->header_array.count; |
|
|
|
s->header_array.capacity = s->header_array.count; |
|
|
|
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_start %s", url); |
|
|
|
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_start(%p, %s)", s->cbs, |
|
|
|
|
|
|
|
url); |
|
|
|
cronet_bidirectional_stream_start(s->cbs, url, 0, "POST", &s->header_array, |
|
|
|
cronet_bidirectional_stream_start(s->cbs, url, 0, "POST", &s->header_array, |
|
|
|
false); |
|
|
|
false); |
|
|
|
stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; |
|
|
|
stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; |
|
|
@ -746,8 +750,8 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) { |
|
|
|
int write_buffer_size; |
|
|
|
int write_buffer_size; |
|
|
|
create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, |
|
|
|
create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, |
|
|
|
&write_buffer_size); |
|
|
|
&write_buffer_size); |
|
|
|
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p)", |
|
|
|
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)", |
|
|
|
stream_state->ws.write_buffer); |
|
|
|
s->cbs, stream_state->ws.write_buffer); |
|
|
|
stream_state->state_callback_received[OP_SEND_MESSAGE] = false; |
|
|
|
stream_state->state_callback_received[OP_SEND_MESSAGE] = false; |
|
|
|
cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, |
|
|
|
cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, |
|
|
|
write_buffer_size, false); |
|
|
|
write_buffer_size, false); |
|
|
@ -785,7 +789,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) { |
|
|
|
GPR_ASSERT(stream_state->rs.read_buffer); |
|
|
|
GPR_ASSERT(stream_state->rs.read_buffer); |
|
|
|
stream_state->rs.remaining_bytes = stream_state->rs.length_field; |
|
|
|
stream_state->rs.remaining_bytes = stream_state->rs.length_field; |
|
|
|
stream_state->rs.received_bytes = 0; |
|
|
|
stream_state->rs.received_bytes = 0; |
|
|
|
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read"); |
|
|
|
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); |
|
|
|
stream_state->state_op_done[OP_READ_REQ_MADE] = |
|
|
|
stream_state->state_op_done[OP_READ_REQ_MADE] = |
|
|
|
true; /* Indicates that at least one read request has been made */ |
|
|
|
true; /* Indicates that at least one read request has been made */ |
|
|
|
cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, |
|
|
|
cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, |
|
|
@ -810,7 +814,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) { |
|
|
|
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; |
|
|
|
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; |
|
|
|
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; |
|
|
|
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; |
|
|
|
stream_state->rs.received_bytes = 0; |
|
|
|
stream_state->rs.received_bytes = 0; |
|
|
|
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read"); |
|
|
|
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); |
|
|
|
stream_state->state_op_done[OP_READ_REQ_MADE] = |
|
|
|
stream_state->state_op_done[OP_READ_REQ_MADE] = |
|
|
|
true; /* Indicates that at least one read request has been made */ |
|
|
|
true; /* Indicates that at least one read request has been made */ |
|
|
|
cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, |
|
|
|
cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, |
|
|
@ -857,7 +861,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) { |
|
|
|
op_can_be_run(stream_op, stream_state, &oas->state, |
|
|
|
op_can_be_run(stream_op, stream_state, &oas->state, |
|
|
|
OP_SEND_TRAILING_METADATA)) { |
|
|
|
OP_SEND_TRAILING_METADATA)) { |
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); |
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); |
|
|
|
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (0)"); |
|
|
|
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs); |
|
|
|
stream_state->state_callback_received[OP_SEND_MESSAGE] = false; |
|
|
|
stream_state->state_callback_received[OP_SEND_MESSAGE] = false; |
|
|
|
cronet_bidirectional_stream_write(s->cbs, "", 0, true); |
|
|
|
cronet_bidirectional_stream_write(s->cbs, "", 0, true); |
|
|
|
stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; |
|
|
|
stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; |
|
|
|