|
|
@ -916,9 +916,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
char *url = NULL; |
|
|
|
char *url = NULL; |
|
|
|
const char *method = "POST"; |
|
|
|
const char *method = "POST"; |
|
|
|
s->header_array.headers = NULL; |
|
|
|
s->header_array.headers = NULL; |
|
|
|
convert_metadata_to_cronet_headers( |
|
|
|
convert_metadata_to_cronet_headers(stream_op->payload->send_initial_metadata |
|
|
|
stream_op->send_initial_metadata->list.head, t->host, &url, |
|
|
|
.send_initial_metadata->list.head, |
|
|
|
&s->header_array.headers, &s->header_array.count, &method); |
|
|
|
t->host, &url, &s->header_array.headers, |
|
|
|
|
|
|
|
&s->header_array.count, &method); |
|
|
|
s->header_array.capacity = s->header_array.count; |
|
|
|
s->header_array.capacity = s->header_array.count; |
|
|
|
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url); |
|
|
|
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url); |
|
|
|
bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); |
|
|
|
bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); |
|
|
@ -946,13 +947,14 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
grpc_slice_buffer write_slice_buffer; |
|
|
|
grpc_slice_buffer write_slice_buffer; |
|
|
|
grpc_slice slice; |
|
|
|
grpc_slice slice; |
|
|
|
grpc_slice_buffer_init(&write_slice_buffer); |
|
|
|
grpc_slice_buffer_init(&write_slice_buffer); |
|
|
|
grpc_byte_stream_next(NULL, stream_op->send_message, &slice, |
|
|
|
grpc_byte_stream_next( |
|
|
|
stream_op->send_message->length, NULL); |
|
|
|
NULL, stream_op->payload->send_message.send_message, &slice, |
|
|
|
|
|
|
|
stream_op->payload->send_message.send_message->length, NULL); |
|
|
|
/* Check that compression flag is OFF. We don't support compression yet.
|
|
|
|
/* Check that compression flag is OFF. We don't support compression yet.
|
|
|
|
*/ |
|
|
|
*/ |
|
|
|
if (stream_op->send_message->flags != 0) { |
|
|
|
if (stream_op->payload->send_message.send_message->flags != 0) { |
|
|
|
gpr_log(GPR_ERROR, "Compression is not supported"); |
|
|
|
gpr_log(GPR_ERROR, "Compression is not supported"); |
|
|
|
GPR_ASSERT(stream_op->send_message->flags == 0); |
|
|
|
GPR_ASSERT(stream_op->payload->send_message.send_message->flags == 0); |
|
|
|
} |
|
|
|
} |
|
|
|
grpc_slice_buffer_add(&write_slice_buffer, slice); |
|
|
|
grpc_slice_buffer_add(&write_slice_buffer, slice); |
|
|
|
if (write_slice_buffer.count != 1) { |
|
|
|
if (write_slice_buffer.count != 1) { |
|
|
@ -1010,17 +1012,23 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
OP_RECV_INITIAL_METADATA)) { |
|
|
|
OP_RECV_INITIAL_METADATA)) { |
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); |
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); |
|
|
|
if (stream_state->state_op_done[OP_CANCEL_ERROR]) { |
|
|
|
if (stream_state->state_op_done[OP_CANCEL_ERROR]) { |
|
|
|
grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, |
|
|
|
grpc_closure_sched( |
|
|
|
GRPC_ERROR_NONE); |
|
|
|
exec_ctx, |
|
|
|
|
|
|
|
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, |
|
|
|
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
} else if (stream_state->state_callback_received[OP_FAILED]) { |
|
|
|
} else if (stream_state->state_callback_received[OP_FAILED]) { |
|
|
|
grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, |
|
|
|
grpc_closure_sched( |
|
|
|
GRPC_ERROR_NONE); |
|
|
|
exec_ctx, |
|
|
|
|
|
|
|
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, |
|
|
|
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
grpc_chttp2_incoming_metadata_buffer_publish( |
|
|
|
grpc_chttp2_incoming_metadata_buffer_publish( |
|
|
|
exec_ctx, &oas->s->state.rs.initial_metadata, |
|
|
|
exec_ctx, &oas->s->state.rs.initial_metadata, |
|
|
|
stream_op->recv_initial_metadata); |
|
|
|
stream_op->payload->recv_initial_metadata.recv_initial_metadata); |
|
|
|
grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, |
|
|
|
grpc_closure_sched( |
|
|
|
GRPC_ERROR_NONE); |
|
|
|
exec_ctx, |
|
|
|
|
|
|
|
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, |
|
|
|
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
} |
|
|
|
} |
|
|
|
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; |
|
|
|
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; |
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
@ -1029,27 +1037,31 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); |
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); |
|
|
|
if (stream_state->state_op_done[OP_CANCEL_ERROR]) { |
|
|
|
if (stream_state->state_op_done[OP_CANCEL_ERROR]) { |
|
|
|
CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); |
|
|
|
CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); |
|
|
|
grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, |
|
|
|
grpc_closure_sched(exec_ctx, |
|
|
|
|
|
|
|
stream_op->payload->recv_message.recv_message_ready, |
|
|
|
GRPC_ERROR_NONE); |
|
|
|
GRPC_ERROR_NONE); |
|
|
|
stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
|
} else if (stream_state->state_callback_received[OP_FAILED]) { |
|
|
|
} else if (stream_state->state_callback_received[OP_FAILED]) { |
|
|
|
CRONET_LOG(GPR_DEBUG, "Stream failed."); |
|
|
|
CRONET_LOG(GPR_DEBUG, "Stream failed."); |
|
|
|
grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, |
|
|
|
grpc_closure_sched(exec_ctx, |
|
|
|
|
|
|
|
stream_op->payload->recv_message.recv_message_ready, |
|
|
|
GRPC_ERROR_NONE); |
|
|
|
GRPC_ERROR_NONE); |
|
|
|
stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
|
} else if (stream_state->rs.read_stream_closed == true) { |
|
|
|
} else if (stream_state->rs.read_stream_closed == true) { |
|
|
|
/* No more data will be received */ |
|
|
|
/* No more data will be received */ |
|
|
|
CRONET_LOG(GPR_DEBUG, "read stream closed"); |
|
|
|
CRONET_LOG(GPR_DEBUG, "read stream closed"); |
|
|
|
grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, |
|
|
|
grpc_closure_sched(exec_ctx, |
|
|
|
|
|
|
|
stream_op->payload->recv_message.recv_message_ready, |
|
|
|
GRPC_ERROR_NONE); |
|
|
|
GRPC_ERROR_NONE); |
|
|
|
stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
oas->state.state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
oas->state.state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
|
} else if (stream_state->flush_read) { |
|
|
|
} else if (stream_state->flush_read) { |
|
|
|
CRONET_LOG(GPR_DEBUG, "flush read"); |
|
|
|
CRONET_LOG(GPR_DEBUG, "flush read"); |
|
|
|
grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, |
|
|
|
grpc_closure_sched(exec_ctx, |
|
|
|
|
|
|
|
stream_op->payload->recv_message.recv_message_ready, |
|
|
|
GRPC_ERROR_NONE); |
|
|
|
GRPC_ERROR_NONE); |
|
|
|
stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
oas->state.state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
oas->state.state_op_done[OP_RECV_MESSAGE] = true; |
|
|
@ -1084,8 +1096,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
&stream_state->rs.read_slice_buffer, 0); |
|
|
|
&stream_state->rs.read_slice_buffer, 0); |
|
|
|
*((grpc_byte_buffer **)stream_op->recv_message) = |
|
|
|
*((grpc_byte_buffer **)stream_op->recv_message) = |
|
|
|
(grpc_byte_buffer *)&stream_state->rs.sbs; |
|
|
|
(grpc_byte_buffer *)&stream_state->rs.sbs; |
|
|
|
grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, |
|
|
|
grpc_closure_sched( |
|
|
|
GRPC_ERROR_NONE); |
|
|
|
exec_ctx, stream_op->payload->recv_message.recv_message_ready, |
|
|
|
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
oas->state.state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
oas->state.state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
|
|
|
|
|
|
|
@ -1132,7 +1145,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
&stream_state->rs.read_slice_buffer, 0); |
|
|
|
&stream_state->rs.read_slice_buffer, 0); |
|
|
|
*((grpc_byte_buffer **)stream_op->recv_message) = |
|
|
|
*((grpc_byte_buffer **)stream_op->recv_message) = |
|
|
|
(grpc_byte_buffer *)&stream_state->rs.sbs; |
|
|
|
(grpc_byte_buffer *)&stream_state->rs.sbs; |
|
|
|
grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, |
|
|
|
grpc_closure_sched(exec_ctx, |
|
|
|
|
|
|
|
stream_op->payload->recv_message.recv_message_ready, |
|
|
|
GRPC_ERROR_NONE); |
|
|
|
GRPC_ERROR_NONE); |
|
|
|
stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
oas->state.state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
oas->state.state_op_done[OP_RECV_MESSAGE] = true; |
|
|
@ -1155,12 +1169,12 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
if (oas->s->state.rs.trailing_metadata_valid) { |
|
|
|
if (oas->s->state.rs.trailing_metadata_valid) { |
|
|
|
grpc_chttp2_incoming_metadata_buffer_publish( |
|
|
|
grpc_chttp2_incoming_metadata_buffer_publish( |
|
|
|
exec_ctx, &oas->s->state.rs.trailing_metadata, |
|
|
|
exec_ctx, &oas->s->state.rs.trailing_metadata, |
|
|
|
stream_op->recv_trailing_metadata); |
|
|
|
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata); |
|
|
|
stream_state->rs.trailing_metadata_valid = false; |
|
|
|
stream_state->rs.trailing_metadata_valid = false; |
|
|
|
} |
|
|
|
} |
|
|
|
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; |
|
|
|
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; |
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
|
} else if (stream_op->cancel_error && |
|
|
|
} else if (stream_op->cancel_stream && |
|
|
|
op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) { |
|
|
|
op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) { |
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas); |
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas); |
|
|
|
CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); |
|
|
|
CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); |
|
|
@ -1172,7 +1186,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
} |
|
|
|
} |
|
|
|
stream_state->state_op_done[OP_CANCEL_ERROR] = true; |
|
|
|
stream_state->state_op_done[OP_CANCEL_ERROR] = true; |
|
|
|
if (!stream_state->cancel_error) { |
|
|
|
if (!stream_state->cancel_error) { |
|
|
|
stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error); |
|
|
|
stream_state->cancel_error = |
|
|
|
|
|
|
|
GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error); |
|
|
|
} |
|
|
|
} |
|
|
|
} else if (stream_op->on_complete && |
|
|
|
} else if (stream_op->on_complete && |
|
|
|
op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) { |
|
|
|
op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) { |
|
|
@ -1253,15 +1268,18 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, |
|
|
|
grpc_stream *gs, grpc_transport_stream_op *op) { |
|
|
|
grpc_stream *gs, grpc_transport_stream_op *op) { |
|
|
|
CRONET_LOG(GPR_DEBUG, "perform_stream_op"); |
|
|
|
CRONET_LOG(GPR_DEBUG, "perform_stream_op"); |
|
|
|
if (op->send_initial_metadata && |
|
|
|
if (op->send_initial_metadata && |
|
|
|
header_has_authority(op->send_initial_metadata->list.head)) { |
|
|
|
header_has_authority(op->payload->send_initial_metadata |
|
|
|
|
|
|
|
.send_initial_metadata->list.head)) { |
|
|
|
/* Cronet does not support :authority header field. We cancel the call when
|
|
|
|
/* Cronet does not support :authority header field. We cancel the call when
|
|
|
|
this field is present in metadata */ |
|
|
|
this field is present in metadata */ |
|
|
|
if (op->recv_initial_metadata_ready) { |
|
|
|
if (op->recv_initial_metadata) { |
|
|
|
grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready, |
|
|
|
grpc_closure_sched( |
|
|
|
GRPC_ERROR_CANCELLED); |
|
|
|
exec_ctx, |
|
|
|
|
|
|
|
op->payload->recv_initial_metadata.recv_initial_metadata_ready, |
|
|
|
|
|
|
|
GRPC_ERROR_CANCELLED); |
|
|
|
} |
|
|
|
} |
|
|
|
if (op->recv_message_ready) { |
|
|
|
if (op->recv_message) { |
|
|
|
grpc_closure_sched(exec_ctx, op->recv_message_ready, |
|
|
|
grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready, |
|
|
|
GRPC_ERROR_CANCELLED); |
|
|
|
GRPC_ERROR_CANCELLED); |
|
|
|
} |
|
|
|
} |
|
|
|
grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED); |
|
|
|
grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED); |
|
|
|