From 6b86080912d347efd336d9ebba61439cd7950ed6 Mon Sep 17 00:00:00 2001 From: Makarand Dharmapurikar Date: Mon, 25 Jul 2016 16:10:35 -0700 Subject: [PATCH] fixing issues from cronet end to end testing changed core logic for executing multiple ops. added call cancellation logic simplified the code. --- .../cronet/transport/cronet_transport.c | 373 +++++++++++++----- 1 file changed, 264 insertions(+), 109 deletions(-) diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 25d8aca2508..d589d750be3 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -65,10 +65,11 @@ typedef struct grpc_cronet_transport grpc_cronet_transport; enum send_state { CRONET_SEND_IDLE = 0, - CRONET_REQ_STARTED, CRONET_SEND_HEADER, - CRONET_WRITE, + CRONET_WRITE_PENDING, CRONET_WRITE_COMPLETED, + CRONET_WAIT_FOR_CANCEL, + CRONET_STREAM_CLOSED, }; enum recv_state { @@ -91,13 +92,14 @@ enum e_caller { }; enum callback_id { - CB_SEND_INITIAL_METADATA = 0, - CB_SEND_MESSAGE, - CB_SEND_TRAILING_METADATA, - CB_RECV_MESSAGE, - CB_RECV_INITIAL_METADATA, - CB_RECV_TRAILING_METADATA, - CB_NUM_CALLBACKS + OP_SEND_INITIAL_METADATA = 0, + OP_SEND_MESSAGE, + OP_SEND_TRAILING_METADATA, + OP_RECV_MESSAGE, + OP_RECV_INITIAL_METADATA, + OP_RECV_TRAILING_METADATA, + OP_CANCEL_ERROR, + OP_NUM_CALLBACKS }; struct stream_obj { @@ -117,23 +119,29 @@ struct stream_obj { // Hold the URL char *url; - bool response_headers_received; - bool read_requested; - bool response_trailers_received; + // One bit per operation + bool op_requested[OP_NUM_CALLBACKS]; + bool op_done[OP_NUM_CALLBACKS]; + // Set to true when server indicates no more data will be sent bool read_closed; // Recv message stuff grpc_byte_buffer **recv_message; // Initial metadata stuff grpc_metadata_batch *recv_initial_metadata; + grpc_chttp2_incoming_metadata_buffer initial_metadata; // Trailing metadata stuff grpc_metadata_batch *recv_trailing_metadata; grpc_chttp2_incoming_metadata_buffer imb; + bool imb_valid; // true if there are any valid entries in imb. // This mutex protects receive state machine execution gpr_mu recv_mu; - // we can queue up up to 2 callbacks for each OP - grpc_closure *callback_list[CB_NUM_CALLBACKS][2]; + + // Callbacks to be called when operations complete + grpc_closure *cb_recv_initial_metadata_ready; + grpc_closure *cb_recv_message_ready; + grpc_closure *on_complete; // storage for header cronet_bidirectional_stream_header *headers; @@ -156,34 +164,63 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_pollset_set *pollset_set) {} -static void enqueue_callbacks(grpc_closure *callback_list[]) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - if (callback_list[0]) { - grpc_exec_ctx_sched(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL); - callback_list[0] = NULL; +// Client creates a bunch of operations and invokes "call_start_batch" +// call_start_batch creates a stream_op structure. this structure has info +// needed for executing all the ops. It has on_complete callback that needs +// to be called when all ops are executed. This function keeps track of all +// outstanding operations. It returns true if all operations that were part of +// the stream_op have been completed. +static bool is_op_complete(stream_obj *s) { + int i; + // Check if any requested op is pending + for (i = 0; i < OP_NUM_CALLBACKS; i++) { + if (s->op_requested[i] && !s->op_done[i]) { + gpr_log(GPR_DEBUG, "is_op_complete is FALSE because of %d", i); + return false; + } } - if (callback_list[1]) { - grpc_exec_ctx_sched(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL); - callback_list[1] = NULL; + // Clear the requested/done bits and return true + for (i = 0; i < OP_NUM_CALLBACKS; i++) { + s->op_requested[i] = s->op_done[i] = false; } + return true; +} + +static void enqueue_callback(grpc_closure *callback) { + GPR_ASSERT(callback); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx_sched(&exec_ctx, callback, GRPC_ERROR_NONE, NULL); grpc_exec_ctx_finish(&exec_ctx); } static void on_canceled(cronet_bidirectional_stream *stream) { if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "on_canceled %p", stream); + gpr_log(GPR_DEBUG, "on_canceled(%p)", stream); } + stream_obj *s = (stream_obj *)stream->annotation; + s->op_done[OP_CANCEL_ERROR] = true; + + // Terminate any read callback + if (s->cb_recv_message_ready) { + enqueue_callback(s->cb_recv_message_ready); + s->cb_recv_message_ready = 0; + s->op_done[OP_RECV_MESSAGE] = true; + } + // Don't wait to get any trailing metadata + s->op_done[OP_RECV_TRAILING_METADATA] = true; + + next_send_step(s); } static void on_failed(cronet_bidirectional_stream *stream, int net_error) { if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error); + gpr_log(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error); } } static void on_succeeded(cronet_bidirectional_stream *stream) { if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "on_succeeded %p", stream); + gpr_log(GPR_DEBUG, "on_succeeded(%p)", stream); } } @@ -191,31 +228,38 @@ static void on_response_trailers_received( cronet_bidirectional_stream *stream, const cronet_bidirectional_stream_header_array *trailers) { if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "R: on_response_trailers_received"); + gpr_log(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, + trailers); } stream_obj *s = (stream_obj *)stream->annotation; memset(&s->imb, 0, sizeof(s->imb)); + s->imb_valid = false; grpc_chttp2_incoming_metadata_buffer_init(&s->imb); unsigned int i = 0; for (i = 0; i < trailers->count; i++) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, + trailers->headers[i].value); + } + grpc_chttp2_incoming_metadata_buffer_add( &s->imb, grpc_mdelem_from_metadata_strings( grpc_mdstr_from_string(trailers->headers[i].key), grpc_mdstr_from_string(trailers->headers[i].value))); + s->imb_valid = true; } - s->response_trailers_received = true; + s->op_done[OP_RECV_TRAILING_METADATA] = true; next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED); } static void on_write_completed(cronet_bidirectional_stream *stream, const char *data) { if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "W: on_write_completed"); + gpr_log(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data); } stream_obj *s = (stream_obj *)stream->annotation; - enqueue_callbacks(s->callback_list[CB_SEND_MESSAGE]); - s->cronet_send_state = CRONET_WRITE_COMPLETED; + s->op_done[OP_SEND_MESSAGE] = true; next_send_step(s); } @@ -245,14 +289,14 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data, int count) { stream_obj *s = (stream_obj *)stream->annotation; if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "R: on_read_completed count=%d, total=%d, remaining=%d", - count, s->total_read_bytes, s->remaining_read_bytes); + gpr_log(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count); } if (count > 0) { GPR_ASSERT(s->recv_message); s->remaining_read_bytes -= count; next_recv_step(s, ON_READ_COMPLETE); } else { + gpr_log(GPR_DEBUG, "read_closed = true"); s->read_closed = true; next_recv_step(s, ON_READ_COMPLETE); } @@ -263,21 +307,39 @@ static void on_response_headers_received( const cronet_bidirectional_stream_header_array *headers, const char *negotiated_protocol) { if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "R: on_response_headers_received"); + gpr_log(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, + headers, negotiated_protocol); } stream_obj *s = (stream_obj *)stream->annotation; - enqueue_callbacks(s->callback_list[CB_RECV_INITIAL_METADATA]); - s->response_headers_received = true; + + memset(&s->initial_metadata, 0, sizeof(s->initial_metadata)); + grpc_chttp2_incoming_metadata_buffer_init(&s->initial_metadata); + unsigned int i = 0; + for (i = 0; i < headers->count; i++) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "header key=%s, value=%s", headers->headers[i].key, + headers->headers[i].value); + } + grpc_chttp2_incoming_metadata_buffer_add( + &s->initial_metadata, + grpc_mdelem_from_metadata_strings( + grpc_mdstr_from_string(headers->headers[i].key), + grpc_mdstr_from_string(headers->headers[i].value))); + } + + grpc_chttp2_incoming_metadata_buffer_publish(&s->initial_metadata, + s->recv_initial_metadata); + enqueue_callback(s->cb_recv_initial_metadata_ready); + s->op_done[OP_RECV_INITIAL_METADATA] = true; next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED); } static void on_request_headers_sent(cronet_bidirectional_stream *stream) { if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "W: on_request_headers_sent"); + gpr_log(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream); } stream_obj *s = (stream_obj *)stream->annotation; - enqueue_callbacks(s->callback_list[CB_SEND_INITIAL_METADATA]); - s->cronet_send_state = CRONET_SEND_HEADER; + s->op_done[OP_SEND_INITIAL_METADATA] = true; next_send_step(s); } @@ -293,11 +355,13 @@ static cronet_bidirectional_stream_callback callbacks = { on_canceled}; static void invoke_closing_callback(stream_obj *s) { - grpc_chttp2_incoming_metadata_buffer_publish(&s->imb, - s->recv_trailing_metadata); - if (s->callback_list[CB_RECV_TRAILING_METADATA]) { - enqueue_callbacks(s->callback_list[CB_RECV_TRAILING_METADATA]); + if (!is_op_complete(s)) return; + + if (s->imb_valid) { + grpc_chttp2_incoming_metadata_buffer_publish(&s->imb, + s->recv_trailing_metadata); } + enqueue_callback(s->on_complete); } static void set_recv_state(stream_obj *s, enum recv_state state) { @@ -309,29 +373,35 @@ static void set_recv_state(stream_obj *s, enum recv_state state) { // This is invoked from perform_stream_op, and all on_xxxx callbacks. static void next_recv_step(stream_obj *s, enum e_caller caller) { + // gpr_log(GPR_DEBUG, "locking mutex %p", &s->recv_mu); gpr_mu_lock(&s->recv_mu); switch (s->cronet_recv_state) { case CRONET_RECV_IDLE: if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE"); + gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE, caller=%d", + caller); } if (caller == PERFORM_STREAM_OP || caller == ON_RESPONSE_HEADERS_RECEIVED) { - if (s->read_closed && s->response_trailers_received) { - invoke_closing_callback(s); + if (s->read_closed && s->op_done[OP_RECV_TRAILING_METADATA]) { set_recv_state(s, CRONET_RECV_CLOSED); - } else if (s->response_headers_received == true && - s->read_requested == true) { + } else if (s->op_done[OP_RECV_INITIAL_METADATA] == true && + s->op_requested[OP_RECV_MESSAGE]) { set_recv_state(s, CRONET_RECV_READ_LENGTH); s->total_read_bytes = s->remaining_read_bytes = GRPC_HEADER_SIZE_IN_BYTES; GPR_ASSERT(s->read_buffer); if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); + gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read(%p,%p,%d)", + s->cbs, s->read_buffer, s->remaining_read_bytes); } cronet_bidirectional_stream_read(s->cbs, s->read_buffer, s->remaining_read_bytes); } + } else if (caller == ON_RESPONSE_TRAILERS_RECEIVED) { + // We get here when we receive trailers directly, i.e. without + // going through a data read operation. + set_recv_state(s, CRONET_RECV_CLOSED); } break; case CRONET_RECV_READ_LENGTH: @@ -340,8 +410,8 @@ static void next_recv_step(stream_obj *s, enum e_caller caller) { } if (caller == ON_READ_COMPLETE) { if (s->read_closed) { - invoke_closing_callback(s); - enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); + enqueue_callback(s->cb_recv_message_ready); + s->op_done[OP_RECV_MESSAGE] = true; set_recv_state(s, CRONET_RECV_CLOSED); } else { GPR_ASSERT(s->remaining_read_bytes == 0); @@ -352,7 +422,8 @@ static void next_recv_step(stream_obj *s, enum e_caller caller) { gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes); GPR_ASSERT(s->read_buffer); if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); + gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read(%p,%p,%d)", + s->cbs, s->read_buffer, s->remaining_read_bytes); } if (s->remaining_read_bytes > 0) { cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer, @@ -361,8 +432,8 @@ static void next_recv_step(stream_obj *s, enum e_caller caller) { // Calling the closing callback directly since this is a 0 byte read // for an empty message. process_recv_message(s, NULL); - enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); - invoke_closing_callback(s); + enqueue_callback(s->cb_recv_message_ready); + s->op_done[OP_RECV_MESSAGE] = true; set_recv_state(s, CRONET_RECV_CLOSED); } } @@ -386,7 +457,8 @@ static void next_recv_step(stream_obj *s, enum e_caller caller) { uint8_t *p = (uint8_t *)s->read_buffer; process_recv_message(s, p); set_recv_state(s, CRONET_RECV_IDLE); - enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); + enqueue_callback(s->cb_recv_message_ready); + s->op_done[OP_RECV_MESSAGE] = true; } } break; @@ -396,7 +468,9 @@ static void next_recv_step(stream_obj *s, enum e_caller caller) { GPR_ASSERT(0); // Should not reach here break; } + invoke_closing_callback(s); gpr_mu_unlock(&s->recv_mu); + // gpr_log(GPR_DEBUG, "unlocking mutex %p", &s->recv_mu); } // This function takes the data from s->write_slice_buffer and assembles into @@ -417,27 +491,69 @@ static void create_grpc_frame(stream_obj *s) { // append actual data memcpy(p, raw_data, length); } - -static void do_write(stream_obj *s) { +// Return false if there is no data to write +static bool do_write(stream_obj *s) { gpr_slice_buffer *sb = &s->write_slice_buffer; GPR_ASSERT(sb->count <= 1); if (sb->count > 0) { create_grpc_frame(s); if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write"); + gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write(%p,%p,%d,%d)", + s->cbs, s->write_buffer, (int)s->write_buffer_size, false); } cronet_bidirectional_stream_write(s->cbs, s->write_buffer, (int)s->write_buffer_size, false); + return true; + } else { + return false; + } +} + +static bool init_cronet_stream(stream_obj *s, grpc_transport *gt) { + GPR_ASSERT(s->cbs == NULL); + grpc_cronet_transport *ct = (grpc_cronet_transport *)gt; + GPR_ASSERT(ct->engine); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create"); + } + s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks); + GPR_ASSERT(s->cbs); + s->read_closed = false; + + for (int i = 0; i < OP_NUM_CALLBACKS; i++) { + s->op_requested[i] = s->op_done[i] = false; + } + s->cronet_send_state = CRONET_SEND_IDLE; + s->cronet_recv_state = CRONET_RECV_IDLE; +} + +static bool do_close_connection(stream_obj *s) { + s->op_done[OP_SEND_TRAILING_METADATA] = true; + if (s->cbs) { + // Send an "empty" write to the far end to signal that we're done. + // This will induce the server to send down trailers. + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write length 0"); + } + cronet_bidirectional_stream_write(s->cbs, "abc", 0, true); + return true; + } else { + // We never created a stream. This was probably an empty request. + invoke_closing_callback(s); + return true; } + return false; } // static void next_send_step(stream_obj *s) { + gpr_log(GPR_DEBUG, "next_send_step cronet_send_state=%d", + s->cronet_send_state); switch (s->cronet_send_state) { case CRONET_SEND_IDLE: GPR_ASSERT( s->cbs); // cronet_bidirectional_stream is not initialized yet. - s->cronet_send_state = CRONET_REQ_STARTED; + s->cronet_send_state = CRONET_SEND_HEADER; if (grpc_cronet_trace) { gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url); } @@ -447,11 +563,51 @@ static void next_send_step(stream_obj *s) { gpr_free(s->header_array.headers); break; case CRONET_SEND_HEADER: - do_write(s); - s->cronet_send_state = CRONET_WRITE; + if (s->op_requested[OP_CANCEL_ERROR]) { + cronet_bidirectional_stream_cancel(s->cbs); + gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs); + s->cronet_send_state = CRONET_WAIT_FOR_CANCEL; + } else if (do_write(s) == false && + s->op_requested[OP_SEND_TRAILING_METADATA]) { + if (do_close_connection(s)) { + s->cronet_send_state = CRONET_STREAM_CLOSED; + } + } else { + s->cronet_send_state = CRONET_WRITE_PENDING; + } + break; + case CRONET_WRITE_PENDING: + if (s->op_requested[OP_CANCEL_ERROR]) { + cronet_bidirectional_stream_cancel(s->cbs); + gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs); + s->cronet_send_state = CRONET_WAIT_FOR_CANCEL; + } else if (do_write(s) == false && + s->op_requested[OP_SEND_TRAILING_METADATA]) { + if (do_close_connection(s)) { + s->cronet_send_state = CRONET_STREAM_CLOSED; + } + } else { + s->cronet_send_state = CRONET_WRITE_COMPLETED; + } break; case CRONET_WRITE_COMPLETED: - do_write(s); + if (s->op_requested[OP_CANCEL_ERROR]) { + cronet_bidirectional_stream_cancel(s->cbs); + gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs); + s->cronet_send_state = CRONET_WAIT_FOR_CANCEL; + } else if (do_write(s) == false && + s->op_requested[OP_SEND_TRAILING_METADATA]) { + if (do_close_connection(s)) { + s->cronet_send_state = CRONET_STREAM_CLOSED; + } + } + break; + case CRONET_STREAM_CLOSED: + s->cronet_send_state = CRONET_SEND_IDLE; + break; + case CRONET_WAIT_FOR_CANCEL: + invoke_closing_callback(s); + s->cronet_send_state = CRONET_SEND_IDLE; break; default: GPR_ASSERT(0); @@ -493,7 +649,7 @@ static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head, // Create URL by appending :path value to the hostname gpr_asprintf(&s->url, "https://%s%s", host, value); if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "extracted URL = %s", s->url); + // gpr_log(GPR_DEBUG, "extracted URL = %s", s->url); } continue; } @@ -511,6 +667,13 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_cronet_transport *ct = (grpc_cronet_transport *)gt; GPR_ASSERT(ct->engine); stream_obj *s = (stream_obj *)gs; + // Initialize a cronet bidirectional stream if it doesn't exist. + if (s->cbs == NULL) { + init_cronet_stream(s, gt); + } + + s->on_complete = op->on_complete; + if (op->recv_trailing_metadata) { if (grpc_cronet_trace) { gpr_log(GPR_DEBUG, @@ -518,8 +681,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, op->on_complete); } s->recv_trailing_metadata = op->recv_trailing_metadata; - GPR_ASSERT(!s->callback_list[CB_RECV_TRAILING_METADATA][0]); - s->callback_list[CB_RECV_TRAILING_METADATA][0] = op->on_complete; + s->op_requested[OP_RECV_TRAILING_METADATA] = true; } if (op->recv_message) { if (grpc_cronet_trace) { @@ -527,24 +689,19 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, op->on_complete); } s->recv_message = (grpc_byte_buffer **)op->recv_message; - GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][0]); - GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][1]); - s->callback_list[CB_RECV_MESSAGE][0] = op->recv_message_ready; - s->callback_list[CB_RECV_MESSAGE][1] = op->on_complete; - s->read_requested = true; - next_recv_step(s, PERFORM_STREAM_OP); + s->cb_recv_message_ready = op->recv_message_ready; + s->op_requested[OP_RECV_MESSAGE] = true; } if (op->recv_initial_metadata) { if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "perform_stream_op - recv_initial_metadata:=%p", - op->on_complete); + gpr_log(GPR_DEBUG, + "perform_stream_op - recv_initial_metadata on_complete=%p, " + "on_ready=%p", + op->on_complete, op->recv_initial_metadata_ready); } s->recv_initial_metadata = op->recv_initial_metadata; - GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][0]); - GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][1]); - s->callback_list[CB_RECV_INITIAL_METADATA][0] = - op->recv_initial_metadata_ready; - s->callback_list[CB_RECV_INITIAL_METADATA][1] = op->on_complete; + s->cb_recv_initial_metadata_ready = op->recv_initial_metadata_ready; + s->op_requested[OP_RECV_INITIAL_METADATA] = true; } if (op->send_initial_metadata) { if (grpc_cronet_trace) { @@ -558,8 +715,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->header_array.count = s->num_headers; s->header_array.capacity = s->num_headers; s->header_array.headers = s->headers; - GPR_ASSERT(!s->callback_list[CB_SEND_INITIAL_METADATA][0]); - s->callback_list[CB_SEND_INITIAL_METADATA][0] = op->on_complete; + s->op_requested[OP_SEND_INITIAL_METADATA] = true; } if (op->send_message) { if (grpc_cronet_trace) { @@ -572,21 +728,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, // TODO (makdharma): add compression support GPR_ASSERT(op->send_message->flags == 0); gpr_slice_buffer_add(&s->write_slice_buffer, s->slice); - if (s->cbs == NULL) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create"); - } - s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks); - GPR_ASSERT(s->cbs); - s->read_closed = false; - s->response_trailers_received = false; - s->response_headers_received = false; - s->cronet_send_state = CRONET_SEND_IDLE; - s->cronet_recv_state = CRONET_RECV_IDLE; - } - GPR_ASSERT(!s->callback_list[CB_SEND_MESSAGE][0]); - s->callback_list[CB_SEND_MESSAGE][0] = op->on_complete; - next_send_step(s); + s->op_requested[OP_SEND_MESSAGE] = true; } if (op->send_trailing_metadata) { if (grpc_cronet_trace) { @@ -594,27 +736,24 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, "perform_stream_op - send_trailing_metadata: on_complete=%p", op->on_complete); } - GPR_ASSERT(!s->callback_list[CB_SEND_TRAILING_METADATA][0]); - s->callback_list[CB_SEND_TRAILING_METADATA][0] = op->on_complete; - if (s->cbs) { - // Send an "empty" write to the far end to signal that we're done. - // This will induce the server to send down trailers. - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write"); - } - cronet_bidirectional_stream_write(s->cbs, "abc", 0, true); - } else { - // We never created a stream. This was probably an empty request. - invoke_closing_callback(s); + s->op_requested[OP_SEND_TRAILING_METADATA] = true; + } + if (op->cancel_error) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "perform_stream_op - cancel_error: on_complete=%p", + op->on_complete); } + s->op_requested[OP_CANCEL_ERROR] = true; } + next_send_step(s); + next_recv_step(s, PERFORM_STREAM_OP); } static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, const void *server_data) { stream_obj *s = (stream_obj *)gs; - memset(s->callback_list, 0, sizeof(s->callback_list)); + memset(s, 0, sizeof(stream_obj)); s->cbs = NULL; gpr_mu_init(&s->recv_mu); s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES); @@ -636,6 +775,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_free(s->read_buffer); gpr_free(s->write_buffer); gpr_free(s->url); + gpr_log(GPR_DEBUG, "destroying %p", &s->recv_mu); gpr_mu_destroy(&s->recv_mu); if (and_free_memory) { gpr_free(and_free_memory); @@ -650,13 +790,28 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { } } +static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "Unimplemented method"); + } + return NULL; +} + +static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_transport_op *op) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "Unimplemented method"); + } + return NULL; +} + const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj), "cronet_http", init_stream, set_pollset_do_nothing, set_pollset_set_do_nothing, perform_stream_op, - NULL, + perform_op, destroy_stream, destroy_transport, - NULL}; + get_peer};