fixing issues from cronet end to end testing

changed core logic for executing multiple ops.
added call cancellation logic
simplified the code.
pull/7557/head
Makarand Dharmapurikar 9 years ago
parent 01da196879
commit 6b86080912
  1. 367
      src/core/ext/transport/cronet/transport/cronet_transport.c

@ -65,10 +65,11 @@ typedef struct grpc_cronet_transport grpc_cronet_transport;
enum send_state { enum send_state {
CRONET_SEND_IDLE = 0, CRONET_SEND_IDLE = 0,
CRONET_REQ_STARTED,
CRONET_SEND_HEADER, CRONET_SEND_HEADER,
CRONET_WRITE, CRONET_WRITE_PENDING,
CRONET_WRITE_COMPLETED, CRONET_WRITE_COMPLETED,
CRONET_WAIT_FOR_CANCEL,
CRONET_STREAM_CLOSED,
}; };
enum recv_state { enum recv_state {
@ -91,13 +92,14 @@ enum e_caller {
}; };
enum callback_id { enum callback_id {
CB_SEND_INITIAL_METADATA = 0, OP_SEND_INITIAL_METADATA = 0,
CB_SEND_MESSAGE, OP_SEND_MESSAGE,
CB_SEND_TRAILING_METADATA, OP_SEND_TRAILING_METADATA,
CB_RECV_MESSAGE, OP_RECV_MESSAGE,
CB_RECV_INITIAL_METADATA, OP_RECV_INITIAL_METADATA,
CB_RECV_TRAILING_METADATA, OP_RECV_TRAILING_METADATA,
CB_NUM_CALLBACKS OP_CANCEL_ERROR,
OP_NUM_CALLBACKS
}; };
struct stream_obj { struct stream_obj {
@ -117,23 +119,29 @@ struct stream_obj {
// Hold the URL // Hold the URL
char *url; char *url;
bool response_headers_received; // One bit per operation
bool read_requested; bool op_requested[OP_NUM_CALLBACKS];
bool response_trailers_received; bool op_done[OP_NUM_CALLBACKS];
// Set to true when server indicates no more data will be sent
bool read_closed; bool read_closed;
// Recv message stuff // Recv message stuff
grpc_byte_buffer **recv_message; grpc_byte_buffer **recv_message;
// Initial metadata stuff // Initial metadata stuff
grpc_metadata_batch *recv_initial_metadata; grpc_metadata_batch *recv_initial_metadata;
grpc_chttp2_incoming_metadata_buffer initial_metadata;
// Trailing metadata stuff // Trailing metadata stuff
grpc_metadata_batch *recv_trailing_metadata; grpc_metadata_batch *recv_trailing_metadata;
grpc_chttp2_incoming_metadata_buffer imb; grpc_chttp2_incoming_metadata_buffer imb;
bool imb_valid; // true if there are any valid entries in imb.
// This mutex protects receive state machine execution // This mutex protects receive state machine execution
gpr_mu recv_mu; gpr_mu recv_mu;
// we can queue up up to 2 callbacks for each OP
grpc_closure *callback_list[CB_NUM_CALLBACKS][2]; // Callbacks to be called when operations complete
grpc_closure *cb_recv_initial_metadata_ready;
grpc_closure *cb_recv_message_ready;
grpc_closure *on_complete;
// storage for header // storage for header
cronet_bidirectional_stream_header *headers; cronet_bidirectional_stream_header *headers;
@ -156,34 +164,63 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
grpc_transport *gt, grpc_stream *gs, grpc_transport *gt, grpc_stream *gs,
grpc_pollset_set *pollset_set) {} grpc_pollset_set *pollset_set) {}
static void enqueue_callbacks(grpc_closure *callback_list[]) { // Client creates a bunch of operations and invokes "call_start_batch"
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; // call_start_batch creates a stream_op structure. this structure has info
if (callback_list[0]) { // needed for executing all the ops. It has on_complete callback that needs
grpc_exec_ctx_sched(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL); // to be called when all ops are executed. This function keeps track of all
callback_list[0] = NULL; // outstanding operations. It returns true if all operations that were part of
// the stream_op have been completed.
static bool is_op_complete(stream_obj *s) {
int i;
// Check if any requested op is pending
for (i = 0; i < OP_NUM_CALLBACKS; i++) {
if (s->op_requested[i] && !s->op_done[i]) {
gpr_log(GPR_DEBUG, "is_op_complete is FALSE because of %d", i);
return false;
}
}
// Clear the requested/done bits and return true
for (i = 0; i < OP_NUM_CALLBACKS; i++) {
s->op_requested[i] = s->op_done[i] = false;
} }
if (callback_list[1]) { return true;
grpc_exec_ctx_sched(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL);
callback_list[1] = NULL;
} }
static void enqueue_callback(grpc_closure *callback) {
GPR_ASSERT(callback);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_exec_ctx_sched(&exec_ctx, callback, GRPC_ERROR_NONE, NULL);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
static void on_canceled(cronet_bidirectional_stream *stream) { static void on_canceled(cronet_bidirectional_stream *stream) {
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "on_canceled %p", stream); gpr_log(GPR_DEBUG, "on_canceled(%p)", stream);
} }
stream_obj *s = (stream_obj *)stream->annotation;
s->op_done[OP_CANCEL_ERROR] = true;
// Terminate any read callback
if (s->cb_recv_message_ready) {
enqueue_callback(s->cb_recv_message_ready);
s->cb_recv_message_ready = 0;
s->op_done[OP_RECV_MESSAGE] = true;
}
// Don't wait to get any trailing metadata
s->op_done[OP_RECV_TRAILING_METADATA] = true;
next_send_step(s);
} }
static void on_failed(cronet_bidirectional_stream *stream, int net_error) { static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error); gpr_log(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
} }
} }
static void on_succeeded(cronet_bidirectional_stream *stream) { static void on_succeeded(cronet_bidirectional_stream *stream) {
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "on_succeeded %p", stream); gpr_log(GPR_DEBUG, "on_succeeded(%p)", stream);
} }
} }
@ -191,31 +228,38 @@ static void on_response_trailers_received(
cronet_bidirectional_stream *stream, cronet_bidirectional_stream *stream,
const cronet_bidirectional_stream_header_array *trailers) { const cronet_bidirectional_stream_header_array *trailers) {
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "R: on_response_trailers_received"); gpr_log(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
trailers);
} }
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
memset(&s->imb, 0, sizeof(s->imb)); memset(&s->imb, 0, sizeof(s->imb));
s->imb_valid = false;
grpc_chttp2_incoming_metadata_buffer_init(&s->imb); grpc_chttp2_incoming_metadata_buffer_init(&s->imb);
unsigned int i = 0; unsigned int i = 0;
for (i = 0; i < trailers->count; i++) { for (i = 0; i < trailers->count; i++) {
if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key,
trailers->headers[i].value);
}
grpc_chttp2_incoming_metadata_buffer_add( grpc_chttp2_incoming_metadata_buffer_add(
&s->imb, grpc_mdelem_from_metadata_strings( &s->imb, grpc_mdelem_from_metadata_strings(
grpc_mdstr_from_string(trailers->headers[i].key), grpc_mdstr_from_string(trailers->headers[i].key),
grpc_mdstr_from_string(trailers->headers[i].value))); grpc_mdstr_from_string(trailers->headers[i].value)));
s->imb_valid = true;
} }
s->response_trailers_received = true; s->op_done[OP_RECV_TRAILING_METADATA] = true;
next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED); next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED);
} }
static void on_write_completed(cronet_bidirectional_stream *stream, static void on_write_completed(cronet_bidirectional_stream *stream,
const char *data) { const char *data) {
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "W: on_write_completed"); gpr_log(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
} }
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
enqueue_callbacks(s->callback_list[CB_SEND_MESSAGE]); s->op_done[OP_SEND_MESSAGE] = true;
s->cronet_send_state = CRONET_WRITE_COMPLETED;
next_send_step(s); next_send_step(s);
} }
@ -245,14 +289,14 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
int count) { int count) {
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "R: on_read_completed count=%d, total=%d, remaining=%d", gpr_log(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count);
count, s->total_read_bytes, s->remaining_read_bytes);
} }
if (count > 0) { if (count > 0) {
GPR_ASSERT(s->recv_message); GPR_ASSERT(s->recv_message);
s->remaining_read_bytes -= count; s->remaining_read_bytes -= count;
next_recv_step(s, ON_READ_COMPLETE); next_recv_step(s, ON_READ_COMPLETE);
} else { } else {
gpr_log(GPR_DEBUG, "read_closed = true");
s->read_closed = true; s->read_closed = true;
next_recv_step(s, ON_READ_COMPLETE); next_recv_step(s, ON_READ_COMPLETE);
} }
@ -263,21 +307,39 @@ static void on_response_headers_received(
const cronet_bidirectional_stream_header_array *headers, const cronet_bidirectional_stream_header_array *headers,
const char *negotiated_protocol) { const char *negotiated_protocol) {
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "R: on_response_headers_received"); gpr_log(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
headers, negotiated_protocol);
} }
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
enqueue_callbacks(s->callback_list[CB_RECV_INITIAL_METADATA]);
s->response_headers_received = true; memset(&s->initial_metadata, 0, sizeof(s->initial_metadata));
grpc_chttp2_incoming_metadata_buffer_init(&s->initial_metadata);
unsigned int i = 0;
for (i = 0; i < headers->count; i++) {
if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "header key=%s, value=%s", headers->headers[i].key,
headers->headers[i].value);
}
grpc_chttp2_incoming_metadata_buffer_add(
&s->initial_metadata,
grpc_mdelem_from_metadata_strings(
grpc_mdstr_from_string(headers->headers[i].key),
grpc_mdstr_from_string(headers->headers[i].value)));
}
grpc_chttp2_incoming_metadata_buffer_publish(&s->initial_metadata,
s->recv_initial_metadata);
enqueue_callback(s->cb_recv_initial_metadata_ready);
s->op_done[OP_RECV_INITIAL_METADATA] = true;
next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED); next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED);
} }
static void on_request_headers_sent(cronet_bidirectional_stream *stream) { static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "W: on_request_headers_sent"); gpr_log(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream);
} }
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
enqueue_callbacks(s->callback_list[CB_SEND_INITIAL_METADATA]); s->op_done[OP_SEND_INITIAL_METADATA] = true;
s->cronet_send_state = CRONET_SEND_HEADER;
next_send_step(s); next_send_step(s);
} }
@ -293,11 +355,13 @@ static cronet_bidirectional_stream_callback callbacks = {
on_canceled}; on_canceled};
static void invoke_closing_callback(stream_obj *s) { static void invoke_closing_callback(stream_obj *s) {
if (!is_op_complete(s)) return;
if (s->imb_valid) {
grpc_chttp2_incoming_metadata_buffer_publish(&s->imb, grpc_chttp2_incoming_metadata_buffer_publish(&s->imb,
s->recv_trailing_metadata); s->recv_trailing_metadata);
if (s->callback_list[CB_RECV_TRAILING_METADATA]) {
enqueue_callbacks(s->callback_list[CB_RECV_TRAILING_METADATA]);
} }
enqueue_callback(s->on_complete);
} }
static void set_recv_state(stream_obj *s, enum recv_state state) { static void set_recv_state(stream_obj *s, enum recv_state state) {
@ -309,29 +373,35 @@ static void set_recv_state(stream_obj *s, enum recv_state state) {
// This is invoked from perform_stream_op, and all on_xxxx callbacks. // This is invoked from perform_stream_op, and all on_xxxx callbacks.
static void next_recv_step(stream_obj *s, enum e_caller caller) { static void next_recv_step(stream_obj *s, enum e_caller caller) {
// gpr_log(GPR_DEBUG, "locking mutex %p", &s->recv_mu);
gpr_mu_lock(&s->recv_mu); gpr_mu_lock(&s->recv_mu);
switch (s->cronet_recv_state) { switch (s->cronet_recv_state) {
case CRONET_RECV_IDLE: case CRONET_RECV_IDLE:
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE"); gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE, caller=%d",
caller);
} }
if (caller == PERFORM_STREAM_OP || if (caller == PERFORM_STREAM_OP ||
caller == ON_RESPONSE_HEADERS_RECEIVED) { caller == ON_RESPONSE_HEADERS_RECEIVED) {
if (s->read_closed && s->response_trailers_received) { if (s->read_closed && s->op_done[OP_RECV_TRAILING_METADATA]) {
invoke_closing_callback(s);
set_recv_state(s, CRONET_RECV_CLOSED); set_recv_state(s, CRONET_RECV_CLOSED);
} else if (s->response_headers_received == true && } else if (s->op_done[OP_RECV_INITIAL_METADATA] == true &&
s->read_requested == true) { s->op_requested[OP_RECV_MESSAGE]) {
set_recv_state(s, CRONET_RECV_READ_LENGTH); set_recv_state(s, CRONET_RECV_READ_LENGTH);
s->total_read_bytes = s->remaining_read_bytes = s->total_read_bytes = s->remaining_read_bytes =
GRPC_HEADER_SIZE_IN_BYTES; GRPC_HEADER_SIZE_IN_BYTES;
GPR_ASSERT(s->read_buffer); GPR_ASSERT(s->read_buffer);
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read(%p,%p,%d)",
s->cbs, s->read_buffer, s->remaining_read_bytes);
} }
cronet_bidirectional_stream_read(s->cbs, s->read_buffer, cronet_bidirectional_stream_read(s->cbs, s->read_buffer,
s->remaining_read_bytes); s->remaining_read_bytes);
} }
} else if (caller == ON_RESPONSE_TRAILERS_RECEIVED) {
// We get here when we receive trailers directly, i.e. without
// going through a data read operation.
set_recv_state(s, CRONET_RECV_CLOSED);
} }
break; break;
case CRONET_RECV_READ_LENGTH: case CRONET_RECV_READ_LENGTH:
@ -340,8 +410,8 @@ static void next_recv_step(stream_obj *s, enum e_caller caller) {
} }
if (caller == ON_READ_COMPLETE) { if (caller == ON_READ_COMPLETE) {
if (s->read_closed) { if (s->read_closed) {
invoke_closing_callback(s); enqueue_callback(s->cb_recv_message_ready);
enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); s->op_done[OP_RECV_MESSAGE] = true;
set_recv_state(s, CRONET_RECV_CLOSED); set_recv_state(s, CRONET_RECV_CLOSED);
} else { } else {
GPR_ASSERT(s->remaining_read_bytes == 0); GPR_ASSERT(s->remaining_read_bytes == 0);
@ -352,7 +422,8 @@ static void next_recv_step(stream_obj *s, enum e_caller caller) {
gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes); gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes);
GPR_ASSERT(s->read_buffer); GPR_ASSERT(s->read_buffer);
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read(%p,%p,%d)",
s->cbs, s->read_buffer, s->remaining_read_bytes);
} }
if (s->remaining_read_bytes > 0) { if (s->remaining_read_bytes > 0) {
cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer, cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
@ -361,8 +432,8 @@ static void next_recv_step(stream_obj *s, enum e_caller caller) {
// Calling the closing callback directly since this is a 0 byte read // Calling the closing callback directly since this is a 0 byte read
// for an empty message. // for an empty message.
process_recv_message(s, NULL); process_recv_message(s, NULL);
enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); enqueue_callback(s->cb_recv_message_ready);
invoke_closing_callback(s); s->op_done[OP_RECV_MESSAGE] = true;
set_recv_state(s, CRONET_RECV_CLOSED); set_recv_state(s, CRONET_RECV_CLOSED);
} }
} }
@ -386,7 +457,8 @@ static void next_recv_step(stream_obj *s, enum e_caller caller) {
uint8_t *p = (uint8_t *)s->read_buffer; uint8_t *p = (uint8_t *)s->read_buffer;
process_recv_message(s, p); process_recv_message(s, p);
set_recv_state(s, CRONET_RECV_IDLE); set_recv_state(s, CRONET_RECV_IDLE);
enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); enqueue_callback(s->cb_recv_message_ready);
s->op_done[OP_RECV_MESSAGE] = true;
} }
} }
break; break;
@ -396,7 +468,9 @@ static void next_recv_step(stream_obj *s, enum e_caller caller) {
GPR_ASSERT(0); // Should not reach here GPR_ASSERT(0); // Should not reach here
break; break;
} }
invoke_closing_callback(s);
gpr_mu_unlock(&s->recv_mu); gpr_mu_unlock(&s->recv_mu);
// gpr_log(GPR_DEBUG, "unlocking mutex %p", &s->recv_mu);
} }
// This function takes the data from s->write_slice_buffer and assembles into // This function takes the data from s->write_slice_buffer and assembles into
@ -417,27 +491,69 @@ static void create_grpc_frame(stream_obj *s) {
// append actual data // append actual data
memcpy(p, raw_data, length); memcpy(p, raw_data, length);
} }
// Return false if there is no data to write
static void do_write(stream_obj *s) { static bool do_write(stream_obj *s) {
gpr_slice_buffer *sb = &s->write_slice_buffer; gpr_slice_buffer *sb = &s->write_slice_buffer;
GPR_ASSERT(sb->count <= 1); GPR_ASSERT(sb->count <= 1);
if (sb->count > 0) { if (sb->count > 0) {
create_grpc_frame(s); create_grpc_frame(s);
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write"); gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write(%p,%p,%d,%d)",
s->cbs, s->write_buffer, (int)s->write_buffer_size, false);
} }
cronet_bidirectional_stream_write(s->cbs, s->write_buffer, cronet_bidirectional_stream_write(s->cbs, s->write_buffer,
(int)s->write_buffer_size, false); (int)s->write_buffer_size, false);
return true;
} else {
return false;
}
}
static bool init_cronet_stream(stream_obj *s, grpc_transport *gt) {
GPR_ASSERT(s->cbs == NULL);
grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
GPR_ASSERT(ct->engine);
if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create");
}
s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks);
GPR_ASSERT(s->cbs);
s->read_closed = false;
for (int i = 0; i < OP_NUM_CALLBACKS; i++) {
s->op_requested[i] = s->op_done[i] = false;
} }
s->cronet_send_state = CRONET_SEND_IDLE;
s->cronet_recv_state = CRONET_RECV_IDLE;
}
static bool do_close_connection(stream_obj *s) {
s->op_done[OP_SEND_TRAILING_METADATA] = true;
if (s->cbs) {
// Send an "empty" write to the far end to signal that we're done.
// This will induce the server to send down trailers.
if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write length 0");
}
cronet_bidirectional_stream_write(s->cbs, "abc", 0, true);
return true;
} else {
// We never created a stream. This was probably an empty request.
invoke_closing_callback(s);
return true;
}
return false;
} }
// //
static void next_send_step(stream_obj *s) { static void next_send_step(stream_obj *s) {
gpr_log(GPR_DEBUG, "next_send_step cronet_send_state=%d",
s->cronet_send_state);
switch (s->cronet_send_state) { switch (s->cronet_send_state) {
case CRONET_SEND_IDLE: case CRONET_SEND_IDLE:
GPR_ASSERT( GPR_ASSERT(
s->cbs); // cronet_bidirectional_stream is not initialized yet. s->cbs); // cronet_bidirectional_stream is not initialized yet.
s->cronet_send_state = CRONET_REQ_STARTED; s->cronet_send_state = CRONET_SEND_HEADER;
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url); gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url);
} }
@ -447,11 +563,51 @@ static void next_send_step(stream_obj *s) {
gpr_free(s->header_array.headers); gpr_free(s->header_array.headers);
break; break;
case CRONET_SEND_HEADER: case CRONET_SEND_HEADER:
do_write(s); if (s->op_requested[OP_CANCEL_ERROR]) {
s->cronet_send_state = CRONET_WRITE; cronet_bidirectional_stream_cancel(s->cbs);
gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs);
s->cronet_send_state = CRONET_WAIT_FOR_CANCEL;
} else if (do_write(s) == false &&
s->op_requested[OP_SEND_TRAILING_METADATA]) {
if (do_close_connection(s)) {
s->cronet_send_state = CRONET_STREAM_CLOSED;
}
} else {
s->cronet_send_state = CRONET_WRITE_PENDING;
}
break;
case CRONET_WRITE_PENDING:
if (s->op_requested[OP_CANCEL_ERROR]) {
cronet_bidirectional_stream_cancel(s->cbs);
gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs);
s->cronet_send_state = CRONET_WAIT_FOR_CANCEL;
} else if (do_write(s) == false &&
s->op_requested[OP_SEND_TRAILING_METADATA]) {
if (do_close_connection(s)) {
s->cronet_send_state = CRONET_STREAM_CLOSED;
}
} else {
s->cronet_send_state = CRONET_WRITE_COMPLETED;
}
break; break;
case CRONET_WRITE_COMPLETED: case CRONET_WRITE_COMPLETED:
do_write(s); if (s->op_requested[OP_CANCEL_ERROR]) {
cronet_bidirectional_stream_cancel(s->cbs);
gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs);
s->cronet_send_state = CRONET_WAIT_FOR_CANCEL;
} else if (do_write(s) == false &&
s->op_requested[OP_SEND_TRAILING_METADATA]) {
if (do_close_connection(s)) {
s->cronet_send_state = CRONET_STREAM_CLOSED;
}
}
break;
case CRONET_STREAM_CLOSED:
s->cronet_send_state = CRONET_SEND_IDLE;
break;
case CRONET_WAIT_FOR_CANCEL:
invoke_closing_callback(s);
s->cronet_send_state = CRONET_SEND_IDLE;
break; break;
default: default:
GPR_ASSERT(0); GPR_ASSERT(0);
@ -493,7 +649,7 @@ static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head,
// Create URL by appending :path value to the hostname // Create URL by appending :path value to the hostname
gpr_asprintf(&s->url, "https://%s%s", host, value); gpr_asprintf(&s->url, "https://%s%s", host, value);
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "extracted URL = %s", s->url); // gpr_log(GPR_DEBUG, "extracted URL = %s", s->url);
} }
continue; continue;
} }
@ -511,6 +667,13 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_cronet_transport *ct = (grpc_cronet_transport *)gt; grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
GPR_ASSERT(ct->engine); GPR_ASSERT(ct->engine);
stream_obj *s = (stream_obj *)gs; stream_obj *s = (stream_obj *)gs;
// Initialize a cronet bidirectional stream if it doesn't exist.
if (s->cbs == NULL) {
init_cronet_stream(s, gt);
}
s->on_complete = op->on_complete;
if (op->recv_trailing_metadata) { if (op->recv_trailing_metadata) {
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
@ -518,8 +681,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
op->on_complete); op->on_complete);
} }
s->recv_trailing_metadata = op->recv_trailing_metadata; s->recv_trailing_metadata = op->recv_trailing_metadata;
GPR_ASSERT(!s->callback_list[CB_RECV_TRAILING_METADATA][0]); s->op_requested[OP_RECV_TRAILING_METADATA] = true;
s->callback_list[CB_RECV_TRAILING_METADATA][0] = op->on_complete;
} }
if (op->recv_message) { if (op->recv_message) {
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
@ -527,24 +689,19 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
op->on_complete); op->on_complete);
} }
s->recv_message = (grpc_byte_buffer **)op->recv_message; s->recv_message = (grpc_byte_buffer **)op->recv_message;
GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][0]); s->cb_recv_message_ready = op->recv_message_ready;
GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][1]); s->op_requested[OP_RECV_MESSAGE] = true;
s->callback_list[CB_RECV_MESSAGE][0] = op->recv_message_ready;
s->callback_list[CB_RECV_MESSAGE][1] = op->on_complete;
s->read_requested = true;
next_recv_step(s, PERFORM_STREAM_OP);
} }
if (op->recv_initial_metadata) { if (op->recv_initial_metadata) {
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "perform_stream_op - recv_initial_metadata:=%p", gpr_log(GPR_DEBUG,
op->on_complete); "perform_stream_op - recv_initial_metadata on_complete=%p, "
"on_ready=%p",
op->on_complete, op->recv_initial_metadata_ready);
} }
s->recv_initial_metadata = op->recv_initial_metadata; s->recv_initial_metadata = op->recv_initial_metadata;
GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][0]); s->cb_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][1]); s->op_requested[OP_RECV_INITIAL_METADATA] = true;
s->callback_list[CB_RECV_INITIAL_METADATA][0] =
op->recv_initial_metadata_ready;
s->callback_list[CB_RECV_INITIAL_METADATA][1] = op->on_complete;
} }
if (op->send_initial_metadata) { if (op->send_initial_metadata) {
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
@ -558,8 +715,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->header_array.count = s->num_headers; s->header_array.count = s->num_headers;
s->header_array.capacity = s->num_headers; s->header_array.capacity = s->num_headers;
s->header_array.headers = s->headers; s->header_array.headers = s->headers;
GPR_ASSERT(!s->callback_list[CB_SEND_INITIAL_METADATA][0]); s->op_requested[OP_SEND_INITIAL_METADATA] = true;
s->callback_list[CB_SEND_INITIAL_METADATA][0] = op->on_complete;
} }
if (op->send_message) { if (op->send_message) {
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
@ -572,21 +728,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
// TODO (makdharma): add compression support // TODO (makdharma): add compression support
GPR_ASSERT(op->send_message->flags == 0); GPR_ASSERT(op->send_message->flags == 0);
gpr_slice_buffer_add(&s->write_slice_buffer, s->slice); gpr_slice_buffer_add(&s->write_slice_buffer, s->slice);
if (s->cbs == NULL) { s->op_requested[OP_SEND_MESSAGE] = true;
if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create");
}
s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks);
GPR_ASSERT(s->cbs);
s->read_closed = false;
s->response_trailers_received = false;
s->response_headers_received = false;
s->cronet_send_state = CRONET_SEND_IDLE;
s->cronet_recv_state = CRONET_RECV_IDLE;
}
GPR_ASSERT(!s->callback_list[CB_SEND_MESSAGE][0]);
s->callback_list[CB_SEND_MESSAGE][0] = op->on_complete;
next_send_step(s);
} }
if (op->send_trailing_metadata) { if (op->send_trailing_metadata) {
if (grpc_cronet_trace) { if (grpc_cronet_trace) {
@ -594,27 +736,24 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
"perform_stream_op - send_trailing_metadata: on_complete=%p", "perform_stream_op - send_trailing_metadata: on_complete=%p",
op->on_complete); op->on_complete);
} }
GPR_ASSERT(!s->callback_list[CB_SEND_TRAILING_METADATA][0]); s->op_requested[OP_SEND_TRAILING_METADATA] = true;
s->callback_list[CB_SEND_TRAILING_METADATA][0] = op->on_complete;
if (s->cbs) {
// Send an "empty" write to the far end to signal that we're done.
// This will induce the server to send down trailers.
if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
} }
cronet_bidirectional_stream_write(s->cbs, "abc", 0, true); if (op->cancel_error) {
} else { if (grpc_cronet_trace) {
// We never created a stream. This was probably an empty request. gpr_log(GPR_DEBUG, "perform_stream_op - cancel_error: on_complete=%p",
invoke_closing_callback(s); op->on_complete);
} }
s->op_requested[OP_CANCEL_ERROR] = true;
} }
next_send_step(s);
next_recv_step(s, PERFORM_STREAM_OP);
} }
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount, grpc_stream *gs, grpc_stream_refcount *refcount,
const void *server_data) { const void *server_data) {
stream_obj *s = (stream_obj *)gs; stream_obj *s = (stream_obj *)gs;
memset(s->callback_list, 0, sizeof(s->callback_list)); memset(s, 0, sizeof(stream_obj));
s->cbs = NULL; s->cbs = NULL;
gpr_mu_init(&s->recv_mu); gpr_mu_init(&s->recv_mu);
s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES); s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
@ -636,6 +775,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_free(s->read_buffer); gpr_free(s->read_buffer);
gpr_free(s->write_buffer); gpr_free(s->write_buffer);
gpr_free(s->url); gpr_free(s->url);
gpr_log(GPR_DEBUG, "destroying %p", &s->recv_mu);
gpr_mu_destroy(&s->recv_mu); gpr_mu_destroy(&s->recv_mu);
if (and_free_memory) { if (and_free_memory) {
gpr_free(and_free_memory); gpr_free(and_free_memory);
@ -650,13 +790,28 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
} }
} }
static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "Unimplemented method");
}
return NULL;
}
static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_transport_op *op) {
if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "Unimplemented method");
}
return NULL;
}
const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj), const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj),
"cronet_http", "cronet_http",
init_stream, init_stream,
set_pollset_do_nothing, set_pollset_do_nothing,
set_pollset_set_do_nothing, set_pollset_set_do_nothing,
perform_stream_op, perform_stream_op,
NULL, perform_op,
destroy_stream, destroy_stream,
destroy_transport, destroy_transport,
NULL}; get_peer};

Loading…
Cancel
Save