Dynamically enable packet coalescing by channel args

pull/9246/head
Muxi Yan 8 years ago
parent eb5ee45eec
commit 0a2fae9aed
  1. 1
      BUILD
  2. 1
      build.yaml
  3. 3
      gRPC-Core.podspec
  4. 4
      include/grpc/impl/codegen/grpc_types.h
  5. 13
      src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
  6. 181
      src/core/ext/transport/cronet/transport/cronet_transport.c
  7. 43
      src/core/ext/transport/cronet/transport/cronet_transport.h
  8. 20
      src/objective-c/tests/CronetUnitTests/CronetUnitTests.m
  9. 1
      src/objective-c/tests/Podfile
  10. 3
      templates/gRPC-Core.podspec.template
  11. 4
      tools/run_tests/generated/sources_and_headers.json

@ -1028,6 +1028,7 @@ grpc_cc_library(
], ],
hdrs = [ hdrs = [
"third_party/Cronet/bidirectional_stream_c.h", "third_party/Cronet/bidirectional_stream_c.h",
"src/core/ext/transport/cronet/transport/cronet_transport.h",
], ],
language = "c", language = "c",
public_hdrs = [ public_hdrs = [

@ -684,6 +684,7 @@ filegroups:
- include/grpc/grpc_security.h - include/grpc/grpc_security.h
- include/grpc/grpc_security_constants.h - include/grpc/grpc_security_constants.h
headers: headers:
- src/core/ext/transport/cronet/transport/cronet_transport.h
- third_party/Cronet/bidirectional_stream_c.h - third_party/Cronet/bidirectional_stream_c.h
src: src:
- src/core/ext/transport/cronet/client/secure/cronet_channel_create.c - src/core/ext/transport/cronet/client/secure/cronet_channel_create.c

@ -848,7 +848,8 @@ Pod::Spec.new do |s|
s.subspec 'Cronet-Interface' do |ss| s.subspec 'Cronet-Interface' do |ss|
ss.header_mappings_dir = 'include/grpc' ss.header_mappings_dir = 'include/grpc'
ss.source_files = 'include/grpc/grpc_cronet.h' ss.source_files = 'include/grpc/grpc_cronet.h',
'src/core/ext/transport/cronet/transport/cronet_transport.h'
end end
s.subspec 'Cronet-Implementation' do |ss| s.subspec 'Cronet-Implementation' do |ss|

@ -217,6 +217,10 @@ typedef struct {
#define GRPC_ARG_LB_POLICY_NAME "grpc.lb_policy_name" #define GRPC_ARG_LB_POLICY_NAME "grpc.lb_policy_name"
/** The grpc_socket_mutator instance that set the socket options. A pointer. */ /** The grpc_socket_mutator instance that set the socket options. A pointer. */
#define GRPC_ARG_SOCKET_MUTATOR "grpc.socket_mutator" #define GRPC_ARG_SOCKET_MUTATOR "grpc.socket_mutator"
/** If non-zero, Cronet transport will coalesce packets to fewer frames when
* possible. */
#define GRPC_ARG_USE_CRONET_PACKET_COALESCING \
"grpc.use_cronet_packet_coalescing"
/** \} */ /** \} */
/** Result of a grpc call. If the caller satisfies the prerequisites of a /** Result of a grpc call. If the caller satisfies the prerequisites of a

@ -39,6 +39,7 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/ext/transport/cronet/transport/cronet_transport.h"
#include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/transport_impl.h" #include "src/core/lib/transport/transport_impl.h"
@ -54,16 +55,14 @@ extern grpc_transport_vtable grpc_cronet_vtable;
GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( GRPCAPI grpc_channel *grpc_cronet_secure_channel_create(
void *engine, const char *target, const grpc_channel_args *args, void *engine, const char *target, const grpc_channel_args *args,
void *reserved) { void *reserved) {
cronet_transport *ct = gpr_malloc(sizeof(cronet_transport));
ct->base.vtable = &grpc_cronet_vtable;
ct->engine = engine;
ct->host = gpr_malloc(strlen(target) + 1);
strcpy(ct->host, target);
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"grpc_create_cronet_transport: stream_engine = %p, target=%s", engine, "grpc_create_cronet_transport: stream_engine = %p, target=%s", engine,
ct->host); target);
grpc_transport *ct =
grpc_create_cronet_transport(engine, target, args, reserved);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
return grpc_channel_create(&exec_ctx, target, args, return grpc_channel_create(&exec_ctx, target, args,
GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct); GRPC_CLIENT_DIRECT_CHANNEL, ct);
} }

@ -112,6 +112,7 @@ struct grpc_cronet_transport {
grpc_transport base; /* must be first element in this structure */ grpc_transport base; /* must be first element in this structure */
stream_engine *engine; stream_engine *engine;
char *host; char *host;
bool use_packet_coalescing;
}; };
typedef struct grpc_cronet_transport grpc_cronet_transport; typedef struct grpc_cronet_transport grpc_cronet_transport;
@ -150,10 +151,8 @@ struct op_state {
bool state_callback_received[OP_NUM_OPS]; bool state_callback_received[OP_NUM_OPS];
bool fail_state; bool fail_state;
bool flush_read; bool flush_read;
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
bool flush_cronet_when_ready; bool flush_cronet_when_ready;
bool pending_write_for_trailer; bool pending_write_for_trailer;
#endif
bool unprocessed_send_message; bool unprocessed_send_message;
grpc_error *cancel_error; grpc_error *cancel_error;
/* data structure for storing data coming from server */ /* data structure for storing data coming from server */
@ -178,7 +177,7 @@ struct op_storage {
struct stream_obj { struct stream_obj {
struct op_and_state *oas; struct op_and_state *oas;
grpc_transport_stream_op *curr_op; grpc_transport_stream_op *curr_op;
grpc_cronet_transport curr_ct; grpc_cronet_transport *curr_ct;
grpc_stream *curr_gs; grpc_stream *curr_gs;
bidirectional_stream *cbs; bidirectional_stream *cbs;
bidirectional_stream_header_array header_array; bidirectional_stream_header_array header_array;
@ -415,6 +414,7 @@ static void on_succeeded(bidirectional_stream *stream) {
static void on_stream_ready(bidirectional_stream *stream) { static void on_stream_ready(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream); CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true; s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true; s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
@ -425,12 +425,12 @@ static void on_stream_ready(bidirectional_stream *stream) {
} }
/* Send the initial metadata on wire if there is no SEND_MESSAGE or /* Send the initial metadata on wire if there is no SEND_MESSAGE or
* SEND_TRAILING_METADATA ops pending */ * SEND_TRAILING_METADATA ops pending */
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING if (t->use_packet_coalescing) {
if (s->state.flush_cronet_when_ready) { if (s->state.flush_cronet_when_ready) {
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs); CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
bidirectional_stream_flush(stream); bidirectional_stream_flush(stream);
}
} }
#endif
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(s);
} }
@ -540,6 +540,7 @@ static void on_response_trailers_received(
CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
trailers); trailers);
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
memset(&s->state.rs.trailing_metadata, 0, memset(&s->state.rs.trailing_metadata, 0,
sizeof(s->state.rs.trailing_metadata)); sizeof(s->state.rs.trailing_metadata));
@ -568,10 +569,10 @@ static void on_response_trailers_received(
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
s->state.state_callback_received[OP_SEND_MESSAGE] = false; s->state.state_callback_received[OP_SEND_MESSAGE] = false;
bidirectional_stream_write(s->cbs, "", 0, true); bidirectional_stream_write(s->cbs, "", 0, true);
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING if (t->use_packet_coalescing) {
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
bidirectional_stream_flush(s->cbs); bidirectional_stream_flush(s->cbs);
#endif }
s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
@ -695,8 +696,10 @@ static bool header_has_authority(grpc_linked_mdelem *head) {
executed. This is the heart of the state machine. executed. This is the heart of the state machine.
*/ */
static bool op_can_be_run(grpc_transport_stream_op *curr_op, static bool op_can_be_run(grpc_transport_stream_op *curr_op,
struct op_state *stream_state, struct stream_obj *s, struct op_state *op_state,
struct op_state *op_state, enum e_op_id op_id) { enum e_op_id op_id) {
struct op_state *stream_state = &s->state;
grpc_cronet_transport *t = s->curr_ct;
bool result = true; bool result = true;
/* When call is canceled, every op can be run, except under following /* When call is canceled, every op can be run, except under following
conditions conditions
@ -768,11 +771,9 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
result = false; result = false;
/* we haven't got on_write_completed for the send yet */ /* we haven't got on_write_completed for the send yet */
else if (stream_state->state_op_done[OP_SEND_MESSAGE] && else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
!stream_state->state_callback_received[OP_SEND_MESSAGE] !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING !(t->use_packet_coalescing &&
&& !stream_state->pending_write_for_trailer stream_state->pending_write_for_trailer))
#endif
)
result = false; result = false;
} else if (op_id == OP_CANCEL_ERROR) { } else if (op_id == OP_CANCEL_ERROR) {
/* already executed */ /* already executed */
@ -845,42 +846,41 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
struct op_and_state *oas) { struct op_and_state *oas) {
grpc_transport_stream_op *stream_op = &oas->op; grpc_transport_stream_op *stream_op = &oas->op;
struct stream_obj *s = oas->s; struct stream_obj *s = oas->s;
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct;
struct op_state *stream_state = &s->state; struct op_state *stream_state = &s->state;
enum e_op_result result = NO_ACTION_POSSIBLE; enum e_op_result result = NO_ACTION_POSSIBLE;
if (stream_op->send_initial_metadata && if (stream_op->send_initial_metadata &&
op_can_be_run(stream_op, stream_state, &oas->state, op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
OP_SEND_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas); CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
/* Start new cronet stream. It is destroyed in on_succeeded, on_canceled, /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
* on_failed */ * on_failed */
GPR_ASSERT(s->cbs == NULL); GPR_ASSERT(s->cbs == NULL);
GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]); GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
s->cbs = bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, s->cbs = bidirectional_stream_create(t->engine, s->curr_gs,
&cronet_callbacks); &cronet_callbacks);
CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs); CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING if (t->use_packet_coalescing) {
bidirectional_stream_disable_auto_flush(s->cbs, true); bidirectional_stream_disable_auto_flush(s->cbs, true);
bidirectional_stream_delay_request_headers_until_flush(s->cbs, true); bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
#endif }
char *url = NULL; char *url = NULL;
const char *method = "POST"; const char *method = "POST";
s->header_array.headers = NULL; s->header_array.headers = NULL;
convert_metadata_to_cronet_headers( convert_metadata_to_cronet_headers(
stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url, stream_op->send_initial_metadata->list.head, t->host, &url,
&s->header_array.headers, &s->header_array.count, &method); &s->header_array.headers, &s->header_array.count, &method);
s->header_array.capacity = s->header_array.count; s->header_array.capacity = s->header_array.count;
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING if (t->use_packet_coalescing) {
if (!stream_op->send_message && !stream_op->send_trailing_metadata) { if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
s->state.flush_cronet_when_ready = true; s->state.flush_cronet_when_ready = true;
}
} }
#endif
result = ACTION_TAKEN_WITH_CALLBACK; result = ACTION_TAKEN_WITH_CALLBACK;
} else if (stream_op->send_message && } else if (stream_op->send_message &&
op_can_be_run(stream_op, stream_state, &oas->state, op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
OP_SEND_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas); CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
stream_state->unprocessed_send_message = false; stream_state->unprocessed_send_message = false;
if (stream_state->state_callback_received[OP_FAILED]) { if (stream_state->state_callback_received[OP_FAILED]) {
@ -913,19 +913,18 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->state_callback_received[OP_SEND_MESSAGE] = false; stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
(int)write_buffer_size, false); (int)write_buffer_size, false);
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING if (t->use_packet_coalescing) {
if (!stream_op->send_trailing_metadata) { if (!stream_op->send_trailing_metadata) {
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
s->cbs); bidirectional_stream_flush(s->cbs);
bidirectional_stream_flush(s->cbs); result = ACTION_TAKEN_WITH_CALLBACK;
result = ACTION_TAKEN_WITH_CALLBACK; } else {
stream_state->pending_write_for_trailer = true;
result = ACTION_TAKEN_NO_CALLBACK;
}
} else { } else {
stream_state->pending_write_for_trailer = true; result = ACTION_TAKEN_WITH_CALLBACK;
result = ACTION_TAKEN_NO_CALLBACK;
} }
#else
result = ACTION_TAKEN_WITH_CALLBACK;
#endif
} else { } else {
result = NO_ACTION_POSSIBLE; result = NO_ACTION_POSSIBLE;
} }
@ -933,7 +932,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->state_op_done[OP_SEND_MESSAGE] = true; stream_state->state_op_done[OP_SEND_MESSAGE] = true;
oas->state.state_op_done[OP_SEND_MESSAGE] = true; oas->state.state_op_done[OP_SEND_MESSAGE] = true;
} else if (stream_op->send_trailing_metadata && } else if (stream_op->send_trailing_metadata &&
op_can_be_run(stream_op, stream_state, &oas->state, op_can_be_run(stream_op, s, &oas->state,
OP_SEND_TRAILING_METADATA)) { OP_SEND_TRAILING_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
if (stream_state->state_callback_received[OP_FAILED]) { if (stream_state->state_callback_received[OP_FAILED]) {
@ -944,15 +943,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
s->cbs); s->cbs);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false; stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
bidirectional_stream_write(s->cbs, "", 0, true); bidirectional_stream_write(s->cbs, "", 0, true);
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING if (t->use_packet_coalescing) {
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
bidirectional_stream_flush(s->cbs); bidirectional_stream_flush(s->cbs);
#endif }
result = ACTION_TAKEN_WITH_CALLBACK; result = ACTION_TAKEN_WITH_CALLBACK;
} }
stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
} else if (stream_op->recv_initial_metadata && } else if (stream_op->recv_initial_metadata &&
op_can_be_run(stream_op, stream_state, &oas->state, op_can_be_run(stream_op, s, &oas->state,
OP_RECV_INITIAL_METADATA)) { OP_RECV_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) { if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
@ -971,8 +970,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK; result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_op->recv_message && } else if (stream_op->recv_message &&
op_can_be_run(stream_op, stream_state, &oas->state, op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
OP_RECV_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) { if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
@ -1084,7 +1082,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
result = ACTION_TAKEN_NO_CALLBACK; result = ACTION_TAKEN_NO_CALLBACK;
} }
} else if (stream_op->recv_trailing_metadata && } else if (stream_op->recv_trailing_metadata &&
op_can_be_run(stream_op, stream_state, &oas->state, op_can_be_run(stream_op, s, &oas->state,
OP_RECV_TRAILING_METADATA)) { OP_RECV_TRAILING_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas); CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
if (oas->s->state.rs.trailing_metadata_valid) { if (oas->s->state.rs.trailing_metadata_valid) {
@ -1096,8 +1094,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK; result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_op->cancel_error && } else if (stream_op->cancel_error &&
op_can_be_run(stream_op, stream_state, &oas->state, op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
OP_CANCEL_ERROR)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas); CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
if (s->cbs) { if (s->cbs) {
@ -1111,8 +1108,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error); stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error);
} }
} else if (stream_op->on_complete && } else if (stream_op->on_complete &&
op_can_be_run(stream_op, stream_state, &oas->state, op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
OP_ON_COMPLETE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) { if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
grpc_closure_sched(exec_ctx, stream_op->on_complete, grpc_closure_sched(exec_ctx, stream_op->on_complete,
@ -1176,10 +1172,12 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
sizeof(s->state.state_callback_received)); sizeof(s->state.state_callback_received));
s->state.fail_state = s->state.flush_read = false; s->state.fail_state = s->state.flush_read = false;
s->state.cancel_error = NULL; s->state.cancel_error = NULL;
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false; s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false;
#endif
s->state.unprocessed_send_message = false; s->state.unprocessed_send_message = false;
s->curr_gs = gs;
s->curr_ct = (grpc_cronet_transport *)gt;
gpr_mu_init(&s->mu); gpr_mu_init(&s->mu);
return 0; return 0;
} }
@ -1195,8 +1193,6 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_transport_stream_op *op) { grpc_stream *gs, grpc_transport_stream_op *op) {
CRONET_LOG(GPR_DEBUG, "perform_stream_op"); CRONET_LOG(GPR_DEBUG, "perform_stream_op");
stream_obj *s = (stream_obj *)gs; stream_obj *s = (stream_obj *)gs;
s->curr_gs = gs;
memcpy(&s->curr_ct, gt, sizeof(grpc_cronet_transport));
add_to_storage(s, op); add_to_storage(s, op);
if (op->send_initial_metadata && if (op->send_initial_metadata &&
header_has_authority(op->send_initial_metadata->list.head)) { header_has_authority(op->send_initial_metadata->list.head)) {
@ -1244,14 +1240,55 @@ static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx,
static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_transport_op *op) {} grpc_transport_op *op) {}
const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj), static const grpc_transport_vtable grpc_cronet_vtable = {
"cronet_http", sizeof(stream_obj),
init_stream, "cronet_http",
set_pollset_do_nothing, init_stream,
set_pollset_set_do_nothing, set_pollset_do_nothing,
perform_stream_op, set_pollset_set_do_nothing,
perform_op, perform_stream_op,
destroy_stream, perform_op,
destroy_transport, destroy_stream,
get_peer, destroy_transport,
get_endpoint}; get_peer,
get_endpoint};
grpc_transport *grpc_create_cronet_transport(void *engine, const char *target,
const grpc_channel_args *args,
void *reserved) {
grpc_cronet_transport *ct = gpr_malloc(sizeof(grpc_cronet_transport));
if (!ct) {
goto error;
}
ct->base.vtable = &grpc_cronet_vtable;
ct->engine = engine;
ct->host = gpr_malloc(strlen(target) + 1);
if (!ct->host) {
goto error;
}
strcpy(ct->host, target);
ct->use_packet_coalescing = true;
for (size_t i = 0; i < args->num_args; i++) {
if (0 == strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
if (args->args[i].type != GRPC_ARG_INTEGER) {
gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
GRPC_ARG_USE_CRONET_PACKET_COALESCING);
} else {
ct->use_packet_coalescing = (args->args[i].value.integer != 0);
}
}
}
return &ct->base;
error:
if (ct) {
if (ct->host) {
gpr_free(ct->host);
}
gpr_free(ct);
}
return NULL;
}

@ -0,0 +1,43 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_CORE_EXT_TRANSPORT_CRONET_TRANSPORT_CRONET_TRANSPORT_H
#define GRPC_CORE_EXT_TRANSPORT_CRONET_TRANSPORT_CRONET_TRANSPORT_H
#include "src/core/lib/transport/transport.h"
grpc_transport *grpc_create_cronet_transport(void *engine, const char *target,
const grpc_channel_args *args,
void *reserved);
#endif /* GRPC_CORE_EXT_TRANSPORT_CRONET_TRANSPORT_CRONET_TRANSPORT_H */

@ -269,7 +269,12 @@ unsigned int parse_h2_length(const char *field) {
grpc_completion_queue_destroy(cq); grpc_completion_queue_destroy(cq);
} }
- (void)testPacketCoalescing { - (void)PacketCoalescing:(bool)use_coalescing {
grpc_arg arg;
arg.key = GRPC_ARG_USE_CRONET_PACKET_COALESCING;
arg.type = GRPC_ARG_INTEGER;
arg.value.integer = use_coalescing ? 1:0;
grpc_channel_args *args = grpc_channel_args_copy_and_add(NULL, &arg, 1);
grpc_call *c; grpc_call *c;
grpc_slice request_payload_slice = grpc_slice request_payload_slice =
grpc_slice_from_copied_string("hello world"); grpc_slice_from_copied_string("hello world");
@ -285,8 +290,8 @@ unsigned int parse_h2_length(const char *field) {
gpr_join_host_port(&addr, "127.0.0.1", port); gpr_join_host_port(&addr, "127.0.0.1", port);
grpc_completion_queue *cq = grpc_completion_queue_create(NULL); grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
stream_engine *cronetEngine = [Cronet getGlobalEngine]; stream_engine *cronetEngine = [Cronet getGlobalEngine];
grpc_channel *client = grpc_cronet_secure_channel_create(cronetEngine, addr, grpc_channel *client =
NULL, NULL); grpc_cronet_secure_channel_create(cronetEngine, addr, args, NULL);
cq_verifier *cqv = cq_verifier_create(cq); cq_verifier *cqv = cq_verifier_create(cq);
grpc_op ops[6]; grpc_op ops[6];
@ -379,7 +384,7 @@ unsigned int parse_h2_length(const char *field) {
long len; long len;
bool coalesced = false; bool coalesced = false;
while ((len = SSL_read(ssl, buf, sizeof(buf))) > 0) { while ((len = SSL_read(ssl, buf, sizeof(buf))) > 0) {
NSLog(@"Read len: %ld", len); gpr_log(GPR_DEBUG, "Read len: %ld", len);
// Analyze the HTTP/2 frames in the same TLS PDU to identify if // Analyze the HTTP/2 frames in the same TLS PDU to identify if
// coalescing is successful // coalescing is successful
@ -404,7 +409,7 @@ unsigned int parse_h2_length(const char *field) {
} }
} }
XCTAssert(coalesced); XCTAssert(coalesced == use_coalescing);
SSL_free(ssl); SSL_free(ssl);
SSL_CTX_free(ctx); SSL_CTX_free(ctx);
close(s); close(s);
@ -433,4 +438,9 @@ unsigned int parse_h2_length(const char *field) {
grpc_completion_queue_destroy(cq); grpc_completion_queue_destroy(cq);
} }
- (void)testPacketCoalescing {
[self PacketCoalescing:true];
[self PacketCoalescing:false];
}
@end @end

@ -97,7 +97,6 @@ post_install do |installer|
# GPR_UNREACHABLE_CODE causes "Control may reach end of non-void # GPR_UNREACHABLE_CODE causes "Control may reach end of non-void
# function" warning # function" warning
config.build_settings['GCC_WARN_ABOUT_RETURN_TYPE'] = 'NO' config.build_settings['GCC_WARN_ABOUT_RETURN_TYPE'] = 'NO'
config.build_settings['GCC_PREPROCESSOR_DEFINITIONS'] = '$(inherited) COCOAPODS=1 GRPC_CRONET_WITH_PACKET_COALESCING=1'
end end
end end

@ -161,7 +161,8 @@
s.subspec 'Cronet-Interface' do |ss| s.subspec 'Cronet-Interface' do |ss|
ss.header_mappings_dir = 'include/grpc' ss.header_mappings_dir = 'include/grpc'
ss.source_files = 'include/grpc/grpc_cronet.h' ss.source_files = 'include/grpc/grpc_cronet.h',
'src/core/ext/transport/cronet/transport/cronet_transport.h'
end end
s.subspec 'Cronet-Implementation' do |ss| s.subspec 'Cronet-Implementation' do |ss|

@ -7873,6 +7873,7 @@
"include/grpc/grpc_cronet.h", "include/grpc/grpc_cronet.h",
"include/grpc/grpc_security.h", "include/grpc/grpc_security.h",
"include/grpc/grpc_security_constants.h", "include/grpc/grpc_security_constants.h",
"src/core/ext/transport/cronet/transport/cronet_transport.h",
"third_party/Cronet/bidirectional_stream_c.h" "third_party/Cronet/bidirectional_stream_c.h"
], ],
"is_filegroup": true, "is_filegroup": true,
@ -7884,7 +7885,8 @@
"include/grpc/grpc_security_constants.h", "include/grpc/grpc_security_constants.h",
"src/core/ext/transport/cronet/client/secure/cronet_channel_create.c", "src/core/ext/transport/cronet/client/secure/cronet_channel_create.c",
"src/core/ext/transport/cronet/transport/cronet_api_dummy.c", "src/core/ext/transport/cronet/transport/cronet_api_dummy.c",
"src/core/ext/transport/cronet/transport/cronet_transport.c" "src/core/ext/transport/cronet/transport/cronet_transport.c",
"src/core/ext/transport/cronet/transport/cronet_transport.h"
], ],
"third_party": false, "third_party": false,
"type": "filegroup" "type": "filegroup"

Loading…
Cancel
Save