Merge github.com:grpc/grpc into bm_call_create

pull/9737/head
Craig Tiller 8 years ago
commit a8fd05745b
  1. 1
      .gitignore
  2. 5
      .pylintrc
  3. 22
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  4. 54
      src/core/ext/transport/chttp2/transport/internal.h
  5. 20
      src/core/ext/transport/chttp2/transport/parsing.c
  6. 18
      src/core/ext/transport/chttp2/transport/writing.c
  7. 11
      src/core/lib/iomgr/ev_epoll_linux.c
  8. 11
      src/core/lib/iomgr/ev_poll_posix.c
  9. 4
      src/core/lib/iomgr/ev_posix.c
  10. 1
      src/core/lib/iomgr/ev_posix.h
  11. 3
      src/core/lib/iomgr/pollset.h
  12. 5
      src/core/lib/iomgr/pollset_uv.c
  13. 10
      src/core/lib/iomgr/pollset_windows.c
  14. 6
      src/core/lib/surface/channel.c
  15. 46
      src/core/lib/surface/completion_queue.c
  16. 3
      src/core/lib/surface/completion_queue.h
  17. 2
      src/core/lib/surface/init.c
  18. 7
      src/python/grpcio/grpc/_auth.py
  19. 45
      src/python/grpcio/grpc/_common.py
  20. 1
      test/core/internal_api_canaries/iomgr.c
  21. 2
      tools/distrib/yapf_code.sh
  22. 41
      tools/internal_ci/linux/grpc_interop.cfg
  23. 42
      tools/internal_ci/linux/grpc_interop.sh
  24. 6
      tools/run_tests/run_microbenchmark.py

1
.gitignore vendored

@ -9,6 +9,7 @@ objs
cython_debug/ cython_debug/
python_build/ python_build/
python_format_venv/ python_format_venv/
python_pylint_venv/
.coverage* .coverage*
.eggs .eggs
htmlcov/ htmlcov/

@ -29,10 +29,7 @@
#TODO: Enable too-many-return-statements #TODO: Enable too-many-return-statements
#TODO: Enable too-many-nested-blocks #TODO: Enable too-many-nested-blocks
#TODO: Enable super-init-not-called #TODO: Enable super-init-not-called
#TODO: Enable simplifiable-if-statement
#TODO: Enable no-self-use #TODO: Enable no-self-use
#TODO: Enable no-member #TODO: Enable no-member
#TODO: Enable logging-format-interpolation
#TODO: Enable dangerous-default-value
disable=missing-docstring,too-few-public-methods,too-many-arguments,no-init,duplicate-code,invalid-name,suppressed-message,locally-disabled,protected-access,no-name-in-module,unused-argument,fixme,wrong-import-order,no-value-for-parameter,cyclic-import,unused-variable,redefined-outer-name,unused-import,too-many-instance-attributes,broad-except,too-many-locals,too-many-lines,redefined-variable-type,next-method-called,import-error,useless-else-on-loop,too-many-return-statements,too-many-nested-blocks,super-init-not-called,simplifiable-if-statement,no-self-use,no-member,logging-format-interpolation,dangerous-default-value disable=missing-docstring,too-few-public-methods,too-many-arguments,no-init,duplicate-code,invalid-name,suppressed-message,locally-disabled,protected-access,no-name-in-module,unused-argument,fixme,wrong-import-order,no-value-for-parameter,cyclic-import,unused-variable,redefined-outer-name,unused-import,too-many-instance-attributes,broad-except,too-many-locals,too-many-lines,redefined-variable-type,next-method-called,import-error,useless-else-on-loop,too-many-return-statements,too-many-nested-blocks,super-init-not-called,no-self-use,no-member

@ -265,7 +265,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
.gain_d = 0, .gain_d = 0,
.initial_control_value = log2(DEFAULT_WINDOW), .initial_control_value = log2(DEFAULT_WINDOW),
.min_control_value = -1, .min_control_value = -1,
.max_control_value = 22, .max_control_value = 25,
.integral_range = 10}); .integral_range = 10});
grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_goaway_parser_init(&t->goaway_parser);
@ -569,6 +569,14 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GRPC_ERROR_UNREF(s->read_closed_error); GRPC_ERROR_UNREF(s->read_closed_error);
GRPC_ERROR_UNREF(s->write_closed_error); GRPC_ERROR_UNREF(s->write_closed_error);
if (s->incoming_window_delta > 0) {
GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA(
"destroy", t, s, s->incoming_window_delta);
} else if (s->incoming_window_delta < 0) {
GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA(
"destroy", t, s, -s->incoming_window_delta);
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream");
GPR_TIMER_END("destroy_stream", 0); GPR_TIMER_END("destroy_stream", 0);
@ -1801,13 +1809,13 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (delta == 0 || (bdp != 0 && delta > -1024 && delta < 1024)) { if (delta == 0 || (bdp != 0 && delta > -1024 && delta < 1024)) {
return; return;
} }
if (grpc_bdp_estimator_trace) {
gpr_log(GPR_DEBUG, "%s: update initial window size to %d", t->peer_string,
(int)bdp);
}
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp); push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp);
} }
/*******************************************************************************
* INPUT PROCESSING - PARSING
*/
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) { grpc_chttp2_transport *t) {
grpc_http_parser parser; grpc_http_parser parser;
@ -2054,8 +2062,8 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
(int64_t)have_already) { (int64_t)have_already) {
write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED; write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED;
} }
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window_delta, GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA("op", t, s,
add_max_recv_bytes); add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window, GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window,
add_max_recv_bytes); add_max_recv_bytes);
if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size - if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size -

@ -271,10 +271,6 @@ struct grpc_chttp2_transport {
/** data to write next write */ /** data to write next write */
grpc_slice_buffer qbuf; grpc_slice_buffer qbuf;
/** window available to announce to peer */
int64_t announce_incoming_window;
/** how much window would we like to have for incoming_window */
uint32_t connection_window_target;
/** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set? /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
*/ */
uint32_t write_buffer_size; uint32_t write_buffer_size;
@ -328,6 +324,16 @@ struct grpc_chttp2_transport {
/** window available for peer to send to us */ /** window available for peer to send to us */
int64_t incoming_window; int64_t incoming_window;
/** calculating what we should give for incoming window:
we track the total amount of flow control over initial window size
across all streams: this is data that we want to receive right now (it
has an outstanding read)
and the total amount of flow control under initial window size across all
streams: this is data we've read early
we want to adjust incoming_window such that:
incoming_window = total_over - max(bdp - total_under, 0) */
int64_t stream_total_over_incoming_window;
int64_t stream_total_under_incoming_window;
/* deframing */ /* deframing */
grpc_chttp2_deframe_transport_state deframe_state; grpc_chttp2_deframe_transport_state deframe_state;
@ -634,6 +640,44 @@ typedef enum {
GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \ GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \
amount) amount)
#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE( \
phase, transport, dst_context) \
if (dst_context->incoming_window_delta < 0) { \
transport->stream_total_under_incoming_window += \
dst_context->incoming_window_delta; \
} else if (dst_context->incoming_window_delta > 0) { \
transport->stream_total_over_incoming_window -= \
dst_context->incoming_window_delta; \
}
#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE( \
phase, transport, dst_context) \
if (dst_context->incoming_window_delta < 0) { \
transport->stream_total_under_incoming_window -= \
dst_context->incoming_window_delta; \
} else if (dst_context->incoming_window_delta > 0) { \
transport->stream_total_over_incoming_window += \
dst_context->incoming_window_delta; \
}
#define GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( \
phase, transport, dst_context, amount) \
GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \
dst_context); \
GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, \
incoming_window_delta, amount); \
GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \
dst_context);
#define GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA( \
phase, transport, dst_context, amount) \
GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \
dst_context); \
GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, \
incoming_window_delta, amount); \
GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \
dst_context);
#define GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, id, dst_context, \ #define GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, id, dst_context, \
dst_var, amount) \ dst_var, amount) \
do { \ do { \
@ -752,4 +796,6 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error); grpc_chttp2_stream *s, grpc_error *error);
uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */

@ -376,15 +376,6 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
return err; return err;
} }
uint32_t target_incoming_window = GPR_MAX(
t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
1024);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window,
incoming_frame_size);
if (t->incoming_window <= target_incoming_window / 2) {
grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control");
}
if (s != NULL) { if (s != NULL) {
if (incoming_frame_size > if (incoming_frame_size >
s->incoming_window_delta + s->incoming_window_delta +
@ -402,8 +393,8 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
return err; return err;
} }
GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window_delta, GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s,
incoming_frame_size); incoming_frame_size);
if ((int64_t)t->settings[GRPC_SENT_SETTINGS] if ((int64_t)t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] +
(int64_t)s->incoming_window_delta - (int64_t)s->announce_window <= (int64_t)s->incoming_window_delta - (int64_t)s->announce_window <=
@ -417,6 +408,13 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
s->received_bytes += incoming_frame_size; s->received_bytes += incoming_frame_size;
} }
uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window,
incoming_frame_size);
if (t->incoming_window <= target_incoming_window / 2) {
grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control");
}
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }

@ -152,6 +152,17 @@ static bool stream_ref_if_not_destroyed(gpr_refcount *r) {
return true; return true;
} }
uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t) {
return (uint32_t)GPR_MAX(
(int64_t)((1u << 31) - 1),
t->stream_total_over_incoming_window +
(int64_t)GPR_MAX(
t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
t->stream_total_under_incoming_window,
0));
}
bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) { grpc_chttp2_transport *t) {
grpc_chttp2_stream *s; grpc_chttp2_stream *s;
@ -310,13 +321,12 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
/* if the grpc_chttp2_transport is ready to send a window update, do so here /* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */ also; 3/4 is a magic number that will likely get tuned soon */
uint32_t target_incoming_window = GPR_MAX( uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t);
t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
1024);
uint32_t threshold_to_send_transport_window_update = uint32_t threshold_to_send_transport_window_update =
t->outbuf.count > 0 ? 3 * target_incoming_window / 4 t->outbuf.count > 0 ? 3 * target_incoming_window / 4
: target_incoming_window / 2; : target_incoming_window / 2;
if (t->incoming_window <= threshold_to_send_transport_window_update) { if (t->incoming_window <= threshold_to_send_transport_window_update &&
t->incoming_window != target_incoming_window) {
maybe_initiate_ping(exec_ctx, t, maybe_initiate_ping(exec_ctx, t,
GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE); GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE);
uint32_t announced = (uint32_t)GPR_CLAMP( uint32_t announced = (uint32_t)GPR_CLAMP(

@ -1405,16 +1405,6 @@ static void pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->po.mu); gpr_mu_destroy(&pollset->po.mu);
} }
static void pollset_reset(grpc_pollset *pollset) {
GPR_ASSERT(pollset->shutting_down);
GPR_ASSERT(!pollset_has_workers(pollset));
pollset->shutting_down = false;
pollset->finish_shutdown_called = false;
pollset->kicked_without_pollers = false;
pollset->shutdown_done = NULL;
GPR_ASSERT(pollset->po.pi == NULL);
}
static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
polling_island *pi) { polling_island *pi) {
if (gpr_mu_trylock(&pi->workqueue_read_mu)) { if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
@ -1958,7 +1948,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_init = pollset_init, .pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown, .pollset_shutdown = pollset_shutdown,
.pollset_reset = pollset_reset,
.pollset_destroy = pollset_destroy, .pollset_destroy = pollset_destroy,
.pollset_work = pollset_work, .pollset_work = pollset_work,
.pollset_kick = pollset_kick, .pollset_kick = pollset_kick,

@ -815,16 +815,6 @@ static void pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu); gpr_mu_destroy(&pollset->mu);
} }
static void pollset_reset(grpc_pollset *pollset) {
GPR_ASSERT(pollset->shutting_down);
GPR_ASSERT(!pollset_has_workers(pollset));
GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
GPR_ASSERT(pollset->fd_count == 0);
pollset->shutting_down = 0;
pollset->called_shutdown = 0;
pollset->kicked_without_pollers = 0;
}
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) { grpc_fd *fd) {
gpr_mu_lock(&pollset->mu); gpr_mu_lock(&pollset->mu);
@ -1514,7 +1504,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_init = pollset_init, .pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown, .pollset_shutdown = pollset_shutdown,
.pollset_reset = pollset_reset,
.pollset_destroy = pollset_destroy, .pollset_destroy = pollset_destroy,
.pollset_work = pollset_work, .pollset_work = pollset_work,
.pollset_kick = pollset_kick, .pollset_kick = pollset_kick,

@ -191,10 +191,6 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
g_event_engine->pollset_shutdown(exec_ctx, pollset, closure); g_event_engine->pollset_shutdown(exec_ctx, pollset, closure);
} }
void grpc_pollset_reset(grpc_pollset *pollset) {
g_event_engine->pollset_reset(pollset);
}
void grpc_pollset_destroy(grpc_pollset *pollset) { void grpc_pollset_destroy(grpc_pollset *pollset) {
g_event_engine->pollset_destroy(pollset); g_event_engine->pollset_destroy(pollset);
} }

@ -64,7 +64,6 @@ typedef struct grpc_event_engine_vtable {
void (*pollset_init)(grpc_pollset *pollset, gpr_mu **mu); void (*pollset_init)(grpc_pollset *pollset, gpr_mu **mu);
void (*pollset_shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, void (*pollset_shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure); grpc_closure *closure);
void (*pollset_reset)(grpc_pollset *pollset);
void (*pollset_destroy)(grpc_pollset *pollset); void (*pollset_destroy)(grpc_pollset *pollset);
grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now, grpc_pollset_worker **worker, gpr_timespec now,

@ -58,9 +58,6 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu);
* pollset's mutex must be held */ * pollset's mutex must be held */
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure); grpc_closure *closure);
/** Reset the pollset to its initial state (perhaps with some cached objects);
* must have been previously shutdown */
void grpc_pollset_reset(grpc_pollset *pollset);
void grpc_pollset_destroy(grpc_pollset *pollset); void grpc_pollset_destroy(grpc_pollset *pollset);
/* Do some work on a pollset. /* Do some work on a pollset.

@ -97,11 +97,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
} }
} }
void grpc_pollset_reset(grpc_pollset *pollset) {
GPR_ASSERT(pollset->shutting_down);
pollset->shutting_down = 0;
}
static void timer_run_cb(uv_timer_t *timer) {} static void timer_run_cb(uv_timer_t *timer) {}
grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,

@ -117,16 +117,6 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
void grpc_pollset_destroy(grpc_pollset *pollset) {} void grpc_pollset_destroy(grpc_pollset *pollset) {}
void grpc_pollset_reset(grpc_pollset *pollset) {
GPR_ASSERT(pollset->shutting_down);
GPR_ASSERT(
!has_workers(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET));
pollset->shutting_down = 0;
pollset->is_iocp_worker = 0;
pollset->kicked_without_pollers = 0;
pollset->on_shutdown = NULL;
}
grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl, grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) { gpr_timespec now, gpr_timespec deadline) {

@ -128,7 +128,8 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
} }
channel->default_authority = grpc_mdelem_from_slices( channel->default_authority = grpc_mdelem_from_slices(
exec_ctx, GRPC_MDSTR_AUTHORITY, exec_ctx, GRPC_MDSTR_AUTHORITY,
grpc_slice_from_copied_string(args->args[i].value.string)); grpc_slice_intern(
grpc_slice_from_static_string(args->args[i].value.string)));
} }
} else if (0 == } else if (0 ==
strcmp(args->args[i].key, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)) { strcmp(args->args[i].key, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)) {
@ -144,7 +145,8 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
} else { } else {
channel->default_authority = grpc_mdelem_from_slices( channel->default_authority = grpc_mdelem_from_slices(
exec_ctx, GRPC_MDSTR_AUTHORITY, exec_ctx, GRPC_MDSTR_AUTHORITY,
grpc_slice_from_copied_string(args->args[i].value.string)); grpc_slice_intern(
grpc_slice_from_static_string(args->args[i].value.string)));
} }
} }
} else if (0 == strcmp(args->args[i].key, } else if (0 == strcmp(args->args[i].key,

@ -96,9 +96,6 @@ struct grpc_completion_queue {
#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) #define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1) #define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1)
static gpr_mu g_freelist_mu;
static grpc_completion_queue *g_freelist;
int grpc_cq_pluck_trace; int grpc_cq_pluck_trace;
int grpc_cq_event_timeout_trace; int grpc_cq_event_timeout_trace;
@ -113,21 +110,6 @@ int grpc_cq_event_timeout_trace;
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
grpc_error *error); grpc_error *error);
void grpc_cq_global_init(void) { gpr_mu_init(&g_freelist_mu); }
void grpc_cq_global_shutdown(void) {
gpr_mu_destroy(&g_freelist_mu);
while (g_freelist) {
grpc_completion_queue *next = g_freelist->next_free;
grpc_pollset_destroy(POLLSET_FROM_CQ(g_freelist));
#ifndef NDEBUG
gpr_free(g_freelist->outstanding_tags);
#endif
gpr_free(g_freelist);
g_freelist = next;
}
}
grpc_completion_queue *grpc_completion_queue_create(void *reserved) { grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_completion_queue *cc; grpc_completion_queue *cc;
GPR_ASSERT(!reserved); GPR_ASSERT(!reserved);
@ -136,22 +118,12 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved)); GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved));
gpr_mu_lock(&g_freelist_mu); cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
if (g_freelist == NULL) { grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
gpr_mu_unlock(&g_freelist_mu);
cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
#ifndef NDEBUG #ifndef NDEBUG
cc->outstanding_tags = NULL; cc->outstanding_tags = NULL;
cc->outstanding_tag_capacity = 0; cc->outstanding_tag_capacity = 0;
#endif #endif
} else {
cc = g_freelist;
g_freelist = g_freelist->next_free;
gpr_mu_unlock(&g_freelist_mu);
/* pollset already initialized */
}
/* Initial ref is dropped by grpc_completion_queue_shutdown */ /* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->pending_events, 1); gpr_ref_init(&cc->pending_events, 1);
@ -203,11 +175,11 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif #endif
if (gpr_unref(&cc->owning_refs)) { if (gpr_unref(&cc->owning_refs)) {
GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head);
grpc_pollset_reset(POLLSET_FROM_CQ(cc)); grpc_pollset_destroy(POLLSET_FROM_CQ(cc));
gpr_mu_lock(&g_freelist_mu); #ifndef NDEBUG
cc->next_free = g_freelist; gpr_free(cc->outstanding_tags);
g_freelist = cc; #endif
gpr_mu_unlock(&g_freelist_mu); gpr_free(cc);
} }
} }

@ -99,7 +99,4 @@ bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc); void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc); int grpc_cq_is_server_cq(grpc_completion_queue *cc);
void grpc_cq_global_init(void);
void grpc_cq_global_shutdown(void);
#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */ #endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */

@ -209,7 +209,6 @@ void grpc_init(void) {
grpc_iomgr_init(); grpc_iomgr_init();
grpc_executor_init(); grpc_executor_init();
gpr_timers_global_init(); gpr_timers_global_init();
grpc_cq_global_init();
grpc_handshaker_factory_registry_init(); grpc_handshaker_factory_registry_init();
grpc_security_init(); grpc_security_init();
for (i = 0; i < g_number_of_plugins; i++) { for (i = 0; i < g_number_of_plugins; i++) {
@ -236,7 +235,6 @@ void grpc_shutdown(void) {
gpr_mu_lock(&g_init_mu); gpr_mu_lock(&g_init_mu);
if (--g_initializations == 0) { if (--g_initializations == 0) {
grpc_executor_shutdown(&exec_ctx); grpc_executor_shutdown(&exec_ctx);
grpc_cq_global_shutdown();
grpc_iomgr_shutdown(&exec_ctx); grpc_iomgr_shutdown(&exec_ctx);
gpr_timers_global_destroy(); gpr_timers_global_destroy();
grpc_tracer_shutdown(); grpc_tracer_shutdown();

@ -48,11 +48,8 @@ class GoogleCallCredentials(grpc.AuthMetadataPlugin):
# Hack to determine if these are JWT creds and we need to pass # Hack to determine if these are JWT creds and we need to pass
# additional_claims when getting a token # additional_claims when getting a token
if 'additional_claims' in inspect.getargspec( self._is_jwt = 'additional_claims' in inspect.getargspec(
credentials.get_access_token).args: credentials.get_access_token).args
self._is_jwt = True
else:
self._is_jwt = False
def __call__(self, context, callback): def __call__(self, context, callback):
# MetadataPlugins cannot block (see grpc.beta.interfaces.py) # MetadataPlugins cannot block (see grpc.beta.interfaces.py)

@ -92,7 +92,7 @@ def decode(b):
try: try:
return b.decode('utf8') return b.decode('utf8')
except UnicodeDecodeError: except UnicodeDecodeError:
logging.exception('Invalid encoding on {}'.format(b)) logging.exception('Invalid encoding on %s', b)
return b.decode('latin1') return b.decode('latin1')
@ -148,36 +148,23 @@ def fully_qualified_method(group, method):
class CleanupThread(threading.Thread): class CleanupThread(threading.Thread):
"""A threading.Thread subclass supporting custom behavior on join(). """A threading.Thread subclass supporting custom behavior on join().
On Python Interpreter exit, Python will attempt to join outstanding threads On Python Interpreter exit, Python will attempt to join outstanding threads
prior to garbage collection. We may need to do additional cleanup, and prior to garbage collection. We may need to do additional cleanup, and
we accomplish this by overriding the join() method. we accomplish this by overriding the join() method.
""" """
def __init__(self, def __init__(self, behavior, *args, **kwargs):
behavior,
group=None,
target=None,
name=None,
args=(),
kwargs={}):
"""Constructor. """Constructor.
Args: Args:
behavior (function): Function called on join() with a single behavior (function): Function called on join() with a single
argument, timeout, indicating the maximum duration of argument, timeout, indicating the maximum duration of
`behavior`, or None indicating `behavior` has no deadline. `behavior`, or None indicating `behavior` has no deadline.
`behavior` must be idempotent. `behavior` must be idempotent.
group (None): should be None. Reseved for future extensions args: Positional arguments passed to threading.Thread constructor.
when ThreadGroup is implemented. kwargs: Keyword arguments passed to threading.Thread constructor.
target (function): The function to invoke when this thread is """
run. Defaults to None. super(CleanupThread, self).__init__(*args, **kwargs)
name (str): The name of this thread. Defaults to None.
args (tuple[object]): A tuple of arguments to pass to `target`.
kwargs (dict[str,object]): A dictionary of keyword arguments to
pass to `target`.
"""
super(CleanupThread, self).__init__(
group=group, target=target, name=name, args=args, kwargs=kwargs)
self._behavior = behavior self._behavior = behavior
def join(self, timeout=None): def join(self, timeout=None):

@ -105,7 +105,6 @@ static void test_code(void) {
grpc_pollset_size(); grpc_pollset_size();
grpc_pollset_init(NULL, NULL); grpc_pollset_init(NULL, NULL);
grpc_pollset_shutdown(NULL, NULL, NULL); grpc_pollset_shutdown(NULL, NULL, NULL);
grpc_pollset_reset(NULL);
grpc_pollset_destroy(NULL); grpc_pollset_destroy(NULL);
GRPC_ERROR_UNREF(grpc_pollset_work(NULL, NULL, NULL, GRPC_ERROR_UNREF(grpc_pollset_work(NULL, NULL, NULL,
gpr_now(GPR_CLOCK_REALTIME), gpr_now(GPR_CLOCK_REALTIME),

@ -53,7 +53,7 @@ for dir in $DIRS; do
tempdir=`mktemp -d` tempdir=`mktemp -d`
cp -RT $dir $tempdir cp -RT $dir $tempdir
$PYTHON -m yapf -i -r -p $exclusion_args $dir $PYTHON -m yapf -i -r -p $exclusion_args $dir
if ! diff -rq $dir $tempdir; then if ! diff -r $dir $tempdir; then
script_result=1 script_result=1
fi fi
rm -rf $tempdir rm -rf $tempdir

@ -0,0 +1,41 @@
#!/bin/bash
# Copyright 2017, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc/tools/internal_ci/linux/grpc_interop.sh"
# grpc_interop tests can take 6+ hours to complete.
timeout_mins: 480
action {
define_artifacts {
regex: "**/sponge_log.xml"
}
}

@ -0,0 +1,42 @@
#!/usr/bin/env bash
# Copyright 2017, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
set -ex
export LANG=en_US.UTF-8
# Enter the gRPC repo root
cd $(dirname $0)/../../..
git submodule update --init
tools/run_tests/run_interop_tests.py -l all -s all --cloud_to_prod --cloud_to_prod_auth --use_docker --http2_interop -t -j 12 $@ || FAILED="true"
tools/run_tests/run_interop_tests.py -l java --use_docker --http2_badserver_interop $@ || FAILED="true"
tools/run_tests/run_interop_tests.py -l python --use_docker --http2_badserver_interop $@ || FAILED="true"

@ -28,6 +28,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import cgi
import multiprocessing import multiprocessing
import os import os
import subprocess import subprocess
@ -71,11 +72,12 @@ def heading(name):
def link(txt, tgt): def link(txt, tgt):
global index_html global index_html
index_html += "<p><a href=\"%s\">%s</a></p>\n" % (tgt, txt) index_html += "<p><a href=\"%s\">%s</a></p>\n" % (
cgi.escape(tgt, quote=True), cgi.escape(txt))
def text(txt): def text(txt):
global index_html global index_html
index_html += "<p><pre>%s</pre></p>\n" % txt index_html += "<p><pre>%s</pre></p>\n" % cgi.escape(txt)
def collect_latency(bm_name, args): def collect_latency(bm_name, args):
"""generate latency profiles""" """generate latency profiles"""

Loading…
Cancel
Save