Merge remote-tracking branch 'upstream/master' into upmerge

pull/5540/head
Masood Malekghassemi 9 years ago
commit 4aaf47469e
  1. 1
      grpc.def
  2. 4
      include/grpc/impl/codegen/sync.h
  3. 16
      src/core/iomgr/iomgr.c
  4. 6
      src/core/iomgr/iomgr_internal.h
  5. 7
      src/core/support/sync.c
  6. 18
      src/core/transport/chttp2/internal.h
  7. 6
      src/core/transport/chttp2/parsing.c
  8. 38
      src/core/transport/chttp2/stream_lists.c
  9. 37
      src/core/transport/chttp2/writing.c
  10. 43
      src/core/transport/chttp2_transport.c
  11. 8
      src/core/transport/metadata.c
  12. 2
      src/core/transport/transport.c
  13. 14
      src/python/grpcio/README.rst
  14. 7
      src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
  15. 40
      src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
  16. 2
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  17. 2
      src/python/grpcio/grpc/_cython/imports.generated.c
  18. 3
      src/python/grpcio/grpc/_cython/imports.generated.h
  19. 11
      src/python/grpcio/tests/_runner.py
  20. 62
      src/python/grpcio/tests/tests.json
  21. 30
      src/python/grpcio/tests/unit/_sanity/__init__.py
  22. 53
      src/python/grpcio/tests/unit/_sanity/_sanity_test.py
  23. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.c
  24. 3
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  25. 2
      tools/run_tests/build_python.sh
  26. 7
      tools/run_tests/run_python.sh
  27. 26
      tools/run_tests/run_tests.py

@ -182,6 +182,7 @@ EXPORTS
gpr_event_wait
gpr_ref_init
gpr_ref
gpr_ref_non_zero
gpr_refn
gpr_unref
gpr_stats_init

@ -182,6 +182,10 @@ GPRAPI void gpr_ref_init(gpr_refcount *r, int n);
/* Increment the reference count *r. Requires *r initialized. */
GPRAPI void gpr_ref(gpr_refcount *r);
/* Increment the reference count *r. Requires *r initialized.
Crashes if refcount is zero */
GPRAPI void gpr_ref_non_zero(gpr_refcount *r);
/* Increment the reference count *r by n. Requires *r initialized, n > 0. */
GPRAPI void gpr_refn(gpr_refcount *r, int n);

@ -41,9 +41,11 @@
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
#include <grpc/support/useful.h>
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/timer.h"
#include "src/core/support/env.h"
#include "src/core/support/string.h"
static gpr_mu g_mu;
@ -116,6 +118,9 @@ void grpc_iomgr_shutdown(void) {
"memory leaks are likely",
count_objects());
dump_objects("LEAKED");
if (grpc_iomgr_abort_on_leaks()) {
abort();
}
}
break;
}
@ -154,3 +159,14 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
gpr_mu_unlock(&g_mu);
gpr_free(obj->name);
}
bool grpc_iomgr_abort_on_leaks(void) {
char *env = gpr_getenv("GRPC_ABORT_ON_LEAKS");
if (env == NULL) return false;
static const char *truthy[] = {"yes", "Yes", "YES", "true",
"True", "TRUE", "1"};
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
if (0 == strcmp(env, truthy[i])) return true;
}
return false;
}

@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -34,6 +34,8 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H
#define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H
#include <stdbool.h>
#include "src/core/iomgr/iomgr.h"
#include <grpc/support/sync.h>
@ -55,4 +57,6 @@ void grpc_iomgr_platform_flush(void);
/** tear down all platform specific global iomgr structures */
void grpc_iomgr_platform_shutdown(void);
bool grpc_iomgr_abort_on_leaks(void);
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */

@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -98,6 +98,11 @@ void gpr_ref_init(gpr_refcount *r, int n) { gpr_atm_rel_store(&r->count, n); }
void gpr_ref(gpr_refcount *r) { gpr_atm_no_barrier_fetch_add(&r->count, 1); }
void gpr_ref_non_zero(gpr_refcount *r) {
gpr_atm prior = gpr_atm_no_barrier_fetch_add(&r->count, 1);
GPR_ASSERT(prior > 0);
}
void gpr_refn(gpr_refcount *r, int n) {
gpr_atm_no_barrier_fetch_add(&r->count, n);
}

@ -417,7 +417,7 @@ typedef struct {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
uint32_t id;
uint8_t fetching;
uint8_t sent_initial_metadata;
bool sent_initial_metadata;
uint8_t sent_message;
uint8_t sent_trailing_metadata;
uint8_t read_closed;
@ -509,7 +509,7 @@ void grpc_chttp2_publish_reads(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing);
void grpc_chttp2_list_add_writable_stream(
bool grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
/** Get a writable stream
@ -519,14 +519,13 @@ int grpc_chttp2_list_pop_writable_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_writing **stream_writing);
void grpc_chttp2_list_remove_writable_stream(
bool grpc_chttp2_list_remove_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
grpc_chttp2_stream_global *stream_global) GRPC_MUST_USE_RESULT;
/* returns 1 if stream added, 0 if it was already present */
int grpc_chttp2_list_add_writing_stream(
void grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) GRPC_MUST_USE_RESULT;
grpc_chttp2_stream_writing *stream_writing);
int grpc_chttp2_list_have_writing_streams(
grpc_chttp2_transport_writing *transport_writing);
int grpc_chttp2_list_pop_writing_stream(
@ -770,4 +769,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_parsing *parsing,
const uint8_t *opaque_8bytes);
/** add a ref to the stream and add it to the writable list;
ref will be dropped in writing.c */
void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
#endif

@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -149,7 +149,7 @@ void grpc_chttp2_publish_reads(
if (was_zero && !is_zero) {
while (grpc_chttp2_list_pop_stalled_by_transport(transport_global,
&stream_global)) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
grpc_chttp2_become_writable(transport_global, stream_global);
}
}
@ -178,7 +178,7 @@ void grpc_chttp2_publish_reads(
outgoing_window);
is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
grpc_chttp2_become_writable(transport_global, stream_global);
}
stream_global->max_recv_bytes -= (uint32_t)GPR_MIN(

@ -100,11 +100,14 @@ static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
}
}
static void stream_list_maybe_remove(grpc_chttp2_transport *t,
static bool stream_list_maybe_remove(grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
stream_list_remove(t, s, id);
return true;
} else {
return false;
}
}
@ -125,23 +128,24 @@ static void stream_list_add_tail(grpc_chttp2_transport *t,
s->included[id] = 1;
}
static int stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
static bool stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
return 0;
return false;
}
stream_list_add_tail(t, s, id);
return 1;
return true;
}
/* wrappers for specializations */
void grpc_chttp2_list_add_writable_stream(
bool grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
GPR_ASSERT(stream_global->id != 0);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE);
return stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE);
}
int grpc_chttp2_list_pop_writable_stream(
@ -159,20 +163,20 @@ int grpc_chttp2_list_pop_writable_stream(
return r;
}
void grpc_chttp2_list_remove_writable_stream(
bool grpc_chttp2_list_remove_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE);
return stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE);
}
int grpc_chttp2_list_add_writing_stream(
void grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
return stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_WRITING);
GPR_ASSERT(stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_WRITING));
}
int grpc_chttp2_list_have_writing_streams(
@ -332,7 +336,7 @@ void grpc_chttp2_list_flush_writing_stalled_by_transport(
while (stream_list_pop(transport, &stream,
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) {
if (is_window_available) {
grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global);
grpc_chttp2_become_writable(&transport->global, &stream->global);
} else {
grpc_chttp2_list_add_stalled_by_transport(transport_writing,
&stream->writing);

@ -83,7 +83,8 @@ int grpc_chttp2_unlocking_check_writes(
(according to available window sizes) and add to the output buffer */
while (grpc_chttp2_list_pop_writable_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
uint8_t sent_initial_metadata;
bool sent_initial_metadata = stream_writing->sent_initial_metadata;
bool become_writable = false;
stream_writing->id = stream_global->id;
stream_writing->read_closed = stream_global->read_closed;
@ -92,16 +93,12 @@ int grpc_chttp2_unlocking_check_writes(
outgoing_window, stream_global,
outgoing_window);
sent_initial_metadata = stream_writing->sent_initial_metadata;
if (!sent_initial_metadata && stream_global->send_initial_metadata) {
stream_writing->send_initial_metadata =
stream_global->send_initial_metadata;
stream_global->send_initial_metadata = NULL;
if (grpc_chttp2_list_add_writing_stream(transport_writing,
stream_writing)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
sent_initial_metadata = 1;
become_writable = true;
sent_initial_metadata = true;
}
if (sent_initial_metadata) {
if (stream_global->send_message != NULL) {
@ -128,10 +125,7 @@ int grpc_chttp2_unlocking_check_writes(
stream_writing->flow_controlled_buffer.length > 0) &&
stream_writing->outgoing_window > 0) {
if (transport_writing->outgoing_window > 0) {
if (grpc_chttp2_list_add_writing_stream(transport_writing,
stream_writing)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
become_writable = true;
} else {
grpc_chttp2_list_add_stalled_by_transport(transport_writing,
stream_writing);
@ -141,10 +135,7 @@ int grpc_chttp2_unlocking_check_writes(
stream_writing->send_trailing_metadata =
stream_global->send_trailing_metadata;
stream_global->send_trailing_metadata = NULL;
if (grpc_chttp2_list_add_writing_stream(transport_writing,
stream_writing)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
become_writable = true;
}
}
@ -153,10 +144,13 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing,
announce_window, stream_global,
unannounced_incoming_window_for_writing);
if (grpc_chttp2_list_add_writing_stream(transport_writing,
stream_writing)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
become_writable = true;
}
if (become_writable) {
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
} else {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
}
@ -310,10 +304,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
(stream_writing->send_message && !stream_writing->fetching)) &&
stream_writing->outgoing_window > 0) {
if (transport_writing->outgoing_window > 0) {
if (grpc_chttp2_list_add_writing_stream(transport_writing,
stream_writing)) {
/* do nothing - already reffed */
}
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
} else {
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing);

@ -142,7 +142,7 @@ static void incoming_byte_stream_update_flow_control(
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global);
/*
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@ -521,7 +521,6 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->global.id) == NULL);
}
grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
&s->global);
grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global);
@ -583,7 +582,7 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
return &accepting->parsing;
}
/*
/*******************************************************************************
* LOCK MANAGEMENT
*/
@ -611,10 +610,18 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
GPR_TIMER_END("unlock", 0);
}
/*
/*******************************************************************************
* OUTPUT PROCESSING
*/
void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed &&
grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
}
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
uint32_t value) {
const grpc_chttp2_setting_parameters *sp =
@ -732,7 +739,7 @@ static void maybe_start_some_streams(
stream_global->id, STREAM_FROM_GLOBAL(stream_global));
stream_global->in_stream_map = 1;
transport_global->concurrent_stream_count++;
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
grpc_chttp2_become_writable(transport_global, stream_global);
}
/* cancel out streams that will never be started */
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@ -821,7 +828,7 @@ static void perform_stream_op_locked(
maybe_start_some_streams(exec_ctx, transport_global);
} else {
GPR_ASSERT(stream_global->id != 0);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
grpc_chttp2_become_writable(transport_global, stream_global);
}
} else {
grpc_chttp2_complete_closure_step(
@ -838,7 +845,7 @@ static void perform_stream_op_locked(
exec_ctx, &stream_global->send_message_finished, 0);
} else if (stream_global->id != 0) {
stream_global->send_message = op->send_message;
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
grpc_chttp2_become_writable(transport_global, stream_global);
}
}
@ -858,7 +865,7 @@ static void perform_stream_op_locked(
} else if (stream_global->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
grpc_chttp2_become_writable(transport_global, stream_global);
}
}
@ -999,7 +1006,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
}
/*
/*******************************************************************************
* INPUT PROCESSING
*/
@ -1064,7 +1071,6 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (!s) {
s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id);
}
grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
GPR_ASSERT(s);
s->global.in_stream_map = 0;
if (t->parsing.incoming_stream == &s->parsing) {
@ -1080,6 +1086,9 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
close_transport_locked(exec_ctx, t);
}
if (grpc_chttp2_list_remove_writable_stream(&t->global, &s->global)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing");
}
new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
grpc_chttp2_stream_map_size(&t->new_stream_map);
@ -1331,7 +1340,7 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
grpc_chttp2_become_writable(transport_global, stream_global);
}
}
@ -1426,7 +1435,7 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) {
GPR_TIMER_END("recv_data", 0);
}
/*
/*******************************************************************************
* CALLBACK LOOP
*/
@ -1440,7 +1449,7 @@ static void connectivity_state_set(
state, reason);
}
/*
/*******************************************************************************
* POLLSET STUFF
*/
@ -1468,7 +1477,7 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
unlock(exec_ctx, t);
}
/*
/*******************************************************************************
* BYTE STREAM
*/
@ -1508,7 +1517,7 @@ static void incoming_byte_stream_update_flow_control(
add_max_recv_bytes);
grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global,
stream_global);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
grpc_chttp2_become_writable(transport_global, stream_global);
}
}
@ -1623,7 +1632,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
return incoming_byte_stream;
}
/*
/*******************************************************************************
* TRACING
*/
@ -1709,7 +1718,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
gpr_free(prefix);
}
/*
/*******************************************************************************
* INTEGRATION GLUE
*/

@ -43,11 +43,13 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/profiling/timers.h"
#include "src/core/support/murmur_hash.h"
#include "src/core/support/string.h"
#include "src/core/transport/chttp2/bin_encoder.h"
#include "src/core/transport/static_metadata.h"
#include "src/core/iomgr/iomgr_internal.h"
/* There are two kinds of mdelem and mdstr instances.
* Static instances are declared in static_metadata.{h,c} and
@ -227,6 +229,9 @@ void grpc_mdctx_global_shutdown(void) {
if (shard->count != 0) {
gpr_log(GPR_DEBUG, "WARNING: %d metadata elements were leaked",
shard->count);
if (grpc_iomgr_abort_on_leaks()) {
abort();
}
}
gpr_free(shard->elems);
}
@ -237,6 +242,9 @@ void grpc_mdctx_global_shutdown(void) {
if (shard->count != 0) {
gpr_log(GPR_DEBUG, "WARNING: %d metadata strings were leaked",
shard->count);
if (grpc_iomgr_abort_on_leaks()) {
abort();
}
}
gpr_free(shard->strs);
}

@ -45,7 +45,7 @@ void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) {
#else
void grpc_stream_ref(grpc_stream_refcount *refcount) {
#endif
gpr_ref(&refcount->refs);
gpr_ref_non_zero(&refcount->refs);
}
#ifdef GRPC_STREAM_REFCOUNT_DEBUG

@ -42,3 +42,17 @@ package named :code:`python-dev`).
Note that :code:`$REPO_ROOT` can be assigned to whatever directory name floats
your fancy.
Troubleshooting
~~~~~~~~~~~~~~~
Help, I ...
* **... see a** :code:`pkg_resources.VersionConflict` **when I try to install
grpc!**
This is likely because :code:`pip` doesn't own the offending dependency,
which in turn is likely because your operating system's package manager owns
it. You'll need to force the installation of the dependency:
:code:`pip install --ignore-installed $OFFENDING_DEPENDENCY`

@ -1,4 +1,4 @@
# Copyright 2015, Google Inc.
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@ -31,8 +31,9 @@
cdef class CompletionQueue:
cdef grpc_completion_queue *c_completion_queue
cdef object poll_condition
cdef bint is_polling
cdef object pluck_condition
cdef int num_plucking
cdef int num_polling
cdef bint is_shutting_down
cdef bint is_shutdown

@ -39,8 +39,9 @@ cdef class CompletionQueue:
self.c_completion_queue = grpc_completion_queue_create(NULL)
self.is_shutting_down = False
self.is_shutdown = False
self.poll_condition = threading.Condition()
self.is_polling = False
self.pluck_condition = threading.Condition()
self.num_plucking = 0
self.num_polling = 0
cdef _interpret_event(self, grpc_event event):
cdef OperationTag tag = None
@ -87,19 +88,15 @@ cdef class CompletionQueue:
c_deadline = deadline.c_time
cdef grpc_event event
# Poll within a critical section
# TODO(atash) consider making queue polling contention a hard error to
# enable easier bug discovery
with self.poll_condition:
while self.is_polling:
self.poll_condition.wait(float(deadline) - time.time())
self.is_polling = True
# Poll within a critical section to detect contention
with self.pluck_condition:
assert self.num_plucking == 0, 'cannot simultaneously pluck and poll'
self.num_polling += 1
with nogil:
event = grpc_completion_queue_next(
self.c_completion_queue, c_deadline, NULL)
with self.poll_condition:
self.is_polling = False
self.poll_condition.notify()
with self.pluck_condition:
self.num_polling -= 1
return self._interpret_event(event)
def pluck(self, OperationTag tag, Timespec deadline=None):
@ -111,19 +108,18 @@ cdef class CompletionQueue:
c_deadline = deadline.c_time
cdef grpc_event event
# Poll within a critical section
# TODO(atash) consider making queue polling contention a hard error to
# enable easier bug discovery
with self.poll_condition:
while self.is_polling:
self.poll_condition.wait(float(deadline) - time.time())
self.is_polling = True
# Pluck within a critical section to detect contention
with self.pluck_condition:
assert self.num_polling == 0, 'cannot simultaneously pluck and poll'
assert self.num_plucking < GRPC_MAX_COMPLETION_QUEUE_PLUCKERS, (
'cannot pluck more than {} times simultaneously'.format(
GRPC_MAX_COMPLETION_QUEUE_PLUCKERS))
self.num_plucking += 1
with nogil:
event = grpc_completion_queue_pluck(
self.c_completion_queue, <cpython.PyObject *>tag, c_deadline, NULL)
with self.poll_condition:
self.is_polling = False
self.poll_condition.notify()
with self.pluck_condition:
self.num_plucking -= 1
return self._interpret_event(event)
def shutdown(self):

@ -138,6 +138,8 @@ cdef extern from "grpc/_cython/loader.h":
const int GRPC_WRITE_NO_COMPRESS
const int GRPC_WRITE_USED_MASK
const int GRPC_MAX_COMPLETION_QUEUE_PLUCKERS
ctypedef struct grpc_completion_queue:
# We don't care about the internals (and in fact don't know them)
pass

@ -220,6 +220,7 @@ gpr_event_get_type gpr_event_get_import;
gpr_event_wait_type gpr_event_wait_import;
gpr_ref_init_type gpr_ref_init_import;
gpr_ref_type gpr_ref_import;
gpr_ref_non_zero_type gpr_ref_non_zero_import;
gpr_refn_type gpr_refn_import;
gpr_unref_type gpr_unref_import;
gpr_stats_init_type gpr_stats_init_import;
@ -485,6 +486,7 @@ void pygrpc_load_imports(HMODULE library) {
gpr_event_wait_import = (gpr_event_wait_type) GetProcAddress(library, "gpr_event_wait");
gpr_ref_init_import = (gpr_ref_init_type) GetProcAddress(library, "gpr_ref_init");
gpr_ref_import = (gpr_ref_type) GetProcAddress(library, "gpr_ref");
gpr_ref_non_zero_import = (gpr_ref_non_zero_type) GetProcAddress(library, "gpr_ref_non_zero");
gpr_refn_import = (gpr_refn_type) GetProcAddress(library, "gpr_refn");
gpr_unref_import = (gpr_unref_type) GetProcAddress(library, "gpr_unref");
gpr_stats_init_import = (gpr_stats_init_type) GetProcAddress(library, "gpr_stats_init");

@ -610,6 +610,9 @@ extern gpr_ref_init_type gpr_ref_init_import;
typedef void(*gpr_ref_type)(gpr_refcount *r);
extern gpr_ref_type gpr_ref_import;
#define gpr_ref gpr_ref_import
typedef void(*gpr_ref_non_zero_type)(gpr_refcount *r);
extern gpr_ref_non_zero_type gpr_ref_non_zero_import;
#define gpr_ref_non_zero gpr_ref_non_zero_import
typedef void(*gpr_refn_type)(gpr_refcount *r, int n);
extern gpr_refn_type gpr_refn_import;
#define gpr_refn gpr_refn_import

@ -1,4 +1,4 @@
# Copyright 2015, Google Inc.
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@ -143,10 +143,17 @@ class Runner(object):
def run(self, suite):
"""See setuptools' test_runner setup argument for information."""
# only run test cases with id starting with given prefix
testcase_filter = os.getenv('GPRC_PYTHON_TESTRUNNER_FILTER')
filtered_cases = []
for case in _loader.iterate_suite_cases(suite):
if not testcase_filter or case.id().startswith(testcase_filter):
filtered_cases.append(case)
# Ensure that every test case has no collision with any other test case in
# the augmented results.
augmented_cases = [AugmentedCase(case, uuid.uuid4())
for case in _loader.iterate_suite_cases(suite)]
for case in filtered_cases]
case_id_by_case = dict((augmented_case.case, augmented_case.id)
for augmented_case in augmented_cases)
result_out = StringIO.StringIO()

@ -0,0 +1,62 @@
[
"_base_interface_test.AsyncEasyTest",
"_base_interface_test.AsyncPeasyTest",
"_base_interface_test.SyncEasyTest",
"_base_interface_test.SyncPeasyTest",
"_beta_features_test.BetaFeaturesTest",
"_beta_features_test.ContextManagementAndLifecycleTest",
"_channel_test.ChannelTest",
"_connectivity_channel_test.ChannelConnectivityTest",
"_core_over_links_base_interface_test.AsyncEasyTest",
"_core_over_links_base_interface_test.AsyncPeasyTest",
"_core_over_links_base_interface_test.SyncEasyTest",
"_core_over_links_base_interface_test.SyncPeasyTest",
"_crust_over_core_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
"_crust_over_core_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
"_crust_over_core_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
"_crust_over_core_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
"_crust_over_core_over_links_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
"_crust_over_core_over_links_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
"_crust_over_core_over_links_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
"_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
"_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest",
"_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
"_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
"_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest",
"_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
"_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
"_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest",
"_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
"_implementations_test.ChannelCredentialsTest",
"_insecure_interop_test.InsecureInteropTest",
"_intermediary_low_test.CancellationTest",
"_intermediary_low_test.EchoTest",
"_intermediary_low_test.ExpirationTest",
"_intermediary_low_test.LonelyClientTest",
"_later_test.LaterTest",
"_logging_pool_test.LoggingPoolTest",
"_lonely_invocation_link_test.LonelyInvocationLinkTest",
"_low_test.HangingServerShutdown",
"_low_test.InsecureServerInsecureClient",
"_not_found_test.NotFoundTest",
"_sanity_test.Sanity",
"_secure_interop_test.SecureInteropTest",
"_transmission_test.RoundTripTest",
"_transmission_test.TransmissionTest",
"_utilities_test.ChannelConnectivityTest",
"beta_python_plugin_test.PythonPluginTest",
"cygrpc_test.InsecureServerInsecureClient",
"cygrpc_test.SecureServerSecureClient",
"cygrpc_test.TypeSmokeTest"
]

@ -0,0 +1,30 @@
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,53 @@
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import json
import unittest
import tests
class Sanity(unittest.TestCase):
def testTestsJsonUpToDate(self):
"""Autodiscovers all test suites and checks that tests.json is up to date"""
loader = tests.Loader()
loader.loadTestsFromNames(['tests'])
test_suite_names = [
test_case_class.id().rsplit('.', 1)[0]
for test_case_class in tests._loader.iterate_suite_cases(loader.suite)]
test_suite_names = sorted(set(test_suite_names))
with open('src/python/grpcio/tests/tests.json') as tests_json_file:
tests_json = json.load(tests_json_file)
self.assertListEqual(test_suite_names, tests_json)
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -220,6 +220,7 @@ gpr_event_get_type gpr_event_get_import;
gpr_event_wait_type gpr_event_wait_import;
gpr_ref_init_type gpr_ref_init_import;
gpr_ref_type gpr_ref_import;
gpr_ref_non_zero_type gpr_ref_non_zero_import;
gpr_refn_type gpr_refn_import;
gpr_unref_type gpr_unref_import;
gpr_stats_init_type gpr_stats_init_import;
@ -481,6 +482,7 @@ void grpc_rb_load_imports(HMODULE library) {
gpr_event_wait_import = (gpr_event_wait_type) GetProcAddress(library, "gpr_event_wait");
gpr_ref_init_import = (gpr_ref_init_type) GetProcAddress(library, "gpr_ref_init");
gpr_ref_import = (gpr_ref_type) GetProcAddress(library, "gpr_ref");
gpr_ref_non_zero_import = (gpr_ref_non_zero_type) GetProcAddress(library, "gpr_ref_non_zero");
gpr_refn_import = (gpr_refn_type) GetProcAddress(library, "gpr_refn");
gpr_unref_import = (gpr_unref_type) GetProcAddress(library, "gpr_unref");
gpr_stats_init_import = (gpr_stats_init_type) GetProcAddress(library, "gpr_stats_init");

@ -610,6 +610,9 @@ extern gpr_ref_init_type gpr_ref_init_import;
typedef void(*gpr_ref_type)(gpr_refcount *r);
extern gpr_ref_type gpr_ref_import;
#define gpr_ref gpr_ref_import
typedef void(*gpr_ref_non_zero_type)(gpr_refcount *r);
extern gpr_ref_non_zero_type gpr_ref_non_zero_import;
#define gpr_ref_non_zero gpr_ref_non_zero_import
typedef void(*gpr_refn_type)(gpr_refcount *r, int n);
extern gpr_refn_type gpr_refn_import;
#define gpr_refn gpr_refn_import

@ -45,3 +45,5 @@ export GRPC_PYTHON_ENABLE_CYTHON_TRACING=1
tox --notest
$ROOT/.tox/py27/bin/python $ROOT/setup.py build
$ROOT/.tox/py27/bin/python $ROOT/setup.py build_py
$ROOT/.tox/py27/bin/python $ROOT/setup.py gather --test

@ -42,7 +42,12 @@ export LDFLAGS="-L$ROOT/libs/$CONFIG"
export GRPC_PYTHON_BUILD_WITH_CYTHON=1
export GRPC_PYTHON_ENABLE_CYTHON_TRACING=1
tox
if [ "$CONFIG" = "gcov" ]
then
tox
else
$ROOT/.tox/py27/bin/python $ROOT/setup.py test
fi
mkdir -p $ROOT/reports
rm -rf $ROOT/reports/python-coverage

@ -353,15 +353,27 @@ class PythonLanguage(object):
_check_compiler(self.args.compiler, ['default'])
def test_specs(self):
# load list of known test suites
with open('src/python/grpcio/tests/tests.json') as tests_json_file:
tests_json = json.load(tests_json_file)
environment = dict(_FORCE_ENVIRON_FOR_WRAPPERS)
environment['PYVER'] = '2.7'
return [self.config.job_spec(
['tools/run_tests/run_python.sh'],
None,
environ=environment,
shortname='py.test',
timeout_seconds=15*60
)]
if self.config.build_config != 'gcov':
return [self.config.job_spec(
['tools/run_tests/run_python.sh'],
None,
environ=dict(environment.items() +
[('GPRC_PYTHON_TESTRUNNER_FILTER', suite_name)]),
shortname='py.test.%s' % suite_name,
timeout_seconds=5*60)
for suite_name in tests_json]
else:
return [self.config.job_spec(['tools/run_tests/run_python.sh'],
None,
environ=environment,
shortname='py.test.coverage',
timeout_seconds=15*60)]
def pre_build_steps(self):
return []

Loading…
Cancel
Save