Merge github.com:grpc/grpc into bm_closure

pull/9693/head
Craig Tiller 8 years ago
commit d3b8cd6617
23 changed files (changed lines in parentheses):
  1. src/core/ext/transport/chttp2/transport/chttp2_transport.c (22)
  2. src/core/ext/transport/chttp2/transport/internal.h (54)
  3. src/core/ext/transport/chttp2/transport/parsing.c (20)
  4. src/core/ext/transport/chttp2/transport/writing.c (18)
  5. src/core/lib/channel/http_server_filter.c (15)
  6. src/core/lib/iomgr/ev_epoll_linux.c (11)
  7. src/core/lib/iomgr/ev_poll_posix.c (11)
  8. src/core/lib/iomgr/ev_posix.c (4)
  9. src/core/lib/iomgr/ev_posix.h (1)
  10. src/core/lib/iomgr/pollset.h (3)
  11. src/core/lib/iomgr/pollset_uv.c (5)
  12. src/core/lib/iomgr/pollset_windows.c (10)
  13. src/core/lib/surface/channel.c (6)
  14. src/core/lib/surface/completion_queue.c (46)
  15. src/core/lib/surface/completion_queue.h (3)
  16. src/core/lib/surface/init.c (2)
  17. src/python/grpcio/grpc/_channel.py (32)
  18. test/core/internal_api_canaries/iomgr.c (1)
  19. test/cpp/microbenchmarks/bm_fullstack.cc (1)
  20. tools/internal_ci/linux/grpc_interop.cfg (41)
  21. tools/internal_ci/linux/grpc_interop.sh (42)
  22. tools/run_tests/python_utils/start_port_server.py (1)
  23. tools/run_tests/run_microbenchmark.py (52)

@@ -265,7 +265,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
.gain_d = 0,
.initial_control_value = log2(DEFAULT_WINDOW),
.min_control_value = -1,
- .max_control_value = 22,
+ .max_control_value = 25,
.integral_range = 10});
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
@@ -569,6 +569,14 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GRPC_ERROR_UNREF(s->read_closed_error);
GRPC_ERROR_UNREF(s->write_closed_error);
+ if (s->incoming_window_delta > 0) {
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA(
+ "destroy", t, s, s->incoming_window_delta);
+ } else if (s->incoming_window_delta < 0) {
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA(
+ "destroy", t, s, -s->incoming_window_delta);
+ }
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream");
GPR_TIMER_END("destroy_stream", 0);
@@ -1801,13 +1809,13 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (delta == 0 || (bdp != 0 && delta > -1024 && delta < 1024)) {
return;
}
if (grpc_bdp_estimator_trace) {
gpr_log(GPR_DEBUG, "%s: update initial window size to %d", t->peer_string,
(int)bdp);
}
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp);
}
/*******************************************************************************
* INPUT PROCESSING - PARSING
*/
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_http_parser parser;
@@ -2054,8 +2062,8 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
(int64_t)have_already) {
write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED;
}
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window_delta,
- add_max_recv_bytes);
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA("op", t, s,
+ add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window,
add_max_recv_bytes);
if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size -

@@ -271,10 +271,6 @@ struct grpc_chttp2_transport {
/** data to write next write */
grpc_slice_buffer qbuf;
- /** window available to announce to peer */
- int64_t announce_incoming_window;
- /** how much window would we like to have for incoming_window */
- uint32_t connection_window_target;
/** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
*/
uint32_t write_buffer_size;
@@ -328,6 +324,16 @@ struct grpc_chttp2_transport {
/** window available for peer to send to us */
int64_t incoming_window;
+ /** calculating what we should give for incoming window:
+ we track the total amount of flow control over initial window size
+ across all streams: this is data that we want to receive right now (it
+ has an outstanding read)
+ and the total amount of flow control under initial window size across all
+ streams: this is data we've read early
+ we want to adjust incoming_window such that:
+ incoming_window = total_over - max(bdp - total_under, 0) */
+ int64_t stream_total_over_incoming_window;
+ int64_t stream_total_under_incoming_window;
/* deframing */
grpc_chttp2_deframe_transport_state deframe_state;
@@ -634,6 +640,44 @@ typedef enum {
GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \
amount)
+ #define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE( \
+ phase, transport, dst_context) \
+ if (dst_context->incoming_window_delta < 0) { \
+ transport->stream_total_under_incoming_window += \
+ dst_context->incoming_window_delta; \
+ } else if (dst_context->incoming_window_delta > 0) { \
+ transport->stream_total_over_incoming_window -= \
+ dst_context->incoming_window_delta; \
+ }
+ #define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE( \
+ phase, transport, dst_context) \
+ if (dst_context->incoming_window_delta < 0) { \
+ transport->stream_total_under_incoming_window -= \
+ dst_context->incoming_window_delta; \
+ } else if (dst_context->incoming_window_delta > 0) { \
+ transport->stream_total_over_incoming_window += \
+ dst_context->incoming_window_delta; \
+ }
+ #define GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( \
+ phase, transport, dst_context, amount) \
+ GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \
+ dst_context); \
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, \
+ incoming_window_delta, amount); \
+ GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \
+ dst_context);
+ #define GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA( \
+ phase, transport, dst_context, amount) \
+ GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \
+ dst_context); \
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, \
+ incoming_window_delta, amount); \
+ GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \
+ dst_context);
#define GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, id, dst_context, \
dst_var, amount) \
do { \
@@ -752,4 +796,6 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error);
+ uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */
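The over/under accounting added above is easier to see in isolation. Below is a standalone model of the PREUPDATE/POSTUPDATE discipline: before a stream's incoming_window_delta changes, its current contribution is removed from the transport-wide totals, and after the change the new contribution is added back. The names mirror the diff, but none of this is gRPC API; it is a sketch for illustration only.

/* Standalone model of the stream_total_over/under_incoming_window
   bookkeeping; compiles with any C99 compiler. */
#include <stdint.h>
#include <stdio.h>

static int64_t total_over;  /* sum of positive per-stream deltas */
static int64_t total_under; /* sum of |negative| per-stream deltas */

/* PREUPDATE: remove the stream's current contribution */
static void pre_update(int64_t delta) {
  if (delta < 0) total_under += delta; /* delta < 0, so this subtracts */
  else if (delta > 0) total_over -= delta;
}

/* POSTUPDATE: add the stream's new contribution */
static void post_update(int64_t delta) {
  if (delta < 0) total_under -= delta; /* -delta is positive */
  else if (delta > 0) total_over += delta;
}

/* the DEBIT/CREDIT_..._INCOMING_WINDOW_DELTA macros compose to this shape */
static void set_delta(int64_t *delta, int64_t new_value) {
  pre_update(*delta);
  *delta = new_value;
  post_update(*delta);
}

int main(void) {
  int64_t s1 = 0, s2 = 0;
  set_delta(&s1, 16384); /* a read outstanding beyond the initial window */
  set_delta(&s2, -4096); /* data received ahead of a WINDOW_UPDATE */
  printf("over=%lld under=%lld\n", (long long)total_over,
         (long long)total_under); /* prints over=16384 under=4096 */
  return 0;
}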

@@ -376,15 +376,6 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
return err;
}
- uint32_t target_incoming_window = GPR_MAX(
- t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
- 1024);
- GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window,
- incoming_frame_size);
- if (t->incoming_window <= target_incoming_window / 2) {
- grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control");
- }
if (s != NULL) {
if (incoming_frame_size >
s->incoming_window_delta +
@@ -402,8 +393,8 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
return err;
}
- GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window_delta,
- incoming_frame_size);
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s,
+ incoming_frame_size);
if ((int64_t)t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] +
(int64_t)s->incoming_window_delta - (int64_t)s->announce_window <=
@@ -417,6 +408,13 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
s->received_bytes += incoming_frame_size;
}
+ uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t);
+ GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window,
+ incoming_frame_size);
+ if (t->incoming_window <= target_incoming_window / 2) {
+ grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control");
+ }
return GRPC_ERROR_NONE;
}

@@ -152,6 +152,17 @@ static bool stream_ref_if_not_destroyed(gpr_refcount *r) {
return true;
}
+ uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t) {
+ return (uint32_t)GPR_MIN(
+ (int64_t)((1u << 31) - 1),
+ t->stream_total_over_incoming_window +
+ (int64_t)GPR_MAX(
+ t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
+ t->stream_total_under_incoming_window,
+ 0));
+ }
bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
@@ -310,13 +321,12 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
/* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */
- uint32_t target_incoming_window = GPR_MAX(
- t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
- 1024);
+ uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t);
uint32_t threshold_to_send_transport_window_update =
t->outbuf.count > 0 ? 3 * target_incoming_window / 4
: target_incoming_window / 2;
- if (t->incoming_window <= threshold_to_send_transport_window_update) {
+ if (t->incoming_window <= threshold_to_send_transport_window_update &&
+ t->incoming_window != target_incoming_window) {
maybe_initiate_ping(exec_ctx, t,
GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE);
uint32_t announced = (uint32_t)GPR_CLAMP(
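Plugging illustrative numbers through the new target calculation shows the policy: the transport aims for total_over + max(initial_window - total_under, 0), capped at the HTTP/2 maximum of 2^31 - 1, and announces a window update only once the remaining window falls below half of that target (three quarters when a write is already going out), and only when there is actually something to announce. This is also why parsing.c now debits the transport window after the per-stream delta update: the target reads the per-stream aggregates. A sketch with invented values; only the formula and thresholds come from the diff:

#include <stdint.h>
#include <stdio.h>

#define MIN64(a, b) ((a) < (b) ? (a) : (b))
#define MAX64(a, b) ((a) > (b) ? (a) : (b))

static uint32_t target_incoming_window(int64_t initial_window,
                                       int64_t total_over,
                                       int64_t total_under) {
  return (uint32_t)MIN64(
      (int64_t)((1u << 31) - 1), /* cap at the HTTP/2 maximum window */
      total_over + MAX64(initial_window - total_under, (int64_t)0));
}

int main(void) {
  /* invented numbers: 64KB initial window, 16KB over, 4KB under */
  uint32_t target = target_incoming_window(65535, 16384, 4096);
  printf("target=%u\n", target); /* 16384 + (65535 - 4096) = 77823 */
  int64_t incoming_window = 30000;
  int outbuf_nonempty = 1;
  uint32_t threshold = outbuf_nonempty ? 3 * target / 4 : target / 2;
  /* the second clause skips a no-op update when already at target */
  if (incoming_window <= threshold && incoming_window != target) {
    printf("announce %u bytes\n", (uint32_t)(target - incoming_window));
  }
  return 0;
}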

@@ -198,14 +198,17 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_STR_KEY, ":path"));
}
- if (b->idx.named.host != NULL) {
+ if (b->idx.named.host != NULL && b->idx.named.authority == NULL) {
+ grpc_linked_mdelem *el = b->idx.named.host;
+ grpc_mdelem md = GRPC_MDELEM_REF(el->md);
+ grpc_metadata_batch_remove(exec_ctx, b, el);
add_error(
error_name, &error,
- grpc_metadata_batch_substitute(
- exec_ctx, b, b->idx.named.host,
- grpc_mdelem_from_slices(
- exec_ctx, GRPC_MDSTR_AUTHORITY,
- grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.host->md)))));
+ grpc_metadata_batch_add_head(
+ exec_ctx, b, el, grpc_mdelem_from_slices(
+ exec_ctx, GRPC_MDSTR_AUTHORITY,
+ grpc_slice_ref_internal(GRPC_MDVALUE(md)))));
+ GRPC_MDELEM_UNREF(exec_ctx, md);
}
if (b->idx.named.authority == NULL) {
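The new code only synthesizes :authority when the request did not already carry one, and instead of substituting in place it removes the host link and re-adds it at the head with the new key; the temporary md ref keeps the host value alive across the removal so it can be re-wrapped. A toy model of the same move-and-rename on a singly linked list (none of this is gRPC API; the toy needs no refcounting because it reuses the node):

#include <stdio.h>
#include <string.h>

typedef struct elem {
  const char *key, *value;
  struct elem *next;
} elem;

static elem *find(elem *head, const char *key) {
  for (; head; head = head->next)
    if (strcmp(head->key, key) == 0) return head;
  return NULL;
}

static void remove_elem(elem **head, elem *e) {
  for (elem **p = head; *p; p = &(*p)->next)
    if (*p == e) { *p = e->next; return; } /* unlink, keep the node */
}

int main(void) {
  elem host = {"host", "example.com", NULL};
  elem te = {"te", "trailers", &host};
  elem *head = &te;
  if (find(head, "host") && !find(head, ":authority")) {
    elem *e = find(head, "host");
    remove_elem(&head, e);
    e->key = ":authority"; /* reuse the link, rewrite the key */
    e->next = head;        /* add_head */
    head = e;
  }
  for (elem *e = head; e; e = e->next) printf("%s: %s\n", e->key, e->value);
  return 0; /* prints :authority: example.com, then te: trailers */
}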

@@ -1405,16 +1405,6 @@ static void pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->po.mu);
}
- static void pollset_reset(grpc_pollset *pollset) {
- GPR_ASSERT(pollset->shutting_down);
- GPR_ASSERT(!pollset_has_workers(pollset));
- pollset->shutting_down = false;
- pollset->finish_shutdown_called = false;
- pollset->kicked_without_pollers = false;
- pollset->shutdown_done = NULL;
- GPR_ASSERT(pollset->po.pi == NULL);
- }
static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
polling_island *pi) {
if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
@@ -1958,7 +1948,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
- .pollset_reset = pollset_reset,
.pollset_destroy = pollset_destroy,
.pollset_work = pollset_work,
.pollset_kick = pollset_kick,

@@ -815,16 +815,6 @@ static void pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
}
- static void pollset_reset(grpc_pollset *pollset) {
- GPR_ASSERT(pollset->shutting_down);
- GPR_ASSERT(!pollset_has_workers(pollset));
- GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
- GPR_ASSERT(pollset->fd_count == 0);
- pollset->shutting_down = 0;
- pollset->called_shutdown = 0;
- pollset->kicked_without_pollers = 0;
- }
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) {
gpr_mu_lock(&pollset->mu);
@@ -1514,7 +1504,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
- .pollset_reset = pollset_reset,
.pollset_destroy = pollset_destroy,
.pollset_work = pollset_work,
.pollset_kick = pollset_kick,

@@ -191,10 +191,6 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
g_event_engine->pollset_shutdown(exec_ctx, pollset, closure);
}
- void grpc_pollset_reset(grpc_pollset *pollset) {
- g_event_engine->pollset_reset(pollset);
- }
void grpc_pollset_destroy(grpc_pollset *pollset) {
g_event_engine->pollset_destroy(pollset);
}

@@ -64,7 +64,6 @@ typedef struct grpc_event_engine_vtable {
void (*pollset_init)(grpc_pollset *pollset, gpr_mu **mu);
void (*pollset_shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure);
- void (*pollset_reset)(grpc_pollset *pollset);
void (*pollset_destroy)(grpc_pollset *pollset);
grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now,

@@ -58,9 +58,6 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu);
* pollset's mutex must be held */
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure);
- /** Reset the pollset to its initial state (perhaps with some cached objects);
- * must have been previously shutdown */
- void grpc_pollset_reset(grpc_pollset *pollset);
void grpc_pollset_destroy(grpc_pollset *pollset);
/* Do some work on a pollset.
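grpc_pollset_reset's only client was the completion-queue freelist removed later in this diff; with recycling gone, a pollset lives exactly once: init, shutdown, destroy. A toy state machine showing the API shape that remains (this is a sketch, not gRPC code):

#include <assert.h>
#include <stdio.h>

typedef enum { ACTIVE, SHUTTING_DOWN, DESTROYED } pollset_state;

static void shutdown_ps(pollset_state *s) {
  assert(*s == ACTIVE);
  *s = SHUTTING_DOWN;
}

static void destroy_ps(pollset_state *s) {
  assert(*s == SHUTTING_DOWN); /* reset used to rewind this edge to ACTIVE */
  *s = DESTROYED;
}

int main(void) {
  pollset_state s = ACTIVE;
  shutdown_ps(&s);
  destroy_ps(&s); /* no way back: callers now init a fresh pollset instead */
  printf("state=%d\n", (int)s);
  return 0;
}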

@@ -97,11 +97,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
}
}
- void grpc_pollset_reset(grpc_pollset *pollset) {
- GPR_ASSERT(pollset->shutting_down);
- pollset->shutting_down = 0;
- }
static void timer_run_cb(uv_timer_t *timer) {}
grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,

@@ -117,16 +117,6 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
void grpc_pollset_destroy(grpc_pollset *pollset) {}
- void grpc_pollset_reset(grpc_pollset *pollset) {
- GPR_ASSERT(pollset->shutting_down);
- GPR_ASSERT(
- !has_workers(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET));
- pollset->shutting_down = 0;
- pollset->is_iocp_worker = 0;
- pollset->kicked_without_pollers = 0;
- pollset->on_shutdown = NULL;
- }
grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {

@@ -128,7 +128,8 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
}
channel->default_authority = grpc_mdelem_from_slices(
exec_ctx, GRPC_MDSTR_AUTHORITY,
- grpc_slice_from_copied_string(args->args[i].value.string));
+ grpc_slice_intern(
+ grpc_slice_from_static_string(args->args[i].value.string)));
}
} else if (0 ==
strcmp(args->args[i].key, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)) {
@@ -144,7 +145,8 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
} else {
channel->default_authority = grpc_mdelem_from_slices(
exec_ctx, GRPC_MDSTR_AUTHORITY,
- grpc_slice_from_copied_string(args->args[i].value.string));
+ grpc_slice_intern(
+ grpc_slice_from_static_string(args->args[i].value.string)));
}
}
} else if (0 == strcmp(args->args[i].key,
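Both call sites switch from copying the argument string into a fresh slice to interning it: grpc_slice_from_static_string wraps the bytes without copying, and grpc_slice_intern maps them to a canonical refcounted slice (copying only when the string is not already in the intern table), so the borrow only has to live for the call and later comparisons can be pointer tests. A toy intern table showing the idea (not the gRPC implementation):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define TABLE_SIZE 64
static char *table[TABLE_SIZE];
static size_t count;

static const char *intern(const char *borrowed) {
  for (size_t i = 0; i < count; i++)
    if (strcmp(table[i], borrowed) == 0) return table[i];
  size_t len = strlen(borrowed) + 1;
  char *copy = malloc(len);
  memcpy(copy, borrowed, len); /* the table owns its own copy */
  return table[count++] = copy;
}

int main(void) {
  char buf[32];
  snprintf(buf, sizeof(buf), "example.com"); /* short-lived caller buffer */
  const char *a = intern(buf);
  const char *b = intern("example.com");
  printf("same pointer: %d\n", a == b); /* 1: equality is a pointer test */
  return 0;
}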

@@ -96,9 +96,6 @@ struct grpc_completion_queue {
#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1)
- static gpr_mu g_freelist_mu;
- static grpc_completion_queue *g_freelist;
int grpc_cq_pluck_trace;
int grpc_cq_event_timeout_trace;
@@ -113,21 +110,6 @@ int grpc_cq_event_timeout_trace;
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
grpc_error *error);
- void grpc_cq_global_init(void) { gpr_mu_init(&g_freelist_mu); }
- void grpc_cq_global_shutdown(void) {
- gpr_mu_destroy(&g_freelist_mu);
- while (g_freelist) {
- grpc_completion_queue *next = g_freelist->next_free;
- grpc_pollset_destroy(POLLSET_FROM_CQ(g_freelist));
- #ifndef NDEBUG
- gpr_free(g_freelist->outstanding_tags);
- #endif
- gpr_free(g_freelist);
- g_freelist = next;
- }
- }
grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_completion_queue *cc;
GPR_ASSERT(!reserved);
@@ -136,22 +118,12 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved));
- gpr_mu_lock(&g_freelist_mu);
- if (g_freelist == NULL) {
- gpr_mu_unlock(&g_freelist_mu);
- cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
- grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
+ cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
+ grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
#ifndef NDEBUG
- cc->outstanding_tags = NULL;
- cc->outstanding_tag_capacity = 0;
+ cc->outstanding_tags = NULL;
+ cc->outstanding_tag_capacity = 0;
#endif
- } else {
- cc = g_freelist;
- g_freelist = g_freelist->next_free;
- gpr_mu_unlock(&g_freelist_mu);
- /* pollset already initialized */
- }
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->pending_events, 1);
@@ -203,11 +175,11 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif
if (gpr_unref(&cc->owning_refs)) {
GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head);
- grpc_pollset_reset(POLLSET_FROM_CQ(cc));
- gpr_mu_lock(&g_freelist_mu);
- cc->next_free = g_freelist;
- g_freelist = cc;
- gpr_mu_unlock(&g_freelist_mu);
+ grpc_pollset_destroy(POLLSET_FROM_CQ(cc));
+ #ifndef NDEBUG
+ gpr_free(cc->outstanding_tags);
+ #endif
+ gpr_free(cc);
}
}
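With the freelist gone, a completion queue's whole lifetime is one allocation: POLLSET_FROM_CQ's (cq + 1) works because the pollset is laid out immediately after the struct in the same gpr_malloc block, so the final unref tears both down with a single gpr_free. A generic sketch of that co-allocation pattern (sizes and names invented, not gRPC code):

#include <stdio.h>
#include <stdlib.h>

typedef struct { int refs; } header;

/* stand-in for grpc_pollset_size(): the payload size is only known at
   runtime, which is why the payload cannot simply be a struct member */
static size_t payload_size(void) { return 128; }

int main(void) {
  header *h = malloc(sizeof(*h) + payload_size());
  void *payload = (void *)(h + 1); /* first byte past the header */
  printf("header=%p payload=%p\n", (void *)h, payload);
  free(h); /* one free releases header and payload together */
  return 0;
}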

@@ -99,7 +99,4 @@ bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc);
- void grpc_cq_global_init(void);
- void grpc_cq_global_shutdown(void);
#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */

@@ -209,7 +209,6 @@ void grpc_init(void) {
grpc_iomgr_init();
grpc_executor_init();
gpr_timers_global_init();
- grpc_cq_global_init();
grpc_handshaker_factory_registry_init();
grpc_security_init();
for (i = 0; i < g_number_of_plugins; i++) {
@@ -236,7 +235,6 @@ void grpc_shutdown(void) {
gpr_mu_lock(&g_init_mu);
if (--g_initializations == 0) {
grpc_executor_shutdown(&exec_ctx);
- grpc_cq_global_shutdown();
grpc_iomgr_shutdown(&exec_ctx);
gpr_timers_global_destroy();
grpc_tracer_shutdown();

@@ -444,10 +444,10 @@ def _start_unary_request(request, timeout, request_serializer):
return deadline, deadline_timespec, serialized_request, None
- def _end_unary_response_blocking(state, with_call, deadline):
+ def _end_unary_response_blocking(state, call, with_call, deadline):
if state.code is grpc.StatusCode.OK:
if with_call:
- rendezvous = _Rendezvous(state, None, None, deadline)
+ rendezvous = _Rendezvous(state, call, None, deadline)
return state.response, rendezvous
else:
return state.response
@@ -499,17 +499,17 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
_check_call_error(call_error, metadata)
_handle_event(completion_queue.poll(), state,
self._response_deserializer)
- return state, deadline
+ return state, call, deadline
def __call__(self, request, timeout=None, metadata=None, credentials=None):
- state, deadline, = self._blocking(request, timeout, metadata,
- credentials)
- return _end_unary_response_blocking(state, False, deadline)
+ state, call, deadline = self._blocking(request, timeout, metadata,
+ credentials)
+ return _end_unary_response_blocking(state, call, False, deadline)
def with_call(self, request, timeout=None, metadata=None, credentials=None):
- state, deadline, = self._blocking(request, timeout, metadata,
- credentials)
- return _end_unary_response_blocking(state, True, deadline)
+ state, call, deadline = self._blocking(request, timeout, metadata,
+ credentials)
+ return _end_unary_response_blocking(state, call, True, deadline)
def future(self, request, timeout=None, metadata=None, credentials=None):
state, operations, deadline, deadline_timespec, rendezvous = self._prepare(
@@ -619,25 +619,25 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
state.condition.notify_all()
if not state.due:
break
- return state, deadline
+ return state, call, deadline
def __call__(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
- state, deadline, = self._blocking(request_iterator, timeout, metadata,
- credentials)
- return _end_unary_response_blocking(state, False, deadline)
+ state, call, deadline = self._blocking(request_iterator, timeout,
+ metadata, credentials)
+ return _end_unary_response_blocking(state, call, False, deadline)
def with_call(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
- state, deadline, = self._blocking(request_iterator, timeout, metadata,
- credentials)
- return _end_unary_response_blocking(state, True, deadline)
+ state, call, deadline = self._blocking(request_iterator, timeout,
+ metadata, credentials)
+ return _end_unary_response_blocking(state, call, True, deadline)
def future(self,
request_iterator,

@@ -105,7 +105,6 @@ static void test_code(void) {
grpc_pollset_size();
grpc_pollset_init(NULL, NULL);
grpc_pollset_shutdown(NULL, NULL, NULL);
- grpc_pollset_reset(NULL);
grpc_pollset_destroy(NULL);
GRPC_ERROR_UNREF(grpc_pollset_work(NULL, NULL, NULL,
gpr_now(GPR_CLOCK_REALTIME),

@@ -698,6 +698,7 @@ static void BM_StreamingPingPongMsgs(benchmark::State& state) {
}
while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
request_rw->Write(send_request, tag(0)); // Start client send
response_rw.Read(&recv_request, tag(1)); // Start server recv
request_rw->Read(&recv_response, tag(2)); // Start client recv

@@ -0,0 +1,41 @@
#!/bin/bash
# Copyright 2017, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc/tools/internal_ci/linux/grpc_interop.sh"
# grpc_interop tests can take 6+ hours to complete.
timeout_mins: 480
action {
define_artifacts {
regex: "**/sponge_log.xml"
}
}

@@ -0,0 +1,42 @@
#!/usr/bin/env bash
# Copyright 2017, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
set -ex
export LANG=en_US.UTF-8
# Enter the gRPC repo root
cd $(dirname $0)/../../..
git submodule update --init
tools/run_tests/run_interop_tests.py -l all -s all --cloud_to_prod --cloud_to_prod_auth --use_docker --http2_interop -t -j 12 $@ || FAILED="true"
tools/run_tests/run_interop_tests.py -l java --use_docker --http2_badserver_interop $@ || FAILED="true"
tools/run_tests/run_interop_tests.py -l python --use_docker --http2_badserver_interop $@ || FAILED="true"

@@ -36,6 +36,7 @@ import tempfile
import sys
import time
import jobset
+ import socket
def start_port_server(port_server_port):
# check if a compatible port server is running

@@ -126,23 +126,45 @@ def collect_perf(bm_name, args):
subprocess.check_call(
['make', bm_name,
'CONFIG=mutrace', '-j', '%d' % multiprocessing.cpu_count()])
+ benchmarks = []
+ profile_analysis = []
+ cleanup = []
for line in subprocess.check_output(['bins/mutrace/%s' % bm_name,
'--benchmark_list_tests']).splitlines():
- subprocess.check_call(['perf', 'record', '-o', '%s-perf.data' % fnize(line),
- '-g', '-c', '1000',
- 'bins/mutrace/%s' % bm_name,
- '--benchmark_filter=^%s$' % line,
- '--benchmark_min_time=10'])
- env = os.environ.copy()
- env.update({
- 'PERF_BASE_NAME': fnize(line),
- 'OUTPUT_DIR': 'reports',
- 'OUTPUT_FILENAME': fnize(line),
- })
- subprocess.check_call(['tools/run_tests/performance/process_local_perf_flamegraphs.sh'],
- env=env)
- subprocess.check_call(['rm', '%s-perf.data' % fnize(line)])
- subprocess.check_call(['rm', '%s-out.perf' % fnize(line)])
link(line, '%s.svg' % fnize(line))
+ benchmarks.append(
+ jobset.JobSpec(['perf', 'record', '-o', '%s-perf.data' % fnize(line),
+ '-g', '-F', '997',
+ 'bins/mutrace/%s' % bm_name,
+ '--benchmark_filter=^%s$' % line,
+ '--benchmark_min_time=10']))
+ profile_analysis.append(
+ jobset.JobSpec(['tools/run_tests/performance/process_local_perf_flamegraphs.sh'],
+ environ = {
+ 'PERF_BASE_NAME': fnize(line),
+ 'OUTPUT_DIR': 'reports',
+ 'OUTPUT_FILENAME': fnize(line),
+ }))
+ cleanup.append(jobset.JobSpec(['rm', '%s-perf.data' % fnize(line)]))
+ cleanup.append(jobset.JobSpec(['rm', '%s-out.perf' % fnize(line)]))
+ # periodically flush out the list of jobs: temporary space required for this
+ # processing is large
+ if len(benchmarks) >= 20:
+ # run up to half the cpu count: each benchmark can use up to two cores
+ # (one for the microbenchmark, one for the data flush)
+ jobset.run(benchmarks, maxjobs=1,
+ add_env={'GRPC_TEST_PORT_SERVER': 'localhost:%d' % port_server_port})
+ jobset.run(profile_analysis, maxjobs=multiprocessing.cpu_count())
+ jobset.run(cleanup, maxjobs=multiprocessing.cpu_count())
+ benchmarks = []
+ profile_analysis = []
+ cleanup = []
+ # run the remaining benchmarks that weren't flushed
+ if len(benchmarks):
+ jobset.run(benchmarks, maxjobs=1,
+ add_env={'GRPC_TEST_PORT_SERVER': 'localhost:%d' % port_server_port})
+ jobset.run(profile_analysis, maxjobs=multiprocessing.cpu_count())
+ jobset.run(cleanup, maxjobs=multiprocessing.cpu_count())
def collect_summary(bm_name, args):
heading('Summary: %s' % bm_name)
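The rewritten loop batches work into jobset.JobSpec lists and flushes every 20 benchmarks so the perf.data files never pile up on disk, then drains whatever remains after the loop. The flush-and-drain shape, reduced to its skeleton (a C sketch with invented names; the constants stand in for the values used above):

#include <stdio.h>

#define FLUSH_AT 20 /* same batch size the loop above uses */

static void run_all(const char *phase, const int *jobs, int n) {
  for (int i = 0; i < n; i++) printf("%s job %d\n", phase, jobs[i]);
}

int main(void) {
  int pending[FLUSH_AT];
  int n = 0;
  for (int job = 0; job < 47; job++) { /* 47 stands in for the benchmark list */
    pending[n++] = job;
    if (n >= FLUSH_AT) { /* flush: bounds the temp files alive at once */
      run_all("flush", pending, n);
      n = 0;
    }
  }
  if (n > 0) run_all("drain", pending, n); /* the remainder */
  return 0;
}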
