|
|
|
@ -88,7 +88,7 @@ enum e_op_id { |
|
|
|
|
|
|
|
|
|
/* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */ |
|
|
|
|
|
|
|
|
|
static void on_request_headers_sent(bidirectional_stream *); |
|
|
|
|
static void on_stream_ready(bidirectional_stream *); |
|
|
|
|
static void on_response_headers_received( |
|
|
|
|
bidirectional_stream *, const bidirectional_stream_header_array *, |
|
|
|
|
const char *); |
|
|
|
@ -100,7 +100,7 @@ static void on_succeeded(bidirectional_stream *); |
|
|
|
|
static void on_failed(bidirectional_stream *, int); |
|
|
|
|
static void on_canceled(bidirectional_stream *); |
|
|
|
|
static bidirectional_stream_callback cronet_callbacks = { |
|
|
|
|
on_request_headers_sent, |
|
|
|
|
on_stream_ready, |
|
|
|
|
on_response_headers_received, |
|
|
|
|
on_read_completed, |
|
|
|
|
on_write_completed, |
|
|
|
@ -114,6 +114,7 @@ struct grpc_cronet_transport { |
|
|
|
|
grpc_transport base; /* must be first element in this structure */ |
|
|
|
|
stream_engine *engine; |
|
|
|
|
char *host; |
|
|
|
|
bool use_packet_coalescing; |
|
|
|
|
}; |
|
|
|
|
typedef struct grpc_cronet_transport grpc_cronet_transport; |
|
|
|
|
|
|
|
|
@ -152,6 +153,9 @@ struct op_state { |
|
|
|
|
bool state_callback_received[OP_NUM_OPS]; |
|
|
|
|
bool fail_state; |
|
|
|
|
bool flush_read; |
|
|
|
|
bool flush_cronet_when_ready; |
|
|
|
|
bool pending_write_for_trailer; |
|
|
|
|
bool unprocessed_send_message; |
|
|
|
|
grpc_error *cancel_error; |
|
|
|
|
/* data structure for storing data coming from server */ |
|
|
|
|
struct read_state rs; |
|
|
|
@ -175,7 +179,7 @@ struct op_storage { |
|
|
|
|
struct stream_obj { |
|
|
|
|
struct op_and_state *oas; |
|
|
|
|
grpc_transport_stream_op *curr_op; |
|
|
|
|
grpc_cronet_transport curr_ct; |
|
|
|
|
grpc_cronet_transport *curr_ct; |
|
|
|
|
grpc_stream *curr_gs; |
|
|
|
|
bidirectional_stream *cbs; |
|
|
|
|
bidirectional_stream_header_array header_array; |
|
|
|
@ -274,6 +278,9 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { |
|
|
|
|
new_op->next = storage->head; |
|
|
|
|
storage->head = new_op; |
|
|
|
|
storage->num_pending_ops++; |
|
|
|
|
if (op->send_message) { |
|
|
|
|
s->state.unprocessed_send_message = true; |
|
|
|
|
} |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op, |
|
|
|
|
storage->num_pending_ops); |
|
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
@ -406,9 +413,10 @@ static void on_succeeded(bidirectional_stream *stream) { |
|
|
|
|
/*
|
|
|
|
|
Cronet callback |
|
|
|
|
*/ |
|
|
|
|
static void on_request_headers_sent(bidirectional_stream *stream) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream); |
|
|
|
|
static void on_stream_ready(bidirectional_stream *stream) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream); |
|
|
|
|
stream_obj *s = (stream_obj *)stream->annotation; |
|
|
|
|
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; |
|
|
|
|
gpr_mu_lock(&s->mu); |
|
|
|
|
s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true; |
|
|
|
|
s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true; |
|
|
|
@ -417,6 +425,14 @@ static void on_request_headers_sent(bidirectional_stream *stream) { |
|
|
|
|
gpr_free(s->header_array.headers); |
|
|
|
|
s->header_array.headers = NULL; |
|
|
|
|
} |
|
|
|
|
/* Send the initial metadata on wire if there is no SEND_MESSAGE or
|
|
|
|
|
* SEND_TRAILING_METADATA ops pending */ |
|
|
|
|
if (t->use_packet_coalescing) { |
|
|
|
|
if (s->state.flush_cronet_when_ready) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs); |
|
|
|
|
bidirectional_stream_flush(stream); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
|
execute_from_storage(s); |
|
|
|
|
} |
|
|
|
@ -528,6 +544,7 @@ static void on_response_trailers_received( |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, |
|
|
|
|
trailers); |
|
|
|
|
stream_obj *s = (stream_obj *)stream->annotation; |
|
|
|
|
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; |
|
|
|
|
gpr_mu_lock(&s->mu); |
|
|
|
|
memset(&s->state.rs.trailing_metadata, 0, |
|
|
|
|
sizeof(s->state.rs.trailing_metadata)); |
|
|
|
@ -558,6 +575,10 @@ static void on_response_trailers_received( |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs); |
|
|
|
|
s->state.state_callback_received[OP_SEND_MESSAGE] = false; |
|
|
|
|
bidirectional_stream_write(s->cbs, "", 0, true); |
|
|
|
|
if (t->use_packet_coalescing) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); |
|
|
|
|
bidirectional_stream_flush(s->cbs); |
|
|
|
|
} |
|
|
|
|
s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; |
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
@ -607,7 +628,7 @@ static void convert_metadata_to_cronet_headers( |
|
|
|
|
curr = curr->next; |
|
|
|
|
num_headers_available++; |
|
|
|
|
} |
|
|
|
|
/* Allocate enough memory. It is freed in the on_request_headers_sent callback
|
|
|
|
|
/* Allocate enough memory. It is freed in the on_stream_ready callback
|
|
|
|
|
*/ |
|
|
|
|
bidirectional_stream_header *headers = |
|
|
|
|
(bidirectional_stream_header *)gpr_malloc( |
|
|
|
@ -687,8 +708,10 @@ static bool header_has_authority(grpc_linked_mdelem *head) { |
|
|
|
|
executed. This is the heart of the state machine. |
|
|
|
|
*/ |
|
|
|
|
static bool op_can_be_run(grpc_transport_stream_op *curr_op, |
|
|
|
|
struct op_state *stream_state, |
|
|
|
|
struct op_state *op_state, enum e_op_id op_id) { |
|
|
|
|
struct stream_obj *s, struct op_state *op_state, |
|
|
|
|
enum e_op_id op_id) { |
|
|
|
|
struct op_state *stream_state = &s->state; |
|
|
|
|
grpc_cronet_transport *t = s->curr_ct; |
|
|
|
|
bool result = true; |
|
|
|
|
/* When call is canceled, every op can be run, except under following
|
|
|
|
|
conditions |
|
|
|
@ -755,12 +778,14 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, |
|
|
|
|
else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) |
|
|
|
|
result = false; |
|
|
|
|
/* we haven't sent message yet */ |
|
|
|
|
else if (curr_op->send_message && |
|
|
|
|
else if (stream_state->unprocessed_send_message && |
|
|
|
|
!stream_state->state_op_done[OP_SEND_MESSAGE]) |
|
|
|
|
result = false; |
|
|
|
|
/* we haven't got on_write_completed for the send yet */ |
|
|
|
|
else if (stream_state->state_op_done[OP_SEND_MESSAGE] && |
|
|
|
|
!stream_state->state_callback_received[OP_SEND_MESSAGE]) |
|
|
|
|
!stream_state->state_callback_received[OP_SEND_MESSAGE] && |
|
|
|
|
!(t->use_packet_coalescing && |
|
|
|
|
stream_state->pending_write_for_trailer)) |
|
|
|
|
result = false; |
|
|
|
|
} else if (op_id == OP_CANCEL_ERROR) { |
|
|
|
|
/* already executed */ |
|
|
|
@ -833,24 +858,28 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
|
struct op_and_state *oas) { |
|
|
|
|
grpc_transport_stream_op *stream_op = &oas->op; |
|
|
|
|
struct stream_obj *s = oas->s; |
|
|
|
|
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; |
|
|
|
|
struct op_state *stream_state = &s->state; |
|
|
|
|
enum e_op_result result = NO_ACTION_POSSIBLE; |
|
|
|
|
if (stream_op->send_initial_metadata && |
|
|
|
|
op_can_be_run(stream_op, stream_state, &oas->state, |
|
|
|
|
OP_SEND_INITIAL_METADATA)) { |
|
|
|
|
op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas); |
|
|
|
|
/* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
|
|
|
|
|
* on_failed */ |
|
|
|
|
GPR_ASSERT(s->cbs == NULL); |
|
|
|
|
GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]); |
|
|
|
|
s->cbs = bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, |
|
|
|
|
&cronet_callbacks); |
|
|
|
|
s->cbs = |
|
|
|
|
bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks); |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs); |
|
|
|
|
if (t->use_packet_coalescing) { |
|
|
|
|
bidirectional_stream_disable_auto_flush(s->cbs, true); |
|
|
|
|
bidirectional_stream_delay_request_headers_until_flush(s->cbs, true); |
|
|
|
|
} |
|
|
|
|
char *url = NULL; |
|
|
|
|
const char *method = "POST"; |
|
|
|
|
s->header_array.headers = NULL; |
|
|
|
|
convert_metadata_to_cronet_headers( |
|
|
|
|
stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url, |
|
|
|
|
stream_op->send_initial_metadata->list.head, t->host, &url, |
|
|
|
|
&s->header_array.headers, &s->header_array.count, &method); |
|
|
|
|
s->header_array.capacity = s->header_array.count; |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url); |
|
|
|
@ -862,30 +891,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
|
gpr_free((void *)s->header_array.headers[header_index].value); |
|
|
|
|
} |
|
|
|
|
stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; |
|
|
|
|
result = ACTION_TAKEN_WITH_CALLBACK; |
|
|
|
|
} else if (stream_op->recv_initial_metadata && |
|
|
|
|
op_can_be_run(stream_op, stream_state, &oas->state, |
|
|
|
|
OP_RECV_INITIAL_METADATA)) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); |
|
|
|
|
if (stream_state->state_op_done[OP_CANCEL_ERROR]) { |
|
|
|
|
grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} else if (stream_state->state_callback_received[OP_FAILED]) { |
|
|
|
|
grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} else { |
|
|
|
|
grpc_chttp2_incoming_metadata_buffer_publish( |
|
|
|
|
exec_ctx, &oas->s->state.rs.initial_metadata, |
|
|
|
|
stream_op->recv_initial_metadata); |
|
|
|
|
grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
if (t->use_packet_coalescing) { |
|
|
|
|
if (!stream_op->send_message && !stream_op->send_trailing_metadata) { |
|
|
|
|
s->state.flush_cronet_when_ready = true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; |
|
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
|
|
result = ACTION_TAKEN_WITH_CALLBACK; |
|
|
|
|
} else if (stream_op->send_message && |
|
|
|
|
op_can_be_run(stream_op, stream_state, &oas->state, |
|
|
|
|
OP_SEND_MESSAGE)) { |
|
|
|
|
op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas); |
|
|
|
|
stream_state->unprocessed_send_message = false; |
|
|
|
|
if (stream_state->state_callback_received[OP_FAILED]) { |
|
|
|
|
result = NO_ACTION_POSSIBLE; |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); |
|
|
|
@ -916,16 +931,63 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
|
stream_state->state_callback_received[OP_SEND_MESSAGE] = false; |
|
|
|
|
bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, |
|
|
|
|
(int)write_buffer_size, false); |
|
|
|
|
result = ACTION_TAKEN_WITH_CALLBACK; |
|
|
|
|
if (t->use_packet_coalescing) { |
|
|
|
|
if (!stream_op->send_trailing_metadata) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); |
|
|
|
|
bidirectional_stream_flush(s->cbs); |
|
|
|
|
result = ACTION_TAKEN_WITH_CALLBACK; |
|
|
|
|
} else { |
|
|
|
|
stream_state->pending_write_for_trailer = true; |
|
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
result = ACTION_TAKEN_WITH_CALLBACK; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
result = NO_ACTION_POSSIBLE; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
stream_state->state_op_done[OP_SEND_MESSAGE] = true; |
|
|
|
|
oas->state.state_op_done[OP_SEND_MESSAGE] = true; |
|
|
|
|
} else if (stream_op->send_trailing_metadata && |
|
|
|
|
op_can_be_run(stream_op, s, &oas->state, |
|
|
|
|
OP_SEND_TRAILING_METADATA)) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); |
|
|
|
|
if (stream_state->state_callback_received[OP_FAILED]) { |
|
|
|
|
result = NO_ACTION_POSSIBLE; |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); |
|
|
|
|
} else { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs); |
|
|
|
|
stream_state->state_callback_received[OP_SEND_MESSAGE] = false; |
|
|
|
|
bidirectional_stream_write(s->cbs, "", 0, true); |
|
|
|
|
if (t->use_packet_coalescing) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); |
|
|
|
|
bidirectional_stream_flush(s->cbs); |
|
|
|
|
} |
|
|
|
|
result = ACTION_TAKEN_WITH_CALLBACK; |
|
|
|
|
} |
|
|
|
|
stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; |
|
|
|
|
} else if (stream_op->recv_initial_metadata && |
|
|
|
|
op_can_be_run(stream_op, s, &oas->state, |
|
|
|
|
OP_RECV_INITIAL_METADATA)) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); |
|
|
|
|
if (stream_state->state_op_done[OP_CANCEL_ERROR]) { |
|
|
|
|
grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} else if (stream_state->state_callback_received[OP_FAILED]) { |
|
|
|
|
grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} else { |
|
|
|
|
grpc_chttp2_incoming_metadata_buffer_publish( |
|
|
|
|
exec_ctx, &oas->s->state.rs.initial_metadata, |
|
|
|
|
stream_op->recv_initial_metadata); |
|
|
|
|
grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; |
|
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
|
|
} else if (stream_op->recv_message && |
|
|
|
|
op_can_be_run(stream_op, stream_state, &oas->state, |
|
|
|
|
OP_RECV_MESSAGE)) { |
|
|
|
|
op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); |
|
|
|
|
if (stream_state->state_op_done[OP_CANCEL_ERROR]) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); |
|
|
|
@ -980,6 +1042,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
|
oas->state.state_op_done[OP_RECV_MESSAGE] = true; |
|
|
|
|
|
|
|
|
|
/* Extra read to trigger on_succeed */ |
|
|
|
|
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; |
|
|
|
|
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; |
|
|
|
|
stream_state->rs.received_bytes = 0; |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); |
|
|
|
|
stream_state->state_op_done[OP_READ_REQ_MADE] = |
|
|
|
|
true; /* Indicates that at least one read request has been made */ |
|
|
|
|
bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, |
|
|
|
|
stream_state->rs.remaining_bytes); |
|
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
|
|
} |
|
|
|
|
} else if (stream_state->rs.remaining_bytes == 0) { |
|
|
|
@ -1027,7 +1099,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
|
|
} |
|
|
|
|
} else if (stream_op->recv_trailing_metadata && |
|
|
|
|
op_can_be_run(stream_op, stream_state, &oas->state, |
|
|
|
|
op_can_be_run(stream_op, s, &oas->state, |
|
|
|
|
OP_RECV_TRAILING_METADATA)) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas); |
|
|
|
|
if (oas->s->state.rs.trailing_metadata_valid) { |
|
|
|
@ -1038,23 +1110,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; |
|
|
|
|
result = ACTION_TAKEN_NO_CALLBACK; |
|
|
|
|
} else if (stream_op->send_trailing_metadata && |
|
|
|
|
op_can_be_run(stream_op, stream_state, &oas->state, |
|
|
|
|
OP_SEND_TRAILING_METADATA)) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); |
|
|
|
|
if (stream_state->state_callback_received[OP_FAILED]) { |
|
|
|
|
result = NO_ACTION_POSSIBLE; |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); |
|
|
|
|
} else { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs); |
|
|
|
|
stream_state->state_callback_received[OP_SEND_MESSAGE] = false; |
|
|
|
|
bidirectional_stream_write(s->cbs, "", 0, true); |
|
|
|
|
result = ACTION_TAKEN_WITH_CALLBACK; |
|
|
|
|
} |
|
|
|
|
stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; |
|
|
|
|
} else if (stream_op->cancel_error && |
|
|
|
|
op_can_be_run(stream_op, stream_state, &oas->state, |
|
|
|
|
OP_CANCEL_ERROR)) { |
|
|
|
|
op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas); |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); |
|
|
|
|
if (s->cbs) { |
|
|
|
@ -1068,8 +1125,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
|
stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error); |
|
|
|
|
} |
|
|
|
|
} else if (stream_op->on_complete && |
|
|
|
|
op_can_be_run(stream_op, stream_state, &oas->state, |
|
|
|
|
OP_ON_COMPLETE)) { |
|
|
|
|
op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); |
|
|
|
|
if (stream_state->state_op_done[OP_CANCEL_ERROR]) { |
|
|
|
|
grpc_closure_sched(exec_ctx, stream_op->on_complete, |
|
|
|
@ -1133,6 +1189,12 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, |
|
|
|
|
sizeof(s->state.state_callback_received)); |
|
|
|
|
s->state.fail_state = s->state.flush_read = false; |
|
|
|
|
s->state.cancel_error = NULL; |
|
|
|
|
s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false; |
|
|
|
|
s->state.unprocessed_send_message = false; |
|
|
|
|
|
|
|
|
|
s->curr_gs = gs; |
|
|
|
|
s->curr_ct = (grpc_cronet_transport *)gt; |
|
|
|
|
|
|
|
|
|
gpr_mu_init(&s->mu); |
|
|
|
|
return 0; |
|
|
|
|
} |
|
|
|
@ -1148,8 +1210,6 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, |
|
|
|
|
grpc_stream *gs, grpc_transport_stream_op *op) { |
|
|
|
|
CRONET_LOG(GPR_DEBUG, "perform_stream_op"); |
|
|
|
|
stream_obj *s = (stream_obj *)gs; |
|
|
|
|
s->curr_gs = gs; |
|
|
|
|
memcpy(&s->curr_ct, gt, sizeof(grpc_cronet_transport)); |
|
|
|
|
add_to_storage(s, op); |
|
|
|
|
if (op->send_initial_metadata && |
|
|
|
|
header_has_authority(op->send_initial_metadata->list.head)) { |
|
|
|
@ -1197,14 +1257,58 @@ static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, |
|
|
|
|
static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, |
|
|
|
|
grpc_transport_op *op) {} |
|
|
|
|
|
|
|
|
|
const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj), |
|
|
|
|
"cronet_http", |
|
|
|
|
init_stream, |
|
|
|
|
set_pollset_do_nothing, |
|
|
|
|
set_pollset_set_do_nothing, |
|
|
|
|
perform_stream_op, |
|
|
|
|
perform_op, |
|
|
|
|
destroy_stream, |
|
|
|
|
destroy_transport, |
|
|
|
|
get_peer, |
|
|
|
|
get_endpoint}; |
|
|
|
|
static const grpc_transport_vtable grpc_cronet_vtable = { |
|
|
|
|
sizeof(stream_obj), |
|
|
|
|
"cronet_http", |
|
|
|
|
init_stream, |
|
|
|
|
set_pollset_do_nothing, |
|
|
|
|
set_pollset_set_do_nothing, |
|
|
|
|
perform_stream_op, |
|
|
|
|
perform_op, |
|
|
|
|
destroy_stream, |
|
|
|
|
destroy_transport, |
|
|
|
|
get_peer, |
|
|
|
|
get_endpoint}; |
|
|
|
|
|
|
|
|
|
grpc_transport *grpc_create_cronet_transport(void *engine, const char *target, |
|
|
|
|
const grpc_channel_args *args, |
|
|
|
|
void *reserved) { |
|
|
|
|
grpc_cronet_transport *ct = gpr_malloc(sizeof(grpc_cronet_transport)); |
|
|
|
|
if (!ct) { |
|
|
|
|
goto error; |
|
|
|
|
} |
|
|
|
|
ct->base.vtable = &grpc_cronet_vtable; |
|
|
|
|
ct->engine = engine; |
|
|
|
|
ct->host = gpr_malloc(strlen(target) + 1); |
|
|
|
|
if (!ct->host) { |
|
|
|
|
goto error; |
|
|
|
|
} |
|
|
|
|
strcpy(ct->host, target); |
|
|
|
|
|
|
|
|
|
ct->use_packet_coalescing = true; |
|
|
|
|
if (args) { |
|
|
|
|
for (size_t i = 0; i < args->num_args; i++) { |
|
|
|
|
if (0 == |
|
|
|
|
strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) { |
|
|
|
|
if (args->args[i].type != GRPC_ARG_INTEGER) { |
|
|
|
|
gpr_log(GPR_ERROR, "%s ignored: it must be an integer", |
|
|
|
|
GRPC_ARG_USE_CRONET_PACKET_COALESCING); |
|
|
|
|
} else { |
|
|
|
|
ct->use_packet_coalescing = (args->args[i].value.integer != 0); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return &ct->base; |
|
|
|
|
|
|
|
|
|
error: |
|
|
|
|
if (ct) { |
|
|
|
|
if (ct->host) { |
|
|
|
|
gpr_free(ct->host); |
|
|
|
|
} |
|
|
|
|
gpr_free(ct); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|