Merge branch 'master' into return_unused_port

pull/6589/head
David Klempner 9 years ago
commit 2ecb08f5a7
  1. 6
      BUILD
  2. 6
      Makefile
  3. 1
      binding.gyp
  4. 10
      build.yaml
  5. 1
      config.m4
  6. 3
      gRPC.podspec
  7. 2
      grpc.gemspec
  8. 29
      include/grpc/impl/codegen/compression_types.h
  9. 2
      package.xml
  10. 9
      src/core/lib/channel/channel_args.c
  11. 2
      src/core/lib/iomgr/ev_poll_and_epoll_posix.c
  12. 1212
      src/core/lib/iomgr/ev_poll_posix.c
  13. 41
      src/core/lib/iomgr/ev_poll_posix.h
  14. 93
      src/core/lib/iomgr/ev_posix.c
  15. 6
      src/core/lib/iomgr/iomgr_posix.c
  16. 2
      src/cpp/common/channel_arguments.cc
  17. 2
      src/cpp/server/server_builder.cc
  18. 1
      src/php/ext/grpc/call.c
  19. 1
      src/python/grpcio/grpc_core_dependencies.py
  20. 295
      src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
  21. 51
      src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto
  22. 77
      src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto
  23. 47
      src/python/grpcio/tests/protoc_plugin/protos/responses/test_responses.proto
  24. 85
      src/python/grpcio/tests/protoc_plugin/protos/service/test_service.proto
  25. 3
      test/core/channel/channel_args_test.c
  26. 44
      test/core/client_config/set_initial_connect_string_test.c
  27. 12
      test/core/iomgr/udp_server_test.c
  28. 13
      test/core/iomgr/workqueue_test.c
  29. 8
      test/cpp/interop/interop_client.cc
  30. 1
      tools/buildgen/plugins/make_fuzzer_tests.py
  31. 2
      tools/doxygen/Doxyfile.core.internal
  32. 3
      tools/run_tests/configs.json
  33. 1
      tools/run_tests/dockerize/docker_run_tests.sh
  34. 8
      tools/run_tests/jobset.py
  35. 100
      tools/run_tests/run_tests.py
  36. 3
      tools/run_tests/sources_and_headers.json
  37. 8
      tools/run_tests/stress_test/configs/java.json
  38. 7824
      tools/run_tests/tests.json
  39. 3
      vsprojects/vcxproj/grpc/grpc.vcxproj
  40. 6
      vsprojects/vcxproj/grpc/grpc.vcxproj.filters
  41. 3
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
  42. 6
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters

@ -179,6 +179,7 @@ cc_library(
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.h",
"src/core/lib/iomgr/ev_poll_posix.h",
"src/core/lib/iomgr/ev_posix.h",
"src/core/lib/iomgr/exec_ctx.h",
"src/core/lib/iomgr/executor.h",
@ -313,6 +314,7 @@ cc_library(
"src/core/lib/iomgr/endpoint_pair_posix.c",
"src/core/lib/iomgr/endpoint_pair_windows.c",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.c",
"src/core/lib/iomgr/ev_poll_posix.c",
"src/core/lib/iomgr/ev_posix.c",
"src/core/lib/iomgr/exec_ctx.c",
"src/core/lib/iomgr/executor.c",
@ -530,6 +532,7 @@ cc_library(
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.h",
"src/core/lib/iomgr/ev_poll_posix.h",
"src/core/lib/iomgr/ev_posix.h",
"src/core/lib/iomgr/exec_ctx.h",
"src/core/lib/iomgr/executor.h",
@ -650,6 +653,7 @@ cc_library(
"src/core/lib/iomgr/endpoint_pair_posix.c",
"src/core/lib/iomgr/endpoint_pair_windows.c",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.c",
"src/core/lib/iomgr/ev_poll_posix.c",
"src/core/lib/iomgr/ev_posix.c",
"src/core/lib/iomgr/exec_ctx.c",
"src/core/lib/iomgr/executor.c",
@ -1342,6 +1346,7 @@ objc_library(
"src/core/lib/iomgr/endpoint_pair_posix.c",
"src/core/lib/iomgr/endpoint_pair_windows.c",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.c",
"src/core/lib/iomgr/ev_poll_posix.c",
"src/core/lib/iomgr/ev_posix.c",
"src/core/lib/iomgr/exec_ctx.c",
"src/core/lib/iomgr/executor.c",
@ -1538,6 +1543,7 @@ objc_library(
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.h",
"src/core/lib/iomgr/ev_poll_posix.h",
"src/core/lib/iomgr/ev_posix.h",
"src/core/lib/iomgr/exec_ctx.h",
"src/core/lib/iomgr/executor.h",

@ -187,8 +187,8 @@ CC_ubsan = clang
CXX_ubsan = clang++
LD_ubsan = clang
LDXX_ubsan = clang++
CPPFLAGS_ubsan = -O1 -fsanitize-coverage=edge -fsanitize=undefined -fno-omit-frame-pointer -Wno-unused-command-line-argument
LDFLAGS_ubsan = -fsanitize=undefined
CPPFLAGS_ubsan = -O0 -fsanitize-coverage=edge -fsanitize=undefined,unsigned-integer-overflow -fno-omit-frame-pointer -Wno-unused-command-line-argument -Wvarargs
LDFLAGS_ubsan = -fsanitize=undefined,unsigned-integer-overflow
DEFINES_ubsan = NDEBUG
DEFINES_ubsan += GRPC_TEST_SLOWDOWN_BUILD_FACTOR=1.5
@ -2511,6 +2511,7 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
src/core/lib/iomgr/ev_poll_and_epoll_posix.c \
src/core/lib/iomgr/ev_poll_posix.c \
src/core/lib/iomgr/ev_posix.c \
src/core/lib/iomgr/exec_ctx.c \
src/core/lib/iomgr/executor.c \
@ -2857,6 +2858,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
src/core/lib/iomgr/ev_poll_and_epoll_posix.c \
src/core/lib/iomgr/ev_poll_posix.c \
src/core/lib/iomgr/ev_posix.c \
src/core/lib/iomgr/exec_ctx.c \
src/core/lib/iomgr/executor.c \

@ -582,6 +582,7 @@
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
'src/core/lib/iomgr/ev_poll_and_epoll_posix.c',
'src/core/lib/iomgr/ev_poll_posix.c',
'src/core/lib/iomgr/ev_posix.c',
'src/core/lib/iomgr/exec_ctx.c',
'src/core/lib/iomgr/executor.c',

@ -166,6 +166,7 @@ filegroups:
- src/core/lib/iomgr/endpoint.h
- src/core/lib/iomgr/endpoint_pair.h
- src/core/lib/iomgr/ev_poll_and_epoll_posix.h
- src/core/lib/iomgr/ev_poll_posix.h
- src/core/lib/iomgr/ev_posix.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
@ -240,6 +241,7 @@ filegroups:
- src/core/lib/iomgr/endpoint_pair_posix.c
- src/core/lib/iomgr/endpoint_pair_windows.c
- src/core/lib/iomgr/ev_poll_and_epoll_posix.c
- src/core/lib/iomgr/ev_poll_posix.c
- src/core/lib/iomgr/ev_posix.c
- src/core/lib/iomgr/exec_ctx.c
- src/core/lib/iomgr/executor.c
@ -3267,14 +3269,16 @@ configs:
timeout_multiplier: 5
ubsan:
CC: clang
CPPFLAGS: -O1 -fsanitize-coverage=edge -fsanitize=undefined -fno-omit-frame-pointer
-Wno-unused-command-line-argument
CPPFLAGS: -O0 -fsanitize-coverage=edge -fsanitize=undefined,unsigned-integer-overflow
-fno-omit-frame-pointer -Wno-unused-command-line-argument -Wvarargs
CXX: clang++
DEFINES: NDEBUG
LD: clang
LDFLAGS: -fsanitize=undefined
LDFLAGS: -fsanitize=undefined,unsigned-integer-overflow
LDXX: clang++
compile_the_world: true
test_environ:
UBSAN_OPTIONS: print_stacktrace=1
timeout_multiplier: 1.5
defaults:
boringssl:

@ -101,6 +101,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
src/core/lib/iomgr/ev_poll_and_epoll_posix.c \
src/core/lib/iomgr/ev_poll_posix.c \
src/core/lib/iomgr/ev_posix.c \
src/core/lib/iomgr/exec_ctx.c \
src/core/lib/iomgr/executor.c \

@ -182,6 +182,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/endpoint.h',
'src/core/lib/iomgr/endpoint_pair.h',
'src/core/lib/iomgr/ev_poll_and_epoll_posix.h',
'src/core/lib/iomgr/ev_poll_posix.h',
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
@ -350,6 +351,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
'src/core/lib/iomgr/ev_poll_and_epoll_posix.c',
'src/core/lib/iomgr/ev_poll_posix.c',
'src/core/lib/iomgr/ev_posix.c',
'src/core/lib/iomgr/exec_ctx.c',
'src/core/lib/iomgr/executor.c',
@ -530,6 +532,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/endpoint.h',
'src/core/lib/iomgr/endpoint_pair.h',
'src/core/lib/iomgr/ev_poll_and_epoll_posix.h',
'src/core/lib/iomgr/ev_poll_posix.h',
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',

@ -191,6 +191,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/endpoint.h )
s.files += %w( src/core/lib/iomgr/endpoint_pair.h )
s.files += %w( src/core/lib/iomgr/ev_poll_and_epoll_posix.h )
s.files += %w( src/core/lib/iomgr/ev_poll_posix.h )
s.files += %w( src/core/lib/iomgr/ev_posix.h )
s.files += %w( src/core/lib/iomgr/exec_ctx.h )
s.files += %w( src/core/lib/iomgr/executor.h )
@ -329,6 +330,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/endpoint_pair_posix.c )
s.files += %w( src/core/lib/iomgr/endpoint_pair_windows.c )
s.files += %w( src/core/lib/iomgr/ev_poll_and_epoll_posix.c )
s.files += %w( src/core/lib/iomgr/ev_poll_posix.c )
s.files += %w( src/core/lib/iomgr/ev_posix.c )
s.files += %w( src/core/lib/iomgr/exec_ctx.c )
s.files += %w( src/core/lib/iomgr/executor.c )

@ -41,10 +41,13 @@ extern "C" {
#endif
/** To be used in channel arguments */
#define GRPC_COMPRESSION_ALGORITHM_ARG "grpc.compression_algorithm"
#define GRPC_COMPRESSION_ALGORITHM_STATE_ARG "grpc.compression_algorithm_state"
#define GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM \
"grpc.default_compression_algorithm"
#define GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL "grpc.default_compression_level"
#define GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET \
"grpc.compression_enabled_algorithms_bitset"
/* The various compression algorithms supported by GRPC */
/* The various compression algorithms supported by gRPC */
typedef enum {
GRPC_COMPRESS_NONE = 0,
GRPC_COMPRESS_DEFLATE,
@ -53,6 +56,10 @@ typedef enum {
GRPC_COMPRESS_ALGORITHMS_COUNT
} grpc_compression_algorithm;
/** Compression levels allow a party with knowledge of its peer's accepted
* encodings to request compression in an abstract way. The level-algorithm
* mapping is performed internally and depends on the peer's supported
* compression algorithms. */
typedef enum {
GRPC_COMPRESS_LEVEL_NONE = 0,
GRPC_COMPRESS_LEVEL_LOW,
@ -62,8 +69,20 @@ typedef enum {
} grpc_compression_level;
typedef struct grpc_compression_options {
uint32_t enabled_algorithms_bitset; /**< All algs are enabled by default */
grpc_compression_algorithm default_compression_algorithm; /**< for channel */
/** All algs are enabled by default. This option corresponds to the channel
* argument key behind \a GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET
*/
uint32_t enabled_algorithms_bitset;
/** The default channel compression algorithm. It'll be used in the absence of
* call specific settings. This option corresponds to the channel argument key
* behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM */
grpc_compression_algorithm default_compression_algorithm;
/** The default channel compression level. It'll be used in the absence of
* call specific settings. This option corresponds to the channel argument key
* behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL */
grpc_compression_algorithm default_compression_level;
} grpc_compression_options;
#ifdef __cplusplus

@ -198,6 +198,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_poll_and_epoll_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_poll_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/exec_ctx.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor.h" role="src" />
@ -336,6 +337,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair_windows.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_poll_and_epoll_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_poll_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/exec_ctx.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor.c" role="src" />

@ -170,7 +170,7 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
if (a == NULL) return 0;
for (i = 0; i < a->num_args; ++i) {
if (a->args[i].type == GRPC_ARG_INTEGER &&
!strcmp(GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) {
!strcmp(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, a->args[i].key)) {
return (grpc_compression_algorithm)a->args[i].value.integer;
break;
}
@ -182,7 +182,7 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm(
grpc_channel_args *a, grpc_compression_algorithm algorithm) {
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
tmp.key = GRPC_COMPRESSION_ALGORITHM_ARG;
tmp.key = GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM;
tmp.value.integer = algorithm;
return grpc_channel_args_copy_and_add(a, &tmp, 1);
}
@ -196,7 +196,8 @@ static int find_compression_algorithm_states_bitset(const grpc_channel_args *a,
size_t i;
for (i = 0; i < a->num_args; ++i) {
if (a->args[i].type == GRPC_ARG_INTEGER &&
!strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) {
!strcmp(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
a->args[i].key)) {
*states_arg = &a->args[i].value.integer;
return 1; /* GPR_TRUE */
}
@ -222,7 +223,7 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
/* create a new arg */
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG;
tmp.key = GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET;
/* all enabled by default */
tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
if (state != 0) {

@ -790,7 +790,6 @@ static void pollset_kick(grpc_pollset *p,
static void pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
gpr_tls_init(&g_current_thread_worker);
grpc_wakeup_fd_global_init();
grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
@ -798,7 +797,6 @@ static void pollset_global_shutdown(void) {
grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
gpr_tls_destroy(&g_current_thread_poller);
gpr_tls_destroy(&g_current_thread_worker);
grpc_wakeup_fd_global_destroy();
}
static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }

File diff suppressed because it is too large Load Diff

@ -0,0 +1,41 @@
/*
*
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H
#define GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H
#include "src/core/lib/iomgr/ev_posix.h"
const grpc_event_engine_vtable *grpc_init_poll_posix(void);
#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */

@ -37,23 +37,104 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h"
#include "src/core/lib/iomgr/ev_poll_posix.h"
#include "src/core/lib/support/env.h"
/** Default poll() function - a pointer so that it can be overridden by some
* tests */
grpc_poll_function_type grpc_poll_function = poll;
static const grpc_event_engine_vtable *g_event_engine;
grpc_poll_function_type grpc_poll_function = poll;
typedef const grpc_event_engine_vtable *(*event_engine_factory_fn)(void);
typedef struct {
const char *name;
event_engine_factory_fn factory;
} event_engine_factory;
static const event_engine_factory g_factories[] = {
{"poll", grpc_init_poll_posix}, {"legacy", grpc_init_poll_and_epoll_posix},
};
static void add(const char *beg, const char *end, char ***ss, size_t *ns) {
size_t n = *ns;
size_t np = n + 1;
char *s;
size_t len;
GPR_ASSERT(end >= beg);
len = (size_t)(end - beg);
s = gpr_malloc(len + 1);
memcpy(s, beg, len);
s[len] = 0;
*ss = gpr_realloc(*ss, sizeof(char **) * np);
(*ss)[n] = s;
*ns = np;
}
static void split(const char *s, char ***ss, size_t *ns) {
const char *c = strchr(s, ',');
if (c == NULL) {
add(s, s + strlen(s), ss, ns);
} else {
add(s, c, ss, ns);
split(c + 1, ss, ns);
}
}
static bool is(const char *want, const char *have) {
return 0 == strcmp(want, "all") || 0 == strcmp(want, have);
}
static void try_engine(const char *engine) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
if (is(engine, g_factories[i].name)) {
if ((g_event_engine = g_factories[i].factory())) {
gpr_log(GPR_DEBUG, "Using polling engine: %s", g_factories[i].name);
return;
}
}
}
}
void grpc_event_engine_init(void) {
if ((g_event_engine = grpc_init_poll_and_epoll_posix())) {
return;
char *s = gpr_getenv("GRPC_POLL_STRATEGY");
if (s == NULL) {
s = gpr_strdup("all");
}
char **strings = NULL;
size_t nstrings = 0;
split(s, &strings, &nstrings);
for (size_t i = 0; g_event_engine == NULL && i < nstrings; i++) {
try_engine(strings[i]);
}
for (size_t i = 0; i < nstrings; i++) {
gpr_free(strings[i]);
}
gpr_free(strings);
gpr_free(s);
if (g_event_engine == NULL) {
gpr_log(GPR_ERROR, "No event engine could be initialized");
abort();
}
gpr_log(GPR_ERROR, "No event engine could be initialized");
abort();
}
void grpc_event_engine_shutdown(void) { g_event_engine->shutdown_engine(); }
void grpc_event_engine_shutdown(void) {
g_event_engine->shutdown_engine();
g_event_engine = NULL;
}
grpc_fd *grpc_fd_create(int fd, const char *name) {
return g_event_engine->fd_create(fd, name);

@ -41,12 +41,16 @@
#include "src/core/lib/iomgr/tcp_posix.h"
void grpc_iomgr_platform_init(void) {
grpc_wakeup_fd_global_init();
grpc_event_engine_init();
grpc_register_tracer("tcp", &grpc_tcp_trace);
}
void grpc_iomgr_platform_flush(void) {}
void grpc_iomgr_platform_shutdown(void) { grpc_event_engine_shutdown(); }
void grpc_iomgr_platform_shutdown(void) {
grpc_event_engine_shutdown();
grpc_wakeup_fd_global_destroy();
}
#endif /* GRPC_POSIX_SOCKET */

@ -85,7 +85,7 @@ void ChannelArguments::Swap(ChannelArguments& other) {
void ChannelArguments::SetCompressionAlgorithm(
grpc_compression_algorithm algorithm) {
SetInt(GRPC_COMPRESSION_ALGORITHM_ARG, algorithm);
SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, algorithm);
}
// Note: a second call to this will add in front the result of the first call.

@ -123,7 +123,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
if (max_message_size_ > 0) {
args.SetInt(GRPC_ARG_MAX_MESSAGE_LENGTH, max_message_size_);
}
args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG,
args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
compression_options_.enabled_algorithms_bitset);
std::unique_ptr<Server> server(
new Server(thread_pool.release(), true, max_message_size_, &args));

@ -96,6 +96,7 @@ zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned) {
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(call_object TSRMLS_CC);
call->wrapped = wrapped;
call->owned = owned;
return call_object;
}

@ -95,6 +95,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
'src/core/lib/iomgr/ev_poll_and_epoll_posix.c',
'src/core/lib/iomgr/ev_poll_posix.c',
'src/core/lib/iomgr/ev_posix.c',
'src/core/lib/iomgr/exec_ctx.c',
'src/core/lib/iomgr/executor.c',

@ -59,11 +59,12 @@ STUB_FACTORY_IDENTIFIER = 'beta_create_TestService_stub'
class _ServicerMethods(object):
def __init__(self, test_pb2):
def __init__(self, response_pb2, payload_pb2):
self._condition = threading.Condition()
self._paused = False
self._fail = False
self._test_pb2 = test_pb2
self._response_pb2 = response_pb2
self._payload_pb2 = payload_pb2
@contextlib.contextmanager
def pause(self): # pylint: disable=invalid-name
@ -90,22 +91,22 @@ class _ServicerMethods(object):
self._condition.wait()
def UnaryCall(self, request, unused_rpc_context):
response = self._test_pb2.SimpleResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response = self._response_pb2.SimpleResponse()
response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * request.response_size
self._control()
return response
def StreamingOutputCall(self, request, unused_rpc_context):
for parameter in request.response_parameters:
response = self._test_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response = self._response_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
def StreamingInputCall(self, request_iter, unused_rpc_context):
response = self._test_pb2.StreamingInputCallResponse()
response = self._response_pb2.StreamingInputCallResponse()
aggregated_payload_size = 0
for request in request_iter:
aggregated_payload_size += len(request.payload.payload_compressable)
@ -116,8 +117,8 @@ class _ServicerMethods(object):
def FullDuplexCall(self, request_iter, unused_rpc_context):
for request in request_iter:
for parameter in request.response_parameters:
response = self._test_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response = self._response_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
@ -126,8 +127,8 @@ class _ServicerMethods(object):
responses = []
for request in request_iter:
for parameter in request.response_parameters:
response = self._test_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response = self._response_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
responses.append(response)
@ -136,23 +137,25 @@ class _ServicerMethods(object):
@contextlib.contextmanager
def _CreateService(test_pb2):
def _CreateService(service_pb2, response_pb2, payload_pb2):
"""Provides a servicer backend and a stub.
The servicer is just the implementation of the actual servicer passed to the
face player of the python RPC implementation; the two are detached.
Args:
test_pb2: The test_pb2 module generated by this test.
service_pb2: The service_pb2 module generated by this test.
response_pb2: The response_pb2 module generated by this test
payload_pb2: The payload_pb2 module generated by this test
Yields:
A (servicer_methods, stub) pair where servicer_methods is the back-end of
the service bound to the stub and and stub is the stub on which to invoke
RPCs.
"""
servicer_methods = _ServicerMethods(test_pb2)
servicer_methods = _ServicerMethods(response_pb2, payload_pb2)
class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):
class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)):
def UnaryCall(self, request, context):
return servicer_methods.UnaryCall(request, context)
@ -170,55 +173,52 @@ def _CreateService(test_pb2):
return servicer_methods.HalfDuplexCall(request_iter, context)
servicer = Servicer()
server = getattr(test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
port = server.add_insecure_port('[::]:0')
server.start()
channel = implementations.insecure_channel('localhost', port)
stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)(channel)
yield servicer_methods, stub
stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
yield (servicer_methods, stub)
server.stop(0)
@contextlib.contextmanager
def _CreateIncompleteService(test_pb2):
def _CreateIncompleteService(service_pb2):
"""Provides a servicer backend that fails to implement methods and its stub.
The servicer is just the implementation of the actual servicer passed to the
face player of the python RPC implementation; the two are detached.
Args:
test_pb2: The test_pb2 module generated by this test.
service_pb2: The service_pb2 module generated by this test.
Yields:
A (servicer_methods, stub) pair where servicer_methods is the back-end of
the service bound to the stub and and stub is the stub on which to invoke
RPCs.
"""
servicer_methods = _ServicerMethods(test_pb2)
class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):
class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)):
pass
servicer = Servicer()
server = getattr(test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
port = server.add_insecure_port('[::]:0')
server.start()
channel = implementations.insecure_channel('localhost', port)
stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)(channel)
yield servicer_methods, stub
stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
yield None, stub
server.stop(0)
def _streaming_input_request_iterator(test_pb2):
def _streaming_input_request_iterator(request_pb2, payload_pb2):
for _ in range(3):
request = test_pb2.StreamingInputCallRequest()
request.payload.payload_type = test_pb2.COMPRESSABLE
request = request_pb2.StreamingInputCallRequest()
request.payload.payload_type = payload_pb2.COMPRESSABLE
request.payload.payload_compressable = 'a'
yield request
def _streaming_output_request(test_pb2):
request = test_pb2.StreamingOutputCallRequest()
def _streaming_output_request(request_pb2):
request = request_pb2.StreamingOutputCallRequest()
sizes = [1, 2, 3]
request.response_parameters.add(size=sizes[0], interval_us=0)
request.response_parameters.add(size=sizes[1], interval_us=0)
@ -226,11 +226,11 @@ def _streaming_output_request(test_pb2):
return request
def _full_duplex_request_iterator(test_pb2):
request = test_pb2.StreamingOutputCallRequest()
def _full_duplex_request_iterator(request_pb2):
request = request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
request = test_pb2.StreamingOutputCallRequest()
request = request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
@ -250,8 +250,6 @@ class PythonPluginTest(unittest.TestCase):
protoc_command = 'protoc'
protoc_plugin_filename = distutils.spawn.find_executable(
'grpc_python_plugin')
test_proto_filename = pkg_resources.resource_filename(
'tests.protoc_plugin', 'protoc_plugin_test.proto')
if not os.path.isfile(protoc_command):
# Assume that if we haven't built protoc that it's on the system.
protoc_command = 'protoc'
@ -259,19 +257,44 @@ class PythonPluginTest(unittest.TestCase):
# Ensure that the output directory exists.
self.outdir = tempfile.mkdtemp()
# Find all proto files
paths = []
root_dir = os.path.dirname(os.path.realpath(__file__))
proto_dir = os.path.join(root_dir, 'protos')
for walk_root, _, filenames in os.walk(proto_dir):
for filename in filenames:
if filename.endswith('.proto'):
path = os.path.join(walk_root, filename)
paths.append(path)
# Invoke protoc with the plugin.
cmd = [
protoc_command,
'--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename,
'-I .',
'-I %s' % root_dir,
'--python_out=%s' % self.outdir,
'--python-grpc_out=%s' % self.outdir,
os.path.basename(test_proto_filename),
]
'--python-grpc_out=%s' % self.outdir
] + paths
subprocess.check_call(' '.join(cmd), shell=True, env=os.environ,
cwd=os.path.dirname(test_proto_filename))
cwd=os.path.dirname(os.path.realpath(__file__)))
# Generated proto directories dont include __init__.py, but
# these are needed for python package resolution
for walk_root, _, _ in os.walk(os.path.join(self.outdir, 'protos')):
path = os.path.join(walk_root, '__init__.py')
open(path, 'a').close()
sys.path.insert(0, self.outdir)
import protos.payload.test_payload_pb2 as payload_pb2 # pylint: disable=g-import-not-at-top
import protos.requests.r.test_requests_pb2 as request_pb2 # pylint: disable=g-import-not-at-top
import protos.responses.test_responses_pb2 as response_pb2 # pylint: disable=g-import-not-at-top
import protos.service.test_service_pb2 as service_pb2 # pylint: disable=g-import-not-at-top
self._payload_pb2 = payload_pb2
self._request_pb2 = request_pb2
self._response_pb2 = response_pb2
self._service_pb2 = service_pb2
def tearDown(self):
try:
shutil.rmtree(self.outdir)
@ -282,43 +305,40 @@ class PythonPluginTest(unittest.TestCase):
def testImportAttributes(self):
# check that we can access the generated module and its members.
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None))
self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None))
self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None))
self.assertIsNotNone(getattr(test_pb2, STUB_FACTORY_IDENTIFIER, None))
self.assertIsNotNone(
getattr(self._service_pb2, SERVICER_IDENTIFIER, None))
self.assertIsNotNone(
getattr(self._service_pb2, STUB_IDENTIFIER, None))
self.assertIsNotNone(
getattr(self._service_pb2, SERVER_FACTORY_IDENTIFIER, None))
self.assertIsNotNone(
getattr(self._service_pb2, STUB_FACTORY_IDENTIFIER, None))
def testUpDown(self):
import protoc_plugin_test_pb2 as test_pb2
moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (servicer, stub):
request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(
self._service_pb2, self._response_pb2, self._payload_pb2):
self._request_pb2.SimpleRequest(response_size=13)
def testIncompleteServicer(self):
import protoc_plugin_test_pb2 as test_pb2
moves.reload_module(test_pb2)
with _CreateIncompleteService(test_pb2) as (servicer, stub):
request = test_pb2.SimpleRequest(response_size=13)
with _CreateIncompleteService(self._service_pb2) as (_, stub):
request = self._request_pb2.SimpleRequest(response_size=13)
try:
response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
except face.AbortionError as error:
self.assertEqual(interfaces.StatusCode.UNIMPLEMENTED, error.code)
def testUnaryCall(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub):
request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
request = self._request_pb2.SimpleRequest(response_size=13)
response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
expected_response = methods.UnaryCall(request, 'not a real context!')
self.assertEqual(expected_response, response)
def testUnaryCallFuture(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(test_pb2) as (methods, stub):
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
request = self._request_pb2.SimpleRequest(response_size=13)
# Check that the call does not block waiting for the server to respond.
with methods.pause():
response_future = stub.UnaryCall.future(
@ -328,10 +348,9 @@ class PythonPluginTest(unittest.TestCase):
self.assertEqual(expected_response, response)
def testUnaryCallFutureExpired(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub):
request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
request = self._request_pb2.SimpleRequest(response_size=13)
with methods.pause():
response_future = stub.UnaryCall.future(
request, test_constants.SHORT_TIMEOUT)
@ -339,30 +358,27 @@ class PythonPluginTest(unittest.TestCase):
response_future.result()
def testUnaryCallFutureCancelled(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(test_pb2) as (methods, stub):
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
request = self._request_pb2.SimpleRequest(response_size=13)
with methods.pause():
response_future = stub.UnaryCall.future(request, 1)
response_future.cancel()
self.assertTrue(response_future.cancelled())
def testUnaryCallFutureFailed(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(test_pb2) as (methods, stub):
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
request = self._request_pb2.SimpleRequest(response_size=13)
with methods.fail():
response_future = stub.UnaryCall.future(
request, test_constants.LONG_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testStreamingOutputCall(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2) as (methods, stub):
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
request = _streaming_output_request(self._request_pb2)
responses = stub.StreamingOutputCall(
request, test_constants.LONG_TIMEOUT)
expected_responses = methods.StreamingOutputCall(
@ -372,10 +388,9 @@ class PythonPluginTest(unittest.TestCase):
self.assertEqual(expected_response, response)
def testStreamingOutputCallExpired(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2) as (methods, stub):
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
request = _streaming_output_request(self._request_pb2)
with methods.pause():
responses = stub.StreamingOutputCall(
request, test_constants.SHORT_TIMEOUT)
@ -383,10 +398,9 @@ class PythonPluginTest(unittest.TestCase):
list(responses)
def testStreamingOutputCallCancelled(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2) as (unused_methods, stub):
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
request = _streaming_output_request(self._request_pb2)
responses = stub.StreamingOutputCall(
request, test_constants.LONG_TIMEOUT)
next(responses)
@ -395,10 +409,9 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testStreamingOutputCallFailed(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2) as (methods, stub):
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
request = _streaming_output_request(self._request_pb2)
with methods.fail():
responses = stub.StreamingOutputCall(request, 1)
self.assertIsNotNone(responses)
@ -406,36 +419,38 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testStreamingInputCall(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub):
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
response = stub.StreamingInputCall(
_streaming_input_request_iterator(test_pb2),
_streaming_input_request_iterator(
self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
expected_response = methods.StreamingInputCall(
_streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
_streaming_input_request_iterator(self._request_pb2, self._payload_pb2),
'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallFuture(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub):
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
with methods.pause():
response_future = stub.StreamingInputCall.future(
_streaming_input_request_iterator(test_pb2),
_streaming_input_request_iterator(
self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
response = response_future.result()
expected_response = methods.StreamingInputCall(
_streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
_streaming_input_request_iterator(self._request_pb2, self._payload_pb2),
'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallFutureExpired(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub):
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
with methods.pause():
response_future = stub.StreamingInputCall.future(
_streaming_input_request_iterator(test_pb2),
_streaming_input_request_iterator(
self._request_pb2, self._payload_pb2),
test_constants.SHORT_TIMEOUT)
with self.assertRaises(face.ExpirationError):
response_future.result()
@ -443,12 +458,12 @@ class PythonPluginTest(unittest.TestCase):
response_future.exception(), face.ExpirationError)
def testStreamingInputCallFutureCancelled(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub):
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
with methods.pause():
response_future = stub.StreamingInputCall.future(
_streaming_input_request_iterator(test_pb2),
_streaming_input_request_iterator(
self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
response_future.cancel()
self.assertTrue(response_future.cancelled())
@ -456,32 +471,32 @@ class PythonPluginTest(unittest.TestCase):
response_future.result()
def testStreamingInputCallFutureFailed(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub):
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
with methods.fail():
response_future = stub.StreamingInputCall.future(
_streaming_input_request_iterator(test_pb2),
_streaming_input_request_iterator(
self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testFullDuplexCall(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub):
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
responses = stub.FullDuplexCall(
_full_duplex_request_iterator(test_pb2), test_constants.LONG_TIMEOUT)
_full_duplex_request_iterator(self._request_pb2),
test_constants.LONG_TIMEOUT)
expected_responses = methods.FullDuplexCall(
_full_duplex_request_iterator(test_pb2), 'not a real RpcContext!')
_full_duplex_request_iterator(self._request_pb2),
'not a real RpcContext!')
for expected_response, response in moves.zip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
def testFullDuplexCallExpired(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
request_iterator = _full_duplex_request_iterator(test_pb2)
with _CreateService(test_pb2) as (methods, stub):
request_iterator = _full_duplex_request_iterator(self._request_pb2)
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
with methods.pause():
responses = stub.FullDuplexCall(
request_iterator, test_constants.SHORT_TIMEOUT)
@ -489,10 +504,9 @@ class PythonPluginTest(unittest.TestCase):
list(responses)
def testFullDuplexCallCancelled(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub):
request_iterator = _full_duplex_request_iterator(test_pb2)
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
request_iterator = _full_duplex_request_iterator(self._request_pb2)
responses = stub.FullDuplexCall(
request_iterator, test_constants.LONG_TIMEOUT)
next(responses)
@ -501,10 +515,9 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testFullDuplexCallFailed(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
request_iterator = _full_duplex_request_iterator(test_pb2)
with _CreateService(test_pb2) as (methods, stub):
request_iterator = _full_duplex_request_iterator(self._request_pb2)
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
with methods.fail():
responses = stub.FullDuplexCall(
request_iterator, test_constants.LONG_TIMEOUT)
@ -513,14 +526,13 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testHalfDuplexCall(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub):
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
def half_duplex_request_iterator():
request = test_pb2.StreamingOutputCallRequest()
request = self._request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
request = test_pb2.StreamingOutputCallRequest()
request = self._request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
@ -533,8 +545,6 @@ class PythonPluginTest(unittest.TestCase):
self.assertEqual(expected_response, response)
def testHalfDuplexCallWedged(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
moves.reload_module(test_pb2)
condition = threading.Condition()
wait_cell = [False]
@contextlib.contextmanager
@ -547,13 +557,14 @@ class PythonPluginTest(unittest.TestCase):
wait_cell[0] = False
condition.notify_all()
def half_duplex_request_iterator():
request = test_pb2.StreamingOutputCallRequest()
request = self._request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
with condition:
while wait_cell[0]:
condition.wait()
with _CreateService(test_pb2) as (methods, stub):
with _CreateService(self._service_pb2, self._response_pb2,
self._payload_pb2) as (methods, stub):
with wait():
responses = stub.HalfDuplexCall(
half_duplex_request_iterator(), test_constants.SHORT_TIMEOUT)
@ -563,5 +574,5 @@ class PythonPluginTest(unittest.TestCase):
if __name__ == '__main__':
os.chdir(os.path.dirname(sys.argv[0]))
#os.chdir(os.path.dirname(sys.argv[0]))
unittest.main(verbosity=2)

@ -0,0 +1,51 @@
// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto3";
package grpc_protoc_plugin;
enum PayloadType {
// Compressable text format.
COMPRESSABLE= 0;
// Uncompressable binary format.
UNCOMPRESSABLE = 1;
// Randomly chosen from all other formats defined in this enum.
RANDOM = 2;
}
message Payload {
PayloadType payload_type = 1;
oneof payload_body {
string payload_compressable = 2;
bytes payload_uncompressable = 3;
}
}

@ -0,0 +1,77 @@
// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto3";
import "protos/payload/test_payload.proto";
package grpc_protoc_plugin;
message SimpleRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats.
PayloadType response_type = 1;
// Desired payload size in the response from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression.
int32 response_size = 2;
// input payload sent along with the request.
Payload payload = 3;
}
message StreamingInputCallRequest {
// input payload sent along with the request.
Payload payload = 1;
// Not expecting any payload from the response.
}
message ResponseParameters {
// Desired payload sizes in responses from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression.
int32 size = 1;
// Desired interval between consecutive responses in the response stream in
// microseconds.
int32 interval_us = 2;
}
message StreamingOutputCallRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, the payload from each response in the stream
// might be of different types. This is to simulate a mixed type of payload
// stream.
PayloadType response_type = 1;
repeated ResponseParameters response_parameters = 2;
// input payload sent along with the request.
Payload payload = 3;
}

@ -0,0 +1,47 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto3";
import "protos/payload/test_payload.proto";
package grpc_protoc_plugin;
message SimpleResponse {
Payload payload = 1;
}
message StreamingInputCallResponse {
// Aggregated size of payloads received from the client.
int32 aggregated_payload_size = 1;
}
message StreamingOutputCallResponse {
Payload payload = 1;
}

@ -1,4 +1,4 @@
// Copyright 2015, Google Inc.
// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@ -27,87 +27,12 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// An integration test service that covers all the method signature permutations
// of unary/streaming requests/responses.
// This file is duplicated around the code base. See GitHub issue #526.
syntax = "proto2";
syntax = "proto3";
package grpc_protoc_plugin;
enum PayloadType {
// Compressable text format.
COMPRESSABLE= 1;
// Uncompressable binary format.
UNCOMPRESSABLE = 2;
// Randomly chosen from all other formats defined in this enum.
RANDOM = 3;
}
message Payload {
required PayloadType payload_type = 1;
oneof payload_body {
string payload_compressable = 2;
bytes payload_uncompressable = 3;
}
}
message SimpleRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats.
optional PayloadType response_type = 1 [default=COMPRESSABLE];
// Desired payload size in the response from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression.
optional int32 response_size = 2;
// Optional input payload sent along with the request.
optional Payload payload = 3;
}
message SimpleResponse {
optional Payload payload = 1;
}
message StreamingInputCallRequest {
// Optional input payload sent along with the request.
optional Payload payload = 1;
import "protos/requests/r/test_requests.proto";
import "protos/responses/test_responses.proto";
// Not expecting any payload from the response.
}
message StreamingInputCallResponse {
// Aggregated size of payloads received from the client.
optional int32 aggregated_payload_size = 1;
}
message ResponseParameters {
// Desired payload sizes in responses from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression.
required int32 size = 1;
// Desired interval between consecutive responses in the response stream in
// microseconds.
required int32 interval_us = 2;
}
message StreamingOutputCallRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, the payload from each response in the stream
// might be of different types. This is to simulate a mixed type of payload
// stream.
optional PayloadType response_type = 1 [default=COMPRESSABLE];
repeated ResponseParameters response_parameters = 2;
// Optional input payload sent along with the request.
optional Payload payload = 3;
}
message StreamingOutputCallResponse {
optional Payload payload = 1;
}
package grpc_protoc_plugin;
service TestService {
// One request followed by one response.

@ -77,7 +77,8 @@ static void test_set_compression_algorithm(void) {
ch_args =
grpc_channel_args_set_compression_algorithm(NULL, GRPC_COMPRESS_GZIP);
GPR_ASSERT(ch_args->num_args == 1);
GPR_ASSERT(strcmp(ch_args->args[0].key, GRPC_COMPRESSION_ALGORITHM_ARG) == 0);
GPR_ASSERT(strcmp(ch_args->args[0].key,
GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM) == 0);
GPR_ASSERT(ch_args->args[0].type == GRPC_ARG_INTEGER);
grpc_channel_args_destroy(ch_args);

@ -37,6 +37,7 @@
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/slice.h>
#include <grpc/support/thd.h>
#include "src/core/ext/client_config/initial_connect_string.h"
#include "src/core/lib/iomgr/sockaddr.h"
@ -56,7 +57,7 @@ struct rpc_state {
gpr_slice_buffer incoming_buffer;
gpr_slice_buffer temp_incoming_buffer;
grpc_endpoint *tcp;
int done;
gpr_atm done_atm;
};
static const char *magic_connect_string = "magic initial string";
@ -69,7 +70,7 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
gpr_slice_buffer_move_into(&state.temp_incoming_buffer,
&state.incoming_buffer);
if (state.incoming_buffer.length > strlen(magic_connect_string)) {
state.done = 1;
gpr_atm_rel_store(&state.done_atm, 1);
grpc_endpoint_shutdown(exec_ctx, state.tcp);
grpc_endpoint_destroy(exec_ctx, state.tcp);
} else {
@ -116,7 +117,7 @@ static gpr_timespec n_sec_deadline(int seconds) {
}
static void start_rpc(int use_creds, int target_port) {
state.done = 0;
gpr_atm_rel_store(&state.done_atm, 0);
state.cq = grpc_completion_queue_create(NULL);
if (use_creds) {
state.creds = grpc_fake_transport_security_credentials_create();
@ -139,7 +140,7 @@ static void start_rpc(int use_creds, int target_port) {
state.op.reserved = NULL;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(state.call, &state.op,
(size_t)(1), NULL, NULL));
grpc_completion_queue_next(state.cq, n_sec_deadline(1), NULL);
grpc_completion_queue_next(state.cq, n_sec_deadline(5), NULL);
}
static void cleanup_rpc(void) {
@ -157,12 +158,29 @@ static void cleanup_rpc(void) {
gpr_free(state.target);
}
static void poll_server_until_read_done(test_tcp_server *server) {
gpr_timespec deadline = n_sec_deadline(5);
while (state.done == 0 &&
typedef struct {
test_tcp_server *server;
gpr_event *signal_when_done;
} poll_args;
static void actually_poll_server(void *arg) {
poll_args *pa = arg;
gpr_timespec deadline = n_sec_deadline(10);
while (gpr_atm_acq_load(&state.done_atm) == 0 &&
gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0) {
test_tcp_server_poll(server, 1);
test_tcp_server_poll(pa->server, 1);
}
gpr_event_set(pa->signal_when_done, (void *)1);
gpr_free(pa);
}
static void poll_server_until_read_done(test_tcp_server *server,
gpr_event *signal_when_done) {
gpr_thd_id id;
poll_args *pa = gpr_malloc(sizeof(*pa));
pa->server = server;
pa->signal_when_done = signal_when_done;
gpr_thd_new(&id, actually_poll_server, pa, NULL);
}
static void match_initial_magic_string(gpr_slice_buffer *buffer) {
@ -180,20 +198,26 @@ static void match_initial_magic_string(gpr_slice_buffer *buffer) {
}
static void test_initial_string(test_tcp_server *server, int secure) {
gpr_event ev;
gpr_event_init(&ev);
grpc_test_set_initial_connect_string_function(set_magic_initial_string);
poll_server_until_read_done(server, &ev);
start_rpc(secure, server_port);
poll_server_until_read_done(server);
gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
match_initial_magic_string(&state.incoming_buffer);
cleanup_rpc();
}
static void test_initial_string_with_redirect(test_tcp_server *server,
int secure) {
gpr_event ev;
gpr_event_init(&ev);
int another_port = grpc_pick_unused_port_or_die();
grpc_test_set_initial_connect_string_function(
reset_addr_and_set_magic_string);
poll_server_until_read_done(server, &ev);
start_rpc(secure, another_port);
poll_server_until_read_done(server);
gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
match_initial_magic_string(&state.incoming_buffer);
cleanup_rpc();
}

@ -32,20 +32,22 @@
*/
#include "src/core/lib/iomgr/udp_server.h"
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "test/core/util/test_config.h"
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#ifdef GRPC_NEED_UDP
#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x)

@ -73,8 +73,10 @@ static void test_add_closure(void) {
gpr_mu_lock(g_mu);
GPR_ASSERT(!done);
grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(deadline.clock_type),
deadline);
while (!done) {
grpc_pollset_work(&exec_ctx, g_pollset, &worker,
gpr_now(deadline.clock_type), deadline);
}
gpr_mu_unlock(g_mu);
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done);
@ -97,9 +99,10 @@ static void test_flush(void) {
grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset);
gpr_mu_lock(g_mu);
GPR_ASSERT(!done);
grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(deadline.clock_type),
deadline);
while (!done) {
grpc_pollset_work(&exec_ctx, g_pollset, &worker,
gpr_now(deadline.clock_type), deadline);
}
gpr_mu_unlock(g_mu);
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done);

@ -413,7 +413,7 @@ bool InteropClient::DoRequestStreaming() {
}
bool InteropClient::DoResponseStreaming() {
gpr_log(GPR_DEBUG, "Receiving response steaming rpc ...");
gpr_log(GPR_DEBUG, "Receiving response streaming rpc ...");
ClientContext context;
StreamingOutputCallRequest request;
@ -465,7 +465,7 @@ bool InteropClient::DoResponseCompressedStreaming() {
CompressionType_Name(compression_types[j]).c_str(),
PayloadType_Name(payload_types[i]).c_str());
gpr_log(GPR_DEBUG, "Receiving response steaming rpc %s.", log_suffix);
gpr_log(GPR_DEBUG, "Receiving response streaming rpc %s.", log_suffix);
request.set_response_type(payload_types[i]);
request.set_response_compression(compression_types[j]);
@ -544,7 +544,7 @@ bool InteropClient::DoResponseCompressedStreaming() {
}
bool InteropClient::DoResponseStreamingWithSlowConsumer() {
gpr_log(GPR_DEBUG, "Receiving response steaming rpc with slow consumer ...");
gpr_log(GPR_DEBUG, "Receiving response streaming rpc with slow consumer ...");
ClientContext context;
StreamingOutputCallRequest request;
@ -677,7 +677,7 @@ bool InteropClient::DoPingPong() {
}
bool InteropClient::DoCancelAfterBegin() {
gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
gpr_log(GPR_DEBUG, "Sending request streaming rpc ...");
ClientContext context;
StreamingInputCallRequest request;

@ -50,6 +50,7 @@ def mako_plugin(dictionary):
'name': new_target['name'],
'args': [fn],
'exclude_configs': [],
'uses_polling': False,
'platforms': ['linux'],
'ci_platforms': ['linux'],
'flaky': False,

@ -808,6 +808,7 @@ src/core/lib/iomgr/closure.h \
src/core/lib/iomgr/endpoint.h \
src/core/lib/iomgr/endpoint_pair.h \
src/core/lib/iomgr/ev_poll_and_epoll_posix.h \
src/core/lib/iomgr/ev_poll_posix.h \
src/core/lib/iomgr/ev_posix.h \
src/core/lib/iomgr/exec_ctx.h \
src/core/lib/iomgr/executor.h \
@ -946,6 +947,7 @@ src/core/lib/iomgr/endpoint.c \
src/core/lib/iomgr/endpoint_pair_posix.c \
src/core/lib/iomgr/endpoint_pair_windows.c \
src/core/lib/iomgr/ev_poll_and_epoll_posix.c \
src/core/lib/iomgr/ev_poll_posix.c \
src/core/lib/iomgr/ev_posix.c \
src/core/lib/iomgr/exec_ctx.c \
src/core/lib/iomgr/executor.c \

@ -56,6 +56,9 @@
},
{
"config": "ubsan",
"environ": {
"UBSAN_OPTIONS": "print_stacktrace=1"
},
"timeout_multiplier": 1.5
},
{

@ -35,6 +35,7 @@ set -e
export CONFIG=$config
export ASAN_SYMBOLIZER_PATH=/usr/bin/llvm-symbolizer
export PATH=$PATH:/usr/bin/llvm-symbolizer
# Ensure that programs depending on current-user-ownership of cache directories
# are satisfied (it's being mounted from outside the image).

@ -344,6 +344,7 @@ class Jobset(object):
self._add_env = add_env
self.resultset = {}
self._remaining = None
self._start_time = time.time()
def set_remaining(self, remaining):
self._remaining = remaining
@ -413,6 +414,11 @@ class Jobset(object):
if dead: return
if (not self._travis):
rstr = '' if self._remaining is None else '%d queued, ' % self._remaining
if self._remaining is not None and self._completed > 0:
now = time.time()
sofar = now - self._start_time
remaining = sofar / self._completed * (self._remaining + len(self._running))
rstr = 'ETA %.1f sec; %s' % (remaining, rstr)
message('WAITING', '%s%d jobs running, %d complete, %d failed' % (
rstr, len(self._running), self._completed, self._failures))
if platform_string() == 'windows':
@ -457,7 +463,7 @@ def tag_remaining(xs):
staging = []
for x in xs:
staging.append(x)
if len(staging) > 1000:
if len(staging) > 5000:
yield (staging.pop(0), None)
n = len(staging)
for i, x in enumerate(staging):

@ -153,52 +153,64 @@ class CLanguage(object):
def test_specs(self):
out = []
binaries = get_c_tests(self.args.travis, self.test_lang)
POLLING_STRATEGIES = {
'windows': ['all'],
'mac': ['all'],
'posix': ['all'],
'linux': ['poll', 'legacy']
}
for target in binaries:
if self.config.build_config in target['exclude_configs']:
continue
if self.platform == 'windows':
binary = 'vsprojects/%s%s/%s.exe' % (
'x64/' if self.args.arch == 'x64' else '',
_MSBUILD_CONFIG[self.config.build_config],
target['name'])
else:
binary = 'bins/%s/%s' % (self.config.build_config, target['name'])
if os.path.isfile(binary):
if 'gtest' in target and target['gtest']:
# here we parse the output of --gtest_list_tests to build up a
# complete list of the tests contained in a binary
# for each test, we then add a job to run, filtering for just that
# test
with open(os.devnull, 'w') as fnull:
tests = subprocess.check_output([binary, '--gtest_list_tests'],
stderr=fnull)
base = None
for line in tests.split('\n'):
i = line.find('#')
if i >= 0: line = line[:i]
if not line: continue
if line[0] != ' ':
base = line.strip()
else:
assert base is not None
assert line[1] == ' '
test = base + line.strip()
cmdline = [binary] + ['--gtest_filter=%s' % test]
out.append(self.config.job_spec(cmdline, [binary],
shortname='%s:%s' % (binary, test),
cpu_cost=target['cpu_cost'],
environ={'GRPC_DEFAULT_SSL_ROOTS_FILE_PATH':
_ROOT + '/src/core/lib/tsi/test_creds/ca.pem'}))
polling_strategies = (POLLING_STRATEGIES[self.platform]
if target.get('uses_polling', True)
else ['all'])
for polling_strategy in polling_strategies:
env={'GRPC_DEFAULT_SSL_ROOTS_FILE_PATH':
_ROOT + '/src/core/lib/tsi/test_creds/ca.pem',
'GRPC_POLL_STRATEGY': polling_strategy}
shortname_ext = '' if polling_strategy=='all' else ' polling=%s' % polling_strategy
if self.config.build_config in target['exclude_configs']:
continue
if self.platform == 'windows':
binary = 'vsprojects/%s%s/%s.exe' % (
'x64/' if self.args.arch == 'x64' else '',
_MSBUILD_CONFIG[self.config.build_config],
target['name'])
else:
cmdline = [binary] + target['args']
out.append(self.config.job_spec(cmdline, [binary],
shortname=target.get('shortname', ' '.join(cmdline)),
cpu_cost=target['cpu_cost'],
flaky=target.get('flaky', False),
environ={'GRPC_DEFAULT_SSL_ROOTS_FILE_PATH':
_ROOT + '/src/core/lib/tsi/test_creds/ca.pem'}))
elif self.args.regex == '.*' or self.platform == 'windows':
print '\nWARNING: binary not found, skipping', binary
binary = 'bins/%s/%s' % (self.config.build_config, target['name'])
if os.path.isfile(binary):
if 'gtest' in target and target['gtest']:
# here we parse the output of --gtest_list_tests to build up a
# complete list of the tests contained in a binary
# for each test, we then add a job to run, filtering for just that
# test
with open(os.devnull, 'w') as fnull:
tests = subprocess.check_output([binary, '--gtest_list_tests'],
stderr=fnull)
base = None
for line in tests.split('\n'):
i = line.find('#')
if i >= 0: line = line[:i]
if not line: continue
if line[0] != ' ':
base = line.strip()
else:
assert base is not None
assert line[1] == ' '
test = base + line.strip()
cmdline = [binary] + ['--gtest_filter=%s' % test]
out.append(self.config.job_spec(cmdline, [binary],
shortname='%s:%s %s' % (binary, test, shortname_ext),
cpu_cost=target['cpu_cost'],
environ=env))
else:
cmdline = [binary] + target['args']
out.append(self.config.job_spec(cmdline, [binary],
shortname=' '.join(cmdline) + shortname_ext,
cpu_cost=target['cpu_cost'],
flaky=target.get('flaky', False),
environ=env))
elif self.args.regex == '.*' or self.platform == 'windows':
print '\nWARNING: binary not found, skipping', binary
return sorted(out)
def make_targets(self):

@ -5646,6 +5646,7 @@
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_pair.h",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.h",
"src/core/lib/iomgr/ev_poll_posix.h",
"src/core/lib/iomgr/ev_posix.h",
"src/core/lib/iomgr/exec_ctx.h",
"src/core/lib/iomgr/executor.h",
@ -5746,6 +5747,8 @@
"src/core/lib/iomgr/endpoint_pair_windows.c",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.c",
"src/core/lib/iomgr/ev_poll_and_epoll_posix.h",
"src/core/lib/iomgr/ev_poll_posix.c",
"src/core/lib/iomgr/ev_poll_posix.h",
"src/core/lib/iomgr/ev_posix.c",
"src/core/lib/iomgr/ev_posix.h",
"src/core/lib/iomgr/exec_ctx.c",

@ -21,6 +21,9 @@
"metricsArgs": {
"metrics_server_address": "localhost:8081",
"total_only": "true"
},
"env": {
"STRESSTEST_CLIENT_OPTS":"-Xmx3g -Xms3g -XX:NewSize=1.5g -XX:MaxNewSize=1.5g"
}
}
},
@ -44,7 +47,10 @@
"serverPort": 8080,
"serverArgs": {
"port": 8080,
"use_tls": "false"
"use_tls": "false"
},
"env": {
"TEST_SERVER_OPTS":"-Xmx3g -Xms3g -XX:NewSize=1.5g -XX:MaxNewSize=1.5g"
}
}
},

File diff suppressed because it is too large Load Diff

@ -317,6 +317,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\exec_ctx.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\executor.h" />
@ -476,6 +477,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\exec_ctx.c">

@ -58,6 +58,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@ -653,6 +656,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>

@ -305,6 +305,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\endpoint_pair.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\exec_ctx.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\executor.h" />
@ -451,6 +452,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\exec_ctx.c">

@ -61,6 +61,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@ -575,6 +578,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_and_epoll_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_poll_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\ev_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>

Loading…
Cancel
Save