Merge github.com:grpc/grpc into chttp2_timer

pull/12894/head
Craig Tiller 7 years ago
commit fba6475805
  1. 12
      INSTALL.md
  2. 2
      Makefile
  3. 9
      build.yaml
  4. 7
      doc/load-balancing.md
  5. 4
      grpc.gemspec
  6. 2
      include/grpc++/server_builder.h
  7. 23
      include/grpc/impl/codegen/port_platform.h
  8. 8
      setup.py
  9. 12
      src/c-ares/gen_build_yaml.py
  10. 53
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  11. 627
      src/core/ext/transport/inproc/inproc_transport.cc
  12. 18
      src/core/lib/debug/stats_data.cc
  13. 27
      src/core/lib/debug/stats_data.h
  14. 21
      src/core/lib/debug/stats_data.yaml
  15. 9
      src/core/lib/debug/stats_data_bq_schema.sql
  16. 5
      src/core/lib/iomgr/call_combiner.cc
  17. 1
      src/core/lib/iomgr/combiner.cc
  18. 31
      src/core/lib/iomgr/ev_posix.cc
  19. 10
      src/core/lib/iomgr/port.h
  20. 8
      src/core/lib/profiling/basic_timers.cc
  21. 2
      src/core/lib/support/time_posix.cc
  22. 15
      src/core/lib/surface/completion_queue.cc
  23. 16
      src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
  24. 16
      src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
  25. 14
      src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
  26. 15
      src/csharp/Grpc.Core/AsyncUnaryCall.cs
  27. 41
      src/objective-c/tests/GRPCClientTests.m
  28. 83
      src/objective-c/tests/Tests.xcodeproj/project.pbxproj
  29. 27
      src/objective-c/tests/version.h
  30. 8
      src/php/lib/Grpc/BaseStub.php
  31. 4
      src/ruby/ext/grpc/extconf.rb
  32. 29
      templates/src/objective-c/tests/version.h.template
  33. 19
      test/core/end2end/gen_build_yaml.py
  34. 20
      test/core/end2end/generate_tests.bzl
  35. 1
      test/core/end2end/tests/streaming_error_response.c
  36. 182
      test/cpp/end2end/async_end2end_test.cc
  37. 502
      third_party/cares/config_openbsd/ares_config.h
  38. 2
      tools/internal_ci/linux/grpc_interop_matrix.sh
  39. 22
      tools/interop_matrix/run_interop_matrix_tests.py
  40. 4
      tools/run_tests/generated/sources_and_headers.json
  41. 92
      tools/run_tests/generated/tests.json
  42. 7
      tools/run_tests/performance/massage_qps_stats.py
  43. 70
      tools/run_tests/performance/scenario_result_schema.json
  44. 2
      tools/run_tests/run_interop_tests.py
  45. 2
      tools/run_tests/run_tests.py

@ -94,6 +94,7 @@ on experience with the tools involved.
### Building using CMake (RECOMMENDED)
Builds gRPC C and C++ with boringssl.
- Install Visual Studio 2015 or 2017 (Visual C++ compiler will be used).
- Install [CMake](https://cmake.org/download/).
- Install [Active State Perl](https://www.activestate.com/activeperl/) (`choco install activeperl`)
- Install [Ninja](https://ninja-build.org/) (`choco install ninja`)
@ -101,7 +102,9 @@ Builds gRPC C and C++ with boringssl.
- Install [yasm](http://yasm.tortall.net/) and add it to `PATH` (`choco install yasm`)
- Run these commands in the repo root directory
Using Ninja (faster build, supports boringssl's assembly optimizations)
#### cmake: Using Ninja (faster build, supports boringssl's assembly optimizations).
Please note that when using Ninja, you'll still need Visual C++ (part of Visual Studio)
installed to be able to compile the C/C++ sources.
```
> md .build
> cd .build
@ -110,7 +113,12 @@ Using Ninja (faster build, supports boringssl's assembly optimizations)
> cmake --build .
```
Using Visual Studio 2015 (can only build with OPENSSL_NO_ASM)
#### cmake: Using Visual Studio 2015 (can only build with OPENSSL_NO_ASM).
When using the "Visual Studio" generator,
cmake will generate a solution (`grpc.sln`) that contains a VS project for
every target defined in `CMakeLists.txt` (+ few extra convenience projects
added automatically by cmake). After opening the solution with Visual Studio
you will be able to browse and build the code as usual.
```
> md .build
> cd .build

@ -8424,7 +8424,7 @@ PUBLIC_HEADERS_C += \
LIBARES_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBARES_SRC))))
$(LIBARES_OBJS): CPPFLAGS += -Ithird_party/cares -Ithird_party/cares/cares $(if $(subst Linux,,$(SYSTEM)),,-Ithird_party/cares/config_linux) $(if $(subst Darwin,,$(SYSTEM)),,-Ithird_party/cares/config_darwin) -fvisibility=hidden -D_GNU_SOURCE -DWIN32_LEAN_AND_MEAN -D_HAS_EXCEPTIONS=0 -DNOMINMAX $(if $(subst MINGW32,,$(SYSTEM)),-DHAVE_CONFIG_H,)
$(LIBARES_OBJS): CPPFLAGS += -Ithird_party/cares -Ithird_party/cares/cares -fvisibility=hidden -D_GNU_SOURCE $(if $(subst Darwin,,$(SYSTEM)),,-Ithird_party/cares/config_darwin) $(if $(subst FreeBSD,,$(SYSTEM)),,-Ithird_party/cares/config_freebsd) $(if $(subst Linux,,$(SYSTEM)),,-Ithird_party/cares/config_linux) $(if $(subst OpenBSD,,$(SYSTEM)),,-Ithird_party/cares/config_openbsd) -DWIN32_LEAN_AND_MEAN -D_HAS_EXCEPTIONS=0 -DNOMINMAX $(if $(subst MINGW32,,$(SYSTEM)),-DHAVE_CONFIG_H,)
$(LIBARES_OBJS): CFLAGS += -Wno-sign-conversion $(if $(subst Darwin,,$(SYSTEM)),,-Wno-shorten-64-to-32) $(if $(subst MINGW32,,$(SYSTEM)),-Wno-invalid-source-encoding,)
$(LIBDIR)/$(CONFIG)/libares.a: $(ZLIB_DEP) $(LIBARES_OBJS)

@ -4873,10 +4873,11 @@ defaults:
ares:
CFLAGS: -Wno-sign-conversion $(if $(subst Darwin,,$(SYSTEM)),,-Wno-shorten-64-to-32)
$(if $(subst MINGW32,,$(SYSTEM)),-Wno-invalid-source-encoding,)
CPPFLAGS: -Ithird_party/cares -Ithird_party/cares/cares $(if $(subst Linux,,$(SYSTEM)),,-Ithird_party/cares/config_linux)
$(if $(subst Darwin,,$(SYSTEM)),,-Ithird_party/cares/config_darwin) -fvisibility=hidden
-D_GNU_SOURCE -DWIN32_LEAN_AND_MEAN -D_HAS_EXCEPTIONS=0 -DNOMINMAX $(if $(subst
MINGW32,,$(SYSTEM)),-DHAVE_CONFIG_H,)
CPPFLAGS: -Ithird_party/cares -Ithird_party/cares/cares -fvisibility=hidden -D_GNU_SOURCE
$(if $(subst Darwin,,$(SYSTEM)),,-Ithird_party/cares/config_darwin) $(if $(subst
FreeBSD,,$(SYSTEM)),,-Ithird_party/cares/config_freebsd) $(if $(subst Linux,,$(SYSTEM)),,-Ithird_party/cares/config_linux)
$(if $(subst OpenBSD,,$(SYSTEM)),,-Ithird_party/cares/config_openbsd) -DWIN32_LEAN_AND_MEAN
-D_HAS_EXCEPTIONS=0 -DNOMINMAX $(if $(subst MINGW32,,$(SYSTEM)),-DHAVE_CONFIG_H,)
benchmark:
CPPFLAGS: -Ithird_party/benchmark/include -DHAVE_POSIX_REGEX
boringssl:

@ -129,10 +129,9 @@ works:
by the resolver. It asks the balancer for the server addresses to
use for the server name originally requested by the client (i.e.,
the same one originally passed to the name resolver).
- Note: The `grpclb` policy currently ignores any non-balancer
addresses returned by the resolver. However, in the future, it
may be changed to use these addresses as a fallback in case no
balancers can be contacted.
- Note: In the `grpclb` policy, the non-balancer addresses returned
by the resolver are used as a fallback in case no balancers can be
contacted when the LB policy is started.
2. The gRPC servers to which the load balancer is directing the client
may report load to the load balancers, if that information is needed
by the load balancer's configuration.

@ -1124,8 +1124,10 @@ Gem::Specification.new do |s|
s.files += %w( third_party/cares/cares/config-win32.h )
s.files += %w( third_party/cares/cares/setup_once.h )
s.files += %w( third_party/cares/ares_build.h )
s.files += %w( third_party/cares/config_linux/ares_config.h )
s.files += %w( third_party/cares/config_darwin/ares_config.h )
s.files += %w( third_party/cares/config_freebsd/ares_config.h )
s.files += %w( third_party/cares/config_linux/ares_config.h )
s.files += %w( third_party/cares/config_openbsd/ares_config.h )
s.files += %w( third_party/cares/cares/ares__close_sockets.c )
s.files += %w( third_party/cares/cares/ares__get_hostent.c )
s.files += %w( third_party/cares/cares/ares__read_line.c )

@ -140,7 +140,7 @@ class ServerBuilder {
/// please use IPv6 any, i.e., [::]:<port>, which also accepts IPv4
/// connections. Valid values include dns:///localhost:1234, /
/// 192.168.1.1:31416, dns:///[::1]:27182, etc.).
/// \params creds The credentials associated with the server.
/// \param creds The credentials associated with the server.
/// \param selected_port[out] If not `nullptr`, gets populated with the port
/// number bound to the \a grpc::Server for the corresponding endpoint after
/// it is successfully bound, 0 otherwise.

@ -241,6 +241,29 @@
#else /* _LP64 */
#define GPR_ARCH_32 1
#endif /* _LP64 */
#elif defined(__OpenBSD__)
#define GPR_PLATFORM_STRING "openbsd"
#ifndef _BSD_SOURCE
#define _BSD_SOURCE
#endif
#define GPR_OPENBSD 1
#define GPR_CPU_POSIX 1
#define GPR_GCC_ATOMIC 1
#define GPR_GCC_TLS 1
#define GPR_POSIX_LOG 1
#define GPR_POSIX_ENV 1
#define GPR_POSIX_TMPFILE 1
#define GPR_POSIX_STRING 1
#define GPR_POSIX_SUBPROCESS 1
#define GPR_POSIX_SYNC 1
#define GPR_POSIX_TIME 1
#define GPR_GETPID_IN_UNISTD_H 1
#define GPR_SUPPORT_CHANNELS_FROM_FD 1
#ifdef _LP64
#define GPR_ARCH_64 1
#else /* _LP64 */
#define GPR_ARCH_32 1
#endif /* _LP64 */
#elif defined(__native_client__)
#define GPR_PLATFORM_STRING "nacl"
#ifndef _BSD_SOURCE

@ -40,10 +40,14 @@ ZLIB_INCLUDE = (os.path.join('third_party', 'zlib'),)
CARES_INCLUDE = (
os.path.join('third_party', 'cares'),
os.path.join('third_party', 'cares', 'cares'),)
if 'linux' in sys.platform:
CARES_INCLUDE += (os.path.join('third_party', 'cares', 'config_linux'),)
if 'darwin' in sys.platform:
CARES_INCLUDE += (os.path.join('third_party', 'cares', 'config_darwin'),)
if 'freebsd' in sys.platform:
CARES_INCLUDE += (os.path.join('third_party', 'cares', 'config_freebsd'),)
if 'linux' in sys.platform:
CARES_INCLUDE += (os.path.join('third_party', 'cares', 'config_linux'),)
if 'openbsd' in sys.platform:
CARES_INCLUDE += (os.path.join('third_party', 'cares', 'config_openbsd'),)
README = os.path.join(PYTHON_STEM, 'README.rst')
# Ensure we're in the proper directory whether or not we're being used by pip.

@ -29,10 +29,14 @@ try:
subprocess.call("third_party/cares/cares/configure", shell=True)
def config_platform(x):
if 'linux' in sys.platform:
return 'src/cares/cares/config_linux/ares_config.h'
if 'darwin' in sys.platform:
return 'src/cares/cares/config_darwin/ares_config.h'
if 'freebsd' in sys.platform:
return 'src/cares/cares/config_freebsd/ares_config.h'
if 'linux' in sys.platform:
return 'src/cares/cares/config_linux/ares_config.h'
if 'openbsd' in sys.platform:
return 'src/cares/cares/config_openbsd/ares_config.h'
if not os.path.isfile('third_party/cares/cares/ares_config.h'):
gen_ares_build(x)
return 'third_party/cares/cares/ares_config.h'
@ -124,8 +128,10 @@ try:
"third_party/cares/cares/config-win32.h",
"third_party/cares/cares/setup_once.h",
"third_party/cares/ares_build.h",
"third_party/cares/config_darwin/ares_config.h",
"third_party/cares/config_freebsd/ares_config.h",
"third_party/cares/config_linux/ares_config.h",
"third_party/cares/config_darwin/ares_config.h"
"third_party/cares/config_openbsd/ares_config.h"
],
}]
except:

@ -345,9 +345,6 @@ typedef struct glb_lb_policy {
/** are we currently updating lb_call? */
bool updating_lb_call;
/** are we currently updating lb_channel? */
bool updating_lb_channel;
/** are we already watching the LB channel's connectivity? */
bool watching_lb_channel;
@ -360,9 +357,6 @@ typedef struct glb_lb_policy {
/** called upon changes to the LB channel's connectivity. */
grpc_closure lb_channel_on_connectivity_changed;
/** args from the latest update received while already updating, or NULL */
grpc_lb_policy_args *pending_update_args;
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
@ -982,10 +976,6 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
grpc_subchannel_index_unref();
if (glb_policy->pending_update_args != NULL) {
grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args);
gpr_free(glb_policy->pending_update_args);
}
gpr_free(glb_policy);
}
@ -1752,45 +1742,22 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
}
const grpc_lb_addresses *addresses =
(const grpc_lb_addresses *)arg->value.pointer.p;
if (glb_policy->serverlist == NULL) {
// If a non-empty serverlist hasn't been received from the balancer,
// propagate the update to fallback_backend_addresses.
if (glb_policy->serverlist == NULL) {
fallback_update_locked(exec_ctx, glb_policy, addresses);
} else if (glb_policy->updating_lb_channel) {
// If we have recieved serverlist from the balancer, we need to defer update
// when there is an in-progress one.
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"Update already in progress for grpclb %p. Deferring update.",
(void *)glb_policy);
}
if (glb_policy->pending_update_args != NULL) {
grpc_channel_args_destroy(exec_ctx,
glb_policy->pending_update_args->args);
gpr_free(glb_policy->pending_update_args);
}
glb_policy->pending_update_args = (grpc_lb_policy_args *)gpr_zalloc(
sizeof(*glb_policy->pending_update_args));
glb_policy->pending_update_args->client_channel_factory =
args->client_channel_factory;
glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args);
glb_policy->pending_update_args->combiner = args->combiner;
return;
}
glb_policy->updating_lb_channel = true;
GPR_ASSERT(glb_policy->lb_channel != NULL);
// Propagate updates to the LB channel (pick_first) through the fake
// resolver.
grpc_channel_args *lb_channel_args = build_lb_channel_args(
exec_ctx, addresses, glb_policy->response_generator, args->args);
/* Propagate updates to the LB channel (pick first) through the fake resolver
*/
grpc_fake_resolver_response_generator_set_response(
exec_ctx, glb_policy->response_generator, lb_channel_args);
grpc_channel_args_destroy(exec_ctx, lb_channel_args);
// Start watching the LB channel connectivity for connection, if not
// already doing so.
if (!glb_policy->watching_lb_channel) {
// Watch the LB channel connectivity for connection.
glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state(
glb_policy->lb_channel, true /* try to connect */);
grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
@ -1842,18 +1809,10 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
/* fallthrough */
case GRPC_CHANNEL_READY:
if (glb_policy->lb_call != NULL) {
glb_policy->updating_lb_channel = false;
glb_policy->updating_lb_call = true;
grpc_call_cancel(glb_policy->lb_call, NULL);
// lb_on_server_status_received will pick up the cancel and reinit
// lb_on_server_status_received() will pick up the cancel and reinit
// lb_call.
if (glb_policy->pending_update_args != NULL) {
grpc_lb_policy_args *args = glb_policy->pending_update_args;
glb_policy->pending_update_args = NULL;
glb_update_locked(exec_ctx, &glb_policy->base, args);
grpc_channel_args_destroy(exec_ctx, args->args);
gpr_free(args);
}
} else if (glb_policy->started_picking && !glb_policy->shutting_down) {
if (glb_policy->retry_timer_active) {
grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);

@ -62,96 +62,22 @@ typedef struct inproc_transport {
struct inproc_stream *stream_list;
} inproc_transport;
typedef struct sb_list_entry {
grpc_slice_buffer sb;
struct sb_list_entry *next;
} sb_list_entry;
// Specialize grpc_byte_stream for our use case
typedef struct {
grpc_byte_stream base;
sb_list_entry *le;
grpc_error *shutdown_error;
} inproc_slice_byte_stream;
typedef struct {
// TODO (vjpai): Add some inlined elements to avoid alloc in simple cases
sb_list_entry *head;
sb_list_entry *tail;
} slice_buffer_list;
static void slice_buffer_list_init(slice_buffer_list *l) {
l->head = NULL;
l->tail = NULL;
}
static void sb_list_entry_destroy(grpc_exec_ctx *exec_ctx, sb_list_entry *le) {
grpc_slice_buffer_destroy_internal(exec_ctx, &le->sb);
gpr_free(le);
}
static void slice_buffer_list_destroy(grpc_exec_ctx *exec_ctx,
slice_buffer_list *l) {
sb_list_entry *curr = l->head;
while (curr != NULL) {
sb_list_entry *le = curr;
curr = curr->next;
sb_list_entry_destroy(exec_ctx, le);
}
l->head = NULL;
l->tail = NULL;
}
static bool slice_buffer_list_empty(slice_buffer_list *l) {
return l->head == NULL;
}
static void slice_buffer_list_append_entry(slice_buffer_list *l,
sb_list_entry *next) {
next->next = NULL;
if (l->tail) {
l->tail->next = next;
l->tail = next;
} else {
l->head = next;
l->tail = next;
}
}
static grpc_slice_buffer *slice_buffer_list_append(slice_buffer_list *l) {
sb_list_entry *next = (sb_list_entry *)gpr_malloc(sizeof(*next));
grpc_slice_buffer_init(&next->sb);
slice_buffer_list_append_entry(l, next);
return &next->sb;
}
static sb_list_entry *slice_buffer_list_pophead(slice_buffer_list *l) {
sb_list_entry *ret = l->head;
l->head = l->head->next;
if (l->head == NULL) {
l->tail = NULL;
}
return ret;
}
typedef struct inproc_stream {
inproc_transport *t;
grpc_metadata_batch to_read_initial_md;
uint32_t to_read_initial_md_flags;
bool to_read_initial_md_filled;
slice_buffer_list to_read_message;
grpc_metadata_batch to_read_trailing_md;
bool to_read_trailing_md_filled;
bool reads_needed;
bool read_closure_scheduled;
grpc_closure read_closure;
bool ops_needed;
bool op_closure_scheduled;
grpc_closure op_closure;
// Write buffer used only during gap at init time when client-side
// stream is set up but server side stream is not yet set up
grpc_metadata_batch write_buffer_initial_md;
bool write_buffer_initial_md_filled;
uint32_t write_buffer_initial_md_flags;
grpc_millis write_buffer_deadline;
slice_buffer_list write_buffer_message;
grpc_metadata_batch write_buffer_trailing_md;
bool write_buffer_trailing_md_filled;
grpc_error *write_buffer_cancel_error;
@ -164,11 +90,15 @@ typedef struct inproc_stream {
gpr_arena *arena;
grpc_transport_stream_op_batch *send_message_op;
grpc_transport_stream_op_batch *send_trailing_md_op;
grpc_transport_stream_op_batch *recv_initial_md_op;
grpc_transport_stream_op_batch *recv_message_op;
grpc_transport_stream_op_batch *recv_trailing_md_op;
inproc_slice_byte_stream recv_message_stream;
grpc_slice_buffer recv_message;
grpc_slice_buffer_stream recv_stream;
bool recv_inited;
bool initial_md_sent;
bool trailing_md_sent;
@ -187,54 +117,11 @@ typedef struct inproc_stream {
struct inproc_stream *stream_list_next;
} inproc_stream;
static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *bs, size_t max,
grpc_closure *on_complete) {
// Because inproc transport always provides the entire message atomically,
// the byte stream always has data available when this function is called.
// Thus, this function always returns true (unlike other transports) and
// there is never any need to schedule a closure
return true;
}
static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *bs,
grpc_slice *slice) {
inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
if (stream->shutdown_error != GRPC_ERROR_NONE) {
return GRPC_ERROR_REF(stream->shutdown_error);
}
*slice = grpc_slice_buffer_take_first(&stream->le->sb);
return GRPC_ERROR_NONE;
}
static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *bs,
grpc_error *error) {
inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
GRPC_ERROR_UNREF(stream->shutdown_error);
stream->shutdown_error = error;
}
static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *bs) {
inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
sb_list_entry_destroy(exec_ctx, stream->le);
GRPC_ERROR_UNREF(stream->shutdown_error);
}
static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = {
inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull,
inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy};
void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s,
sb_list_entry *le) {
s->base.length = (uint32_t)le->sb.length;
s->base.flags = 0;
s->base.vtable = &inproc_slice_byte_stream_vtable;
s->le = le;
s->shutdown_error = GRPC_ERROR_NONE;
}
static grpc_closure do_nothing_closure;
static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
grpc_error *error);
static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void ref_transport(inproc_transport *t) {
INPROC_LOG(GPR_DEBUG, "ref_transport %p", t);
@ -280,12 +167,14 @@ static void unref_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s,
static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s);
slice_buffer_list_destroy(exec_ctx, &s->to_read_message);
slice_buffer_list_destroy(exec_ctx, &s->write_buffer_message);
GRPC_ERROR_UNREF(s->write_buffer_cancel_error);
GRPC_ERROR_UNREF(s->cancel_self_error);
GRPC_ERROR_UNREF(s->cancel_other_error);
if (s->recv_inited) {
grpc_slice_buffer_destroy_internal(exec_ctx, &s->recv_message);
}
unref_transport(exec_ctx, s->t);
if (s->closure_at_destroy) {
@ -293,9 +182,6 @@ static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
}
}
static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void log_metadata(const grpc_metadata_batch *md_batch, bool is_client,
bool is_initial) {
for (grpc_linked_mdelem *md = md_batch->list.head; md != NULL;
@ -359,11 +245,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->write_buffer_initial_md_filled = false;
grpc_metadata_batch_init(&s->write_buffer_trailing_md);
s->write_buffer_trailing_md_filled = false;
slice_buffer_list_init(&s->to_read_message);
slice_buffer_list_init(&s->write_buffer_message);
s->reads_needed = false;
s->read_closure_scheduled = false;
GRPC_CLOSURE_INIT(&s->read_closure, read_state_machine, s,
s->ops_needed = false;
s->op_closure_scheduled = false;
GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s,
grpc_schedule_on_exec_ctx);
s->t = t;
s->closure_at_destroy = NULL;
@ -425,11 +309,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_initial_md);
cs->write_buffer_initial_md_filled = false;
}
while (!slice_buffer_list_empty(&cs->write_buffer_message)) {
slice_buffer_list_append_entry(
&s->to_read_message,
slice_buffer_list_pophead(&cs->write_buffer_message));
}
if (cs->write_buffer_trailing_md_filled) {
fill_in_metadata(exec_ctx, s, &cs->write_buffer_trailing_md, 0,
&s->to_read_trailing_md, NULL,
@ -488,9 +367,39 @@ static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
}
}
// Call the on_complete closure associated with this stream_op_batch if
// this stream_op_batch is only one of the pending operations for this
// stream. This is called when one of the pending operations for the stream
// is done and about to be NULLed out
static void complete_if_batch_end_locked(grpc_exec_ctx *exec_ctx,
inproc_stream *s, grpc_error *error,
grpc_transport_stream_op_batch *op,
const char *msg) {
int is_sm = (int)(op == s->send_message_op);
int is_stm = (int)(op == s->send_trailing_md_op);
int is_rim = (int)(op == s->recv_initial_md_op);
int is_rm = (int)(op == s->recv_message_op);
int is_rtm = (int)(op == s->recv_trailing_md_op);
if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
INPROC_LOG(GPR_DEBUG, "%s %p %p %p", msg, s, op, error);
GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_REF(error));
}
}
static void maybe_schedule_op_closure_locked(grpc_exec_ctx *exec_ctx,
inproc_stream *s,
grpc_error *error) {
if (s && s->ops_needed && !s->op_closure_scheduled) {
GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_REF(error));
s->op_closure_scheduled = true;
s->ops_needed = false;
}
}
static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
grpc_error *error) {
INPROC_LOG(GPR_DEBUG, "read_state_machine %p fail_helper", s);
INPROC_LOG(GPR_DEBUG, "op_state_machine %p fail_helper", s);
// If we're failing this side, we need to make sure that
// we also send or have already sent trailing metadata
if (!s->trailing_md_sent) {
@ -512,14 +421,7 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if (other->cancel_other_error == GRPC_ERROR_NONE) {
other->cancel_other_error = GRPC_ERROR_REF(error);
}
if (other->reads_needed) {
if (!other->read_closure_scheduled) {
GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure,
GRPC_ERROR_REF(error));
other->read_closure_scheduled = true;
}
other->reads_needed = false;
}
maybe_schedule_op_closure_locked(exec_ctx, other, error);
} else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
}
@ -564,14 +466,9 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
err);
// Last use of err so no need to REF and then UNREF it
if ((s->recv_initial_md_op != s->recv_message_op) &&
(s->recv_initial_md_op != s->recv_trailing_md_op)) {
INPROC_LOG(GPR_DEBUG,
"fail_helper %p scheduling initial-metadata-on-complete %p",
error, s);
GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete,
GRPC_ERROR_REF(error));
}
complete_if_batch_end_locked(
exec_ctx, s, error, s->recv_initial_md_op,
"fail_helper scheduling recv-initial-metadata-on-complete");
s->recv_initial_md_op = NULL;
}
if (s->recv_message_op) {
@ -580,20 +477,30 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
GRPC_CLOSURE_SCHED(
exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
if (s->recv_message_op != s->recv_trailing_md_op) {
INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling message-on-complete %p",
s, error);
GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
GRPC_ERROR_REF(error));
}
complete_if_batch_end_locked(
exec_ctx, s, error, s->recv_message_op,
"fail_helper scheduling recv-message-on-complete");
s->recv_message_op = NULL;
}
if (s->send_message_op) {
complete_if_batch_end_locked(
exec_ctx, s, error, s->send_message_op,
"fail_helper scheduling send-message-on-complete");
s->send_message_op = NULL;
}
if (s->send_trailing_md_op) {
complete_if_batch_end_locked(
exec_ctx, s, error, s->send_trailing_md_op,
"fail_helper scheduling send-trailng-md-on-complete");
s->send_trailing_md_op = NULL;
}
if (s->recv_trailing_md_op) {
INPROC_LOG(GPR_DEBUG,
"fail_helper %p scheduling trailing-md-on-complete %p", s,
error);
GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(error));
complete_if_batch_end_locked(
exec_ctx, s, error, s->recv_trailing_md_op,
"fail_helper scheduling recv-trailing-metadata-on-complete");
s->recv_trailing_md_op = NULL;
}
close_other_side_locked(exec_ctx, s, "fail_helper:other_side");
@ -602,12 +509,61 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
GRPC_ERROR_UNREF(error);
}
static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
static void message_transfer_locked(grpc_exec_ctx *exec_ctx,
inproc_stream *sender,
inproc_stream *receiver) {
size_t remaining =
sender->send_message_op->payload->send_message.send_message->length;
if (receiver->recv_inited) {
grpc_slice_buffer_destroy_internal(exec_ctx, &receiver->recv_message);
}
grpc_slice_buffer_init(&receiver->recv_message);
receiver->recv_inited = true;
do {
grpc_slice message_slice;
grpc_closure unused;
GPR_ASSERT(grpc_byte_stream_next(
exec_ctx, sender->send_message_op->payload->send_message.send_message,
SIZE_MAX, &unused));
grpc_error *error = grpc_byte_stream_pull(
exec_ctx, sender->send_message_op->payload->send_message.send_message,
&message_slice);
if (error != GRPC_ERROR_NONE) {
cancel_stream_locked(exec_ctx, sender, GRPC_ERROR_REF(error));
break;
}
GPR_ASSERT(error == GRPC_ERROR_NONE);
remaining -= GRPC_SLICE_LENGTH(message_slice);
grpc_slice_buffer_add(&receiver->recv_message, message_slice);
} while (remaining > 0);
grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message,
0);
*receiver->recv_message_op->payload->recv_message.recv_message =
&receiver->recv_stream.base;
INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready",
receiver);
GRPC_CLOSURE_SCHED(
exec_ctx,
receiver->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
complete_if_batch_end_locked(
exec_ctx, sender, GRPC_ERROR_NONE, sender->send_message_op,
"message_transfer scheduling sender on_complete");
complete_if_batch_end_locked(
exec_ctx, receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
"message_transfer scheduling receiver on_complete");
receiver->recv_message_op = NULL;
sender->send_message_op = NULL;
}
static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
// This function gets called when we have contents in the unprocessed reads
// Get what we want based on our ops wanted
// Schedule our appropriate closures
// and then return to reads_needed state if still needed
// and then return to ops_needed state if still needed
// Since this is a closure directly invoked by the combiner, it should not
// unref the error parameter explicitly; the combiner will do that implicitly
@ -615,12 +571,14 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
bool needs_close = false;
INPROC_LOG(GPR_DEBUG, "read_state_machine %p", arg);
INPROC_LOG(GPR_DEBUG, "op_state_machine %p", arg);
inproc_stream *s = (inproc_stream *)arg;
gpr_mu *mu = &s->t->mu->mu; // keep aside in case s gets closed
gpr_mu_lock(mu);
s->read_closure_scheduled = false;
s->op_closure_scheduled = false;
// cancellation takes precedence
inproc_stream *other = s->other_side;
if (s->cancel_self_error != GRPC_ERROR_NONE) {
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error));
goto done;
@ -632,89 +590,116 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
goto done;
}
if (s->recv_initial_md_op) {
if (!s->to_read_initial_md_filled) {
// We entered the state machine on some other kind of read even though
// we still haven't satisfied initial md . That's an error.
new_err =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected frame sequencing");
INPROC_LOG(GPR_DEBUG,
"read_state_machine %p scheduling on_complete errors for no "
"initial md %p",
s, new_err);
if (s->send_message_op && other) {
if (other->recv_message_op) {
message_transfer_locked(exec_ctx, s, other);
maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
} else if (!s->t->is_client &&
(s->trailing_md_sent || other->recv_trailing_md_op)) {
// A server send will never be matched if the client is waiting
// for trailing metadata already
complete_if_batch_end_locked(
exec_ctx, s, GRPC_ERROR_NONE, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");
s->send_message_op = NULL;
}
}
// Pause a send trailing metadata if there is still an outstanding
// send message unless we know that the send message will never get
// matched to a receive. This happens on the client if the server has
// already sent status.
if (s->send_trailing_md_op &&
(!s->send_message_op ||
(s->t->is_client &&
(s->trailing_md_recvd || s->to_read_trailing_md_filled)))) {
grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
: &other->to_read_trailing_md;
bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
: &other->to_read_trailing_md_filled;
if (*destfilled || s->trailing_md_sent) {
// The buffer is already in use; that's an error!
INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s);
new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
goto done;
} else if (s->initial_md_recvd) {
} else {
if (other && !other->closed) {
fill_in_metadata(exec_ctx, s,
s->send_trailing_md_op->payload->send_trailing_metadata
.send_trailing_metadata,
0, dest, NULL, destfilled);
}
s->trailing_md_sent = true;
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
INPROC_LOG(GPR_DEBUG,
"op_state_machine %p scheduling trailing-md-on-complete", s);
GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
GRPC_ERROR_NONE);
s->recv_trailing_md_op = NULL;
needs_close = true;
}
}
maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
complete_if_batch_end_locked(
exec_ctx, s, GRPC_ERROR_NONE, s->send_trailing_md_op,
"op_state_machine scheduling send-trailing-metadata-on-complete");
s->send_trailing_md_op = NULL;
}
if (s->recv_initial_md_op) {
if (s->initial_md_recvd) {
new_err =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md");
INPROC_LOG(
GPR_DEBUG,
"read_state_machine %p scheduling on_complete errors for already "
"op_state_machine %p scheduling on_complete errors for already "
"recvd initial md %p",
s, new_err);
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
goto done;
}
if (s->to_read_initial_md_filled) {
s->initial_md_recvd = true;
new_err = fill_in_metadata(
exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags,
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata,
s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, NULL);
s->recv_initial_md_op->payload->recv_initial_metadata.recv_initial_metadata
->deadline = s->deadline;
s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
NULL);
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata->deadline = s->deadline;
grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md);
s->to_read_initial_md_filled = false;
INPROC_LOG(GPR_DEBUG,
"read_state_machine %p scheduling initial-metadata-ready %p", s,
"op_state_machine %p scheduling initial-metadata-ready %p", s,
new_err);
GRPC_CLOSURE_SCHED(exec_ctx,
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata_ready,
GRPC_ERROR_REF(new_err));
if ((s->recv_initial_md_op != s->recv_message_op) &&
(s->recv_initial_md_op != s->recv_trailing_md_op)) {
INPROC_LOG(
GPR_DEBUG,
"read_state_machine %p scheduling initial-metadata-on-complete %p", s,
new_err);
GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete,
GRPC_ERROR_REF(new_err));
}
complete_if_batch_end_locked(
exec_ctx, s, new_err, s->recv_initial_md_op,
"op_state_machine scheduling recv-initial-metadata-on-complete");
s->recv_initial_md_op = NULL;
if (new_err != GRPC_ERROR_NONE) {
INPROC_LOG(GPR_DEBUG,
"read_state_machine %p scheduling on_complete errors2 %p", s,
"op_state_machine %p scheduling on_complete errors2 %p", s,
new_err);
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
goto done;
}
}
if (s->to_read_initial_md_filled) {
new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected recv frame");
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
goto done;
}
if (!slice_buffer_list_empty(&s->to_read_message) && s->recv_message_op) {
inproc_slice_byte_stream_init(
&s->recv_message_stream,
slice_buffer_list_pophead(&s->to_read_message));
*s->recv_message_op->payload->recv_message.recv_message =
&s->recv_message_stream.base;
INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s);
GRPC_CLOSURE_SCHED(
exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
if (s->recv_message_op != s->recv_trailing_md_op) {
INPROC_LOG(GPR_DEBUG,
"read_state_machine %p scheduling message-on-complete %p", s,
new_err);
GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
GRPC_ERROR_REF(new_err));
if (s->recv_message_op) {
if (other && other->send_message_op) {
message_transfer_locked(exec_ctx, other, s);
maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
}
s->recv_message_op = NULL;
}
if (s->recv_trailing_md_op && s->t->is_client && other &&
other->send_message_op) {
maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
}
if (s->to_read_trailing_md_filled) {
if (s->trailing_md_recvd) {
@ -722,7 +707,7 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
INPROC_LOG(
GPR_DEBUG,
"read_state_machine %p scheduling on_complete errors for already "
"op_state_machine %p scheduling on_complete errors for already "
"recvd trailing md %p",
s, new_err);
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
@ -731,21 +716,24 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
if (s->recv_message_op != NULL) {
// This message needs to be wrapped up because it will never be
// satisfied
INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready",
s);
INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s);
GRPC_CLOSURE_SCHED(
exec_ctx,
s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
if (s->recv_message_op != s->recv_trailing_md_op) {
INPROC_LOG(GPR_DEBUG,
"read_state_machine %p scheduling message-on-complete %p", s,
new_err);
GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
GRPC_ERROR_REF(new_err));
}
complete_if_batch_end_locked(
exec_ctx, s, new_err, s->recv_message_op,
"op_state_machine scheduling recv-message-on-complete");
s->recv_message_op = NULL;
}
if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
// Nothing further will try to receive from this stream, so finish off
// any outstanding send_message op
complete_if_batch_end_locked(
exec_ctx, s, new_err, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");
s->send_message_op = NULL;
}
if (s->recv_trailing_md_op != NULL) {
// We wanted trailing metadata and we got it
s->trailing_md_recvd = true;
@ -763,61 +751,65 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
// (If the server hasn't already sent its trailing md, it doesn't have
// a final status, so don't mark this op complete)
if (s->t->is_client || s->trailing_md_sent) {
INPROC_LOG(
GPR_DEBUG,
"read_state_machine %p scheduling trailing-md-on-complete %p", s,
new_err);
INPROC_LOG(GPR_DEBUG,
"op_state_machine %p scheduling trailing-md-on-complete %p",
s, new_err);
GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = NULL;
needs_close = true;
} else {
INPROC_LOG(GPR_DEBUG,
"read_state_machine %p server needs to delay handling "
"op_state_machine %p server needs to delay handling "
"trailing-md-on-complete %p",
s, new_err);
}
} else {
INPROC_LOG(
GPR_DEBUG,
"read_state_machine %p has trailing md but not yet waiting for it",
s);
"op_state_machine %p has trailing md but not yet waiting for it", s);
}
}
if (s->trailing_md_recvd && s->recv_message_op) {
// No further message will come on this stream, so finish off the
// recv_message_op
INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s);
INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s);
GRPC_CLOSURE_SCHED(
exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
if (s->recv_message_op != s->recv_trailing_md_op) {
INPROC_LOG(GPR_DEBUG,
"read_state_machine %p scheduling message-on-complete %p", s,
new_err);
GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
GRPC_ERROR_REF(new_err));
}
complete_if_batch_end_locked(
exec_ctx, s, new_err, s->recv_message_op,
"op_state_machine scheduling recv-message-on-complete");
s->recv_message_op = NULL;
}
if (s->recv_message_op || s->recv_trailing_md_op) {
if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) &&
s->send_message_op) {
// Nothing further will try to receive from this stream, so finish off
// any outstanding send_message op
complete_if_batch_end_locked(
exec_ctx, s, new_err, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");
s->send_message_op = NULL;
}
if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
s->recv_message_op || s->recv_trailing_md_op) {
// Didn't get the item we wanted so we still need to get
// rescheduled
INPROC_LOG(GPR_DEBUG, "read_state_machine %p still needs closure %p %p", s,
INPROC_LOG(
GPR_DEBUG, "op_state_machine %p still needs closure %p %p %p %p %p", s,
s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
s->recv_message_op, s->recv_trailing_md_op);
s->reads_needed = true;
s->ops_needed = true;
}
done:
if (needs_close) {
close_other_side_locked(exec_ctx, s, "read_state_machine");
close_other_side_locked(exec_ctx, s, "op_state_machine");
close_stream_locked(exec_ctx, s);
}
gpr_mu_unlock(mu);
GRPC_ERROR_UNREF(new_err);
}
static grpc_closure do_nothing_closure;
static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
grpc_error *error) {
bool ret = false; // was the cancel accepted
@ -826,14 +818,7 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if (s->cancel_self_error == GRPC_ERROR_NONE) {
ret = true;
s->cancel_self_error = GRPC_ERROR_REF(error);
if (s->reads_needed) {
if (!s->read_closure_scheduled) {
GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure,
GRPC_ERROR_REF(s->cancel_self_error));
s->read_closure_scheduled = true;
}
s->reads_needed = false;
}
maybe_schedule_op_closure_locked(exec_ctx, s, s->cancel_self_error);
// Send trailing md to the other side indicating cancellation, even if we
// already have
s->trailing_md_sent = true;
@ -853,14 +838,8 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if (other->cancel_other_error == GRPC_ERROR_NONE) {
other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
}
if (other->reads_needed) {
if (!other->read_closure_scheduled) {
GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure,
GRPC_ERROR_REF(other->cancel_other_error));
other->read_closure_scheduled = true;
}
other->reads_needed = false;
}
maybe_schedule_op_closure_locked(exec_ctx, other,
other->cancel_other_error);
} else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
}
@ -869,11 +848,9 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
// couldn't complete that because we hadn't yet sent out trailing
// md, now's the chance
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
INPROC_LOG(GPR_DEBUG,
"cancel_stream %p scheduling trailing-md-on-complete %p", s,
s->cancel_self_error);
GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(s->cancel_self_error));
complete_if_batch_end_locked(
exec_ctx, s, s->cancel_self_error, s->recv_trailing_md_op,
"cancel_stream scheduling trailing-md-on-complete");
s->recv_trailing_md_op = NULL;
}
}
@ -918,7 +895,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
// already self-canceled so still give it an error
error = GRPC_ERROR_REF(s->cancel_self_error);
} else {
INPROC_LOG(GPR_DEBUG, "perform_stream_op %p%s%s%s%s%s%s", s,
INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %s%s%s%s%s%s%s", s,
s->t->is_client ? "client" : "server",
op->send_initial_metadata ? " send_initial_metadata" : "",
op->send_message ? " send_message" : "",
op->send_trailing_metadata ? " send_trailing_metadata" : "",
@ -929,10 +907,9 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
bool needs_close = false;
if (error == GRPC_ERROR_NONE &&
(op->send_initial_metadata || op->send_message ||
op->send_trailing_metadata)) {
inproc_stream *other = s->other_side;
if (error == GRPC_ERROR_NONE &&
(op->send_initial_metadata || op->send_trailing_metadata)) {
if (s->t->is_closed) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
}
@ -963,72 +940,21 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->initial_md_sent = true;
}
}
}
if (error == GRPC_ERROR_NONE && op->send_message) {
size_t remaining = op->payload->send_message.send_message->length;
grpc_slice_buffer *dest = slice_buffer_list_append(
(other == NULL) ? &s->write_buffer_message : &other->to_read_message);
do {
grpc_slice message_slice;
grpc_closure unused;
GPR_ASSERT(grpc_byte_stream_next(exec_ctx,
op->payload->send_message.send_message,
SIZE_MAX, &unused));
error = grpc_byte_stream_pull(
exec_ctx, op->payload->send_message.send_message, &message_slice);
if (error != GRPC_ERROR_NONE) {
cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error));
break;
}
GPR_ASSERT(error == GRPC_ERROR_NONE);
remaining -= GRPC_SLICE_LENGTH(message_slice);
grpc_slice_buffer_add(dest, message_slice);
} while (remaining != 0);
grpc_byte_stream_destroy(exec_ctx,
op->payload->send_message.send_message);
}
if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) {
grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
: &other->to_read_trailing_md;
bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
: &other->to_read_trailing_md_filled;
if (*destfilled || s->trailing_md_sent) {
// The buffer is already in use; that's an error!
INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s);
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
} else {
if (!other->closed) {
fill_in_metadata(
exec_ctx, s,
op->payload->send_trailing_metadata.send_trailing_metadata, 0,
dest, NULL, destfilled);
}
s->trailing_md_sent = true;
if (!s->t->is_client && s->trailing_md_recvd &&
s->recv_trailing_md_op) {
INPROC_LOG(GPR_DEBUG,
"perform_stream_op %p scheduling trailing-md-on-complete",
s);
GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
GRPC_ERROR_NONE);
s->recv_trailing_md_op = NULL;
needs_close = true;
}
}
}
if (other != NULL && other->reads_needed) {
if (!other->read_closure_scheduled) {
GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, error);
other->read_closure_scheduled = true;
}
other->reads_needed = false;
maybe_schedule_op_closure_locked(exec_ctx, other, error);
}
}
if (error == GRPC_ERROR_NONE &&
(op->recv_initial_metadata || op->recv_message ||
(op->send_message || op->send_trailing_metadata ||
op->recv_initial_metadata || op->recv_message ||
op->recv_trailing_metadata)) {
// If there are any reads, mark it so that the read closure will react to
// them
// Mark ops that need to be processed by the closure
if (op->send_message) {
s->send_message_op = op;
}
if (op->send_trailing_metadata) {
s->send_trailing_md_op = op;
}
if (op->recv_initial_metadata) {
s->recv_initial_md_op = op;
}
@ -1040,25 +966,28 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
// We want to initiate the closure if:
// 1. There is initial metadata and something ready to take that
// 2. There is a message and something ready to take it
// 3. There is trailing metadata, even if nothing specifically wants
// that because that can shut down the message as well
if ((s->to_read_initial_md_filled && op->recv_initial_metadata) ||
((!slice_buffer_list_empty(&s->to_read_message) ||
s->trailing_md_recvd) &&
op->recv_message) ||
(s->to_read_trailing_md_filled)) {
if (!s->read_closure_scheduled) {
GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure, GRPC_ERROR_NONE);
s->read_closure_scheduled = true;
// 1. We want to send a message and the other side wants to receive or end
// 2. We want to send trailing metadata and there isn't an unmatched send
// 3. We want initial metadata and the other side has sent it
// 4. We want to receive a message and there is a message ready
// 5. There is trailing metadata, even if nothing specifically wants
// that because that can shut down the receive message as well
if ((op->send_message && other && ((other->recv_message_op != NULL) ||
(other->recv_trailing_md_op != NULL))) ||
(op->send_trailing_metadata && !op->send_message) ||
(op->recv_initial_metadata && s->to_read_initial_md_filled) ||
(op->recv_message && (other && other->send_message_op != NULL)) ||
(s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
if (!s->op_closure_scheduled) {
GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_NONE);
s->op_closure_scheduled = true;
}
} else {
s->reads_needed = true;
s->ops_needed = true;
}
} else {
if (error != GRPC_ERROR_NONE) {
// Schedule op's read closures that we didn't push to read state machine
// Schedule op's closures that we didn't push to op state machine
if (op->recv_initial_metadata) {
INPROC_LOG(
GPR_DEBUG,

@ -104,6 +104,10 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"combiner_locks_scheduled_items",
"combiner_locks_scheduled_final_items",
"combiner_locks_offloaded",
"call_combiner_locks_initiated",
"call_combiner_locks_scheduled_items",
"call_combiner_set_notify_on_cancel",
"call_combiner_cancelled",
"executor_scheduled_short_items",
"executor_scheduled_long_items",
"executor_scheduled_to_self",
@ -112,6 +116,9 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"executor_push_retries",
"server_requested_calls",
"server_slowpath_requests_queued",
"cq_ev_queue_trylock_failures",
"cq_ev_queue_trylock_successes",
"cq_ev_queue_transient_pop_failures",
};
const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"Number of client side calls created by this process",
@ -210,6 +217,11 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"Number of items scheduled against combiner locks",
"Number of final items scheduled against combiner locks",
"Number of combiner locks offloaded to different threads",
"Number of call combiner lock entries by process (first items queued to a "
"call combiner)",
"Number of items scheduled against call combiner locks",
"Number of times a cancellation callback was set on a call combiner",
"Number of times a call combiner was cancelled",
"Number of finite runtime closures scheduled against the executor (gRPC "
"thread pool)",
"Number of potentially infinite runtime closures scheduled against the "
@ -222,6 +234,12 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"How many calls were requested (not necessarily received) by the server",
"How many times was the server slow path taken (indicates too few "
"outstanding requests)",
"Number of lock (trylock) acquisition failures on completion queue event "
"queue. High value here indicates high contention on completion queues",
"Number of lock (trylock) acquisition successes on completion queue event "
"queue.",
"Number of times NULL was popped out of completion queue's event queue "
"even though the event queue was not empty",
};
const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = {
"call_initial_size",

@ -110,6 +110,10 @@ typedef enum {
GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_ITEMS,
GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS,
GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED,
GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_INITIATED,
GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS,
GRPC_STATS_COUNTER_CALL_COMBINER_SET_NOTIFY_ON_CANCEL,
GRPC_STATS_COUNTER_CALL_COMBINER_CANCELLED,
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS,
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_LONG_ITEMS,
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_TO_SELF,
@ -118,6 +122,9 @@ typedef enum {
GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES,
GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS,
GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED,
GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES,
GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES,
GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES,
GRPC_STATS_COUNTER_COUNT
} grpc_stats_counters;
extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT];
@ -404,6 +411,17 @@ typedef enum {
#define GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED)
#define GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_INITIATED)
#define GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS)
#define GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_SET_NOTIFY_ON_CANCEL)
#define GRPC_STATS_INC_CALL_COMBINER_CANCELLED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_CANCELLED)
#define GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS)
@ -425,6 +443,15 @@ typedef enum {
#define GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED)
#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES)
#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES)
#define GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(exec_ctx) \
GRPC_STATS_INC_COUNTER( \
(exec_ctx), GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES)
#define GRPC_STATS_INC_CALL_INITIAL_SIZE(exec_ctx, value) \
grpc_stats_inc_call_initial_size((exec_ctx), (int)(value))
void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int x);

@ -245,6 +245,16 @@
doc: Number of final items scheduled against combiner locks
- counter: combiner_locks_offloaded
doc: Number of combiner locks offloaded to different threads
# call combiner locks
- counter: call_combiner_locks_initiated
doc: Number of call combiner lock entries by process
(first items queued to a call combiner)
- counter: call_combiner_locks_scheduled_items
doc: Number of items scheduled against call combiner locks
- counter: call_combiner_set_notify_on_cancel
doc: Number of times a cancellation callback was set on a call combiner
- counter: call_combiner_cancelled
doc: Number of times a call combiner was cancelled
# executor
- counter: executor_scheduled_short_items
doc: Number of finite runtime closures scheduled against the executor
@ -272,4 +282,13 @@
- counter: server_slowpath_requests_queued
doc: How many times was the server slow path taken (indicates too few
outstanding requests)
# cq
- counter: cq_ev_queue_trylock_failures
doc: Number of lock (trylock) acquisition failures on completion queue event
queue. High value here indicates high contention on completion queues
- counter: cq_ev_queue_trylock_successes
doc: Number of lock (trylock) acquisition successes on completion queue event
queue.
- counter: cq_ev_queue_transient_pop_failures
doc: Number of times NULL was popped out of completion queue's event queue
even though the event queue was not empty

@ -79,6 +79,10 @@ combiner_locks_initiated_per_iteration:FLOAT,
combiner_locks_scheduled_items_per_iteration:FLOAT,
combiner_locks_scheduled_final_items_per_iteration:FLOAT,
combiner_locks_offloaded_per_iteration:FLOAT,
call_combiner_locks_initiated_per_iteration:FLOAT,
call_combiner_locks_scheduled_items_per_iteration:FLOAT,
call_combiner_set_notify_on_cancel_per_iteration:FLOAT,
call_combiner_cancelled_per_iteration:FLOAT,
executor_scheduled_short_items_per_iteration:FLOAT,
executor_scheduled_long_items_per_iteration:FLOAT,
executor_scheduled_to_self_per_iteration:FLOAT,
@ -86,4 +90,7 @@ executor_wakeup_initiated_per_iteration:FLOAT,
executor_queue_drained_per_iteration:FLOAT,
executor_push_retries_per_iteration:FLOAT,
server_requested_calls_per_iteration:FLOAT,
server_slowpath_requests_queued_per_iteration:FLOAT
server_slowpath_requests_queued_per_iteration:FLOAT,
cq_ev_queue_trylock_failures_per_iteration:FLOAT,
cq_ev_queue_trylock_successes_per_iteration:FLOAT,
cq_ev_queue_transient_pop_failures_per_iteration:FLOAT

@ -21,6 +21,7 @@
#include <inttypes.h>
#include <grpc/support/log.h>
#include "src/core/lib/debug/stats.h"
grpc_tracer_flag grpc_call_combiner_trace =
GRPC_TRACER_INITIALIZER(false, "call_combiner");
@ -73,7 +74,9 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
prev_size + 1);
}
GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx);
if (prev_size == 0) {
GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(exec_ctx);
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY");
}
@ -135,6 +138,7 @@ void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
grpc_call_combiner* call_combiner,
grpc_closure* closure) {
GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL(exec_ctx);
while (true) {
// Decode original state.
gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
@ -179,6 +183,7 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
grpc_call_combiner* call_combiner,
grpc_error* error) {
GRPC_STATS_INC_CALL_COMBINER_CANCELLED(exec_ctx);
while (true) {
gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
grpc_error* original_error = decode_cancel_state_error(original_state);

@ -165,6 +165,7 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
lock, cl, last));
if (last == 1) {
GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx);
GPR_TIMER_MARK("combiner.initiated", 0);
gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null,
(gpr_atm)exec_ctx);
// first element on this list: add it to the list of combiner locks

@ -61,12 +61,43 @@ typedef struct {
event_engine_factory_fn factory;
} event_engine_factory;
namespace {
extern "C" {
grpc_poll_function_type real_poll_function;
int dummy_poll(struct pollfd fds[], nfds_t nfds, int timeout) {
if (timeout == 0) {
return real_poll_function(fds, nfds, 0);
} else {
gpr_log(GPR_ERROR, "Attempted a blocking poll when declared non-polling.");
GPR_ASSERT(false);
return -1;
}
}
} // extern "C"
const grpc_event_engine_vtable *init_non_polling(bool explicit_request) {
if (!explicit_request) {
return nullptr;
}
// return the simplest engine as a dummy but also override the poller
auto ret = grpc_init_poll_posix(explicit_request);
real_poll_function = grpc_poll_function;
grpc_poll_function = dummy_poll;
return ret;
}
} // namespace
static const event_engine_factory g_factories[] = {
{"epoll1", grpc_init_epoll1_linux},
{"epollsig", grpc_init_epollsig_linux},
{"poll", grpc_init_poll_posix},
{"poll-cv", grpc_init_poll_cv_posix},
{"epollex", grpc_init_epollex_linux},
{"none", init_non_polling},
};
static void add(const char *beg, const char *end, char ***ss, size_t *ns) {

@ -109,6 +109,16 @@
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_OPENBSD)
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_UNIX_SOCKET 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_NACL)
#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1

@ -209,9 +209,9 @@ static void init_output() {
static void rotate_log() {
/* Using malloc here, as this code could end up being called by gpr_malloc */
gpr_timer_log *new = malloc(sizeof(*new));
gpr_timer_log *log = static_cast<gpr_timer_log *>(malloc(sizeof(*log)));
gpr_once_init(&g_once_init, init_output);
new->num_entries = 0;
log->num_entries = 0;
pthread_mutex_lock(&g_mu);
if (g_thread_log != NULL) {
timer_log_remove(&g_in_progress_logs, g_thread_log);
@ -221,9 +221,9 @@ static void rotate_log() {
} else {
g_thread_id = g_next_thread_id++;
}
timer_log_push_back(&g_in_progress_logs, new);
timer_log_push_back(&g_in_progress_logs, log);
pthread_mutex_unlock(&g_mu);
g_thread_log = new;
g_thread_log = log;
}
static void gpr_timers_log_add(const char *tagstr, marker_type type,

@ -42,7 +42,7 @@ static struct timespec timespec_from_gpr(gpr_timespec gts) {
return rv;
}
#if _POSIX_TIMERS > 0
#if _POSIX_TIMERS > 0 || defined(__OpenBSD__)
static gpr_timespec gpr_from_timespec(struct timespec ts,
gpr_clock_type clock_type) {
/*

@ -362,10 +362,23 @@ static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
grpc_cq_completion *c = NULL;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
if (gpr_spinlock_trylock(&q->queue_lock)) {
c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue);
GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx);
bool is_empty = false;
c = (grpc_cq_completion *)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty);
gpr_spinlock_unlock(&q->queue_lock);
if (c == NULL && !is_empty) {
GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx);
}
} else {
GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx);
}
grpc_exec_ctx_finish(&exec_ctx);
if (c) {
gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);

@ -36,7 +36,21 @@ namespace Grpc.Core
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
internal AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
/// <summary>
/// Creates a new AsyncClientStreamingCall object with the specified properties.
/// </summary>
/// <param name="requestStream">Stream of request values.</param>
/// <param name="responseAsync">The response of the asynchronous call.</param>
/// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
/// <param name="getStatusFunc">Delegate returning the status of the call.</param>
/// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
/// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream,
Task<TResponse> responseAsync,
Task<Metadata> responseHeadersAsync,
Func<Status> getStatusFunc,
Func<Metadata> getTrailersFunc,
Action disposeAction)
{
this.requestStream = requestStream;
this.responseAsync = responseAsync;

@ -35,7 +35,21 @@ namespace Grpc.Core
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
internal AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
/// <summary>
/// Creates a new AsyncDuplexStreamingCall object with the specified properties.
/// </summary>
/// <param name="requestStream">Stream of request values.</param>
/// <param name="responseStream">Stream of response values.</param>
/// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
/// <param name="getStatusFunc">Delegate returning the status of the call.</param>
/// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
/// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream,
IAsyncStreamReader<TResponse> responseStream,
Task<Metadata> responseHeadersAsync,
Func<Status> getStatusFunc,
Func<Metadata> getTrailersFunc,
Action disposeAction)
{
this.requestStream = requestStream;
this.responseStream = responseStream;

@ -33,7 +33,19 @@ namespace Grpc.Core
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
internal AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
/// <summary>
/// Creates a new AsyncDuplexStreamingCall object with the specified properties.
/// </summary>
/// <param name="responseStream">Stream of response values.</param>
/// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
/// <param name="getStatusFunc">Delegate returning the status of the call.</param>
/// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
/// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream,
Task<Metadata> responseHeadersAsync,
Func<Status> getStatusFunc,
Func<Metadata> getTrailersFunc,
Action disposeAction)
{
this.responseStream = responseStream;
this.responseHeadersAsync = responseHeadersAsync;

@ -34,7 +34,20 @@ namespace Grpc.Core
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
internal AsyncUnaryCall(Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
/// <summary>
/// Creates a new AsyncUnaryCall object with the specified properties.
/// </summary>
/// <param name="responseAsync">The response of the asynchronous call.</param>
/// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
/// <param name="getStatusFunc">Delegate returning the status of the call.</param>
/// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
/// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
public AsyncUnaryCall(Task<TResponse> responseAsync,
Task<Metadata> responseHeadersAsync,
Func<Status> getStatusFunc,
Func<Metadata> getTrailersFunc,
Action disposeAction)
{
this.responseAsync = responseAsync;
this.responseHeadersAsync = responseHeadersAsync;

@ -18,6 +18,7 @@
#import <UIKit/UIKit.h>
#import <XCTest/XCTest.h>
#import <grpc/grpc.h>
#import <GRPCClient/GRPCCall.h>
#import <GRPCClient/GRPCCall+ChannelArg.h>
@ -30,6 +31,8 @@
#import <RxLibrary/GRXWriter+Immediate.h>
#import <RxLibrary/GRXBufferedPipe.h>
#import "version.h"
#define TEST_TIMEOUT 16
static NSString * const kHostAddress = @"localhost:5050";
@ -266,12 +269,38 @@ static GRPCProtoMethod *kFullDuplexCallMethod;
id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) {
XCTAssertNotNil(value, @"nil value received as response.");
XCTAssertEqual([value length], 0, @"Non-empty response received: %@", value);
/* This test needs to be more clever in regards to changing the version of the core.
XCTAssertEqualObjects(call.responseHeaders[@"x-grpc-test-echo-useragent"],
@"Foo grpc-objc/0.13.0 grpc-c/0.14.0-dev (ios)",
@"Did not receive expected user agent %@",
call.responseHeaders[@"x-grpc-test-echo-useragent"]);
*/
NSString *userAgent = call.responseHeaders[@"x-grpc-test-echo-useragent"];
NSError *error = nil;
// Test the regex is correct
NSString *expectedUserAgent = @"Foo grpc-objc/";
expectedUserAgent =
[expectedUserAgent stringByAppendingString:GRPC_OBJC_VERSION_STRING];
expectedUserAgent =
[expectedUserAgent stringByAppendingString:@" grpc-c/"];
expectedUserAgent =
[expectedUserAgent stringByAppendingString:GRPC_C_VERSION_STRING];
expectedUserAgent =
[expectedUserAgent stringByAppendingString:@" (ios; chttp2; "];
expectedUserAgent =
[expectedUserAgent stringByAppendingString:[NSString stringWithUTF8String:grpc_g_stands_for()]];
expectedUserAgent = [expectedUserAgent stringByAppendingString:@")"];
XCTAssertEqualObjects(userAgent, expectedUserAgent);
// Change in format of user-agent field in a direction that does not match the regex will likely
// cause problem for certain gRPC users. @muxi for details.
NSRegularExpression *regex =
[NSRegularExpression regularExpressionWithPattern:@" grpc-[a-zA-Z0-9]+(-[a-zA-Z0-9]+)?/[^ ,]+( \\([^)]*\\))?"
options:0
error:&error];
NSString *customUserAgent =
[regex stringByReplacingMatchesInString:userAgent
options:0
range:NSMakeRange(0, [userAgent length])
withTemplate:@""];
XCTAssertEqualObjects(customUserAgent, @"Foo");
[response fulfill];
} completionHandler:^(NSError *errorOrNil) {
XCTAssertNil(errorOrNil, @"Finished with unexpected error: %@", errorOrNil);

@ -144,6 +144,7 @@
5EAD6D241E27047400002378 /* CronetUnitTests.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = CronetUnitTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
5EAD6D261E27047400002378 /* CronetUnitTests.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = CronetUnitTests.m; sourceTree = "<group>"; };
5EAD6D281E27047400002378 /* Info.plist */ = {isa = PBXFileReference; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; };
5EAFE8271F8EFB87007F2189 /* version.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = version.h; sourceTree = "<group>"; };
5EE84BF11D4717E40050C6CC /* InteropTestsRemoteWithCronet.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = InteropTestsRemoteWithCronet.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
5EE84BF31D4717E40050C6CC /* InteropTestsRemoteWithCronet.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = InteropTestsRemoteWithCronet.m; sourceTree = "<group>"; };
5EE84BF51D4717E40050C6CC /* Info.plist */ = {isa = PBXFileReference; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; };
@ -398,6 +399,7 @@
635697C91B14FC11007A7283 /* Tests */ = {
isa = PBXGroup;
children = (
5EAFE8271F8EFB87007F2189 /* version.h */,
6312AE4D1B1BF49B00341DEE /* GRPCClientTests.m */,
63E240CC1B6C4D3A005F3B0E /* InteropTests.h */,
635ED2EB1B1A3BC400FDE5C3 /* InteropTests.m */,
@ -740,9 +742,12 @@
files = (
);
inputPaths = (
"${SRCROOT}/Pods/Target Support Files/Pods-CronetUnitTests/Pods-CronetUnitTests-resources.sh",
$PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
"${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@ -755,9 +760,12 @@
files = (
);
inputPaths = (
"${SRCROOT}/Pods/Target Support Files/Pods-InteropTestsRemoteWithCronet/Pods-InteropTestsRemoteWithCronet-frameworks.sh",
"${PODS_ROOT}/CronetFramework/Cronet.framework",
);
name = "[CP] Embed Pods Frameworks";
outputPaths = (
"${TARGET_BUILD_DIR}/${FRAMEWORKS_FOLDER_PATH}/Cronet.framework",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@ -770,13 +778,16 @@
files = (
);
inputPaths = (
"${PODS_PODFILE_DIR_PATH}/Podfile.lock",
"${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
"$(DERIVED_FILE_DIR)/Pods-InteropTestsRemote-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
4F5690DC0E6AD6663FE78B8B /* [CP] Embed Pods Frameworks */ = {
@ -800,13 +811,16 @@
files = (
);
inputPaths = (
"${PODS_PODFILE_DIR_PATH}/Podfile.lock",
"${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
"$(DERIVED_FILE_DIR)/Pods-InteropTestsLocalSSL-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
5F14F59509E10C2852014F9E /* [CP] Embed Pods Frameworks */ = {
@ -830,9 +844,12 @@
files = (
);
inputPaths = (
"${SRCROOT}/Pods/Target Support Files/Pods-InteropTestsLocalSSL/Pods-InteropTestsLocalSSL-resources.sh",
$PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
"${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@ -845,9 +862,12 @@
files = (
);
inputPaths = (
"${SRCROOT}/Pods/Target Support Files/Pods-CoreCronetEnd2EndTests/Pods-CoreCronetEnd2EndTests-resources.sh",
$PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
"${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@ -860,13 +880,16 @@
files = (
);
inputPaths = (
"${PODS_PODFILE_DIR_PATH}/Podfile.lock",
"${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
"$(DERIVED_FILE_DIR)/Pods-InteropTestsLocalCleartext-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
796680C7599CB4ED736DD62A /* [CP] Check Pods Manifest.lock */ = {
@ -875,13 +898,16 @@
files = (
);
inputPaths = (
"${PODS_PODFILE_DIR_PATH}/Podfile.lock",
"${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
"$(DERIVED_FILE_DIR)/Pods-Tests-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
80E2DDD2EC04A4009F45E933 /* [CP] Check Pods Manifest.lock */ = {
@ -890,13 +916,16 @@
files = (
);
inputPaths = (
"${PODS_PODFILE_DIR_PATH}/Podfile.lock",
"${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
"$(DERIVED_FILE_DIR)/Pods-CronetUnitTests-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
8AD3130D3C58A0FB32FF2A36 /* [CP] Copy Pods Resources */ = {
@ -905,9 +934,12 @@
files = (
);
inputPaths = (
"${SRCROOT}/Pods/Target Support Files/Pods-InteropTestsLocalCleartext/Pods-InteropTestsLocalCleartext-resources.sh",
$PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
"${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@ -935,13 +967,16 @@
files = (
);
inputPaths = (
"${PODS_PODFILE_DIR_PATH}/Podfile.lock",
"${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
"$(DERIVED_FILE_DIR)/Pods-AllTests-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
A441F71824DCB9D0CA297748 /* [CP] Copy Pods Resources */ = {
@ -950,9 +985,12 @@
files = (
);
inputPaths = (
"${SRCROOT}/Pods/Target Support Files/Pods-AllTests/Pods-AllTests-resources.sh",
$PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
"${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@ -965,9 +1003,12 @@
files = (
);
inputPaths = (
"${SRCROOT}/Pods/Target Support Files/Pods-CronetUnitTests/Pods-CronetUnitTests-frameworks.sh",
"${PODS_ROOT}/CronetFramework/Cronet.framework",
);
name = "[CP] Embed Pods Frameworks";
outputPaths = (
"${TARGET_BUILD_DIR}/${FRAMEWORKS_FOLDER_PATH}/Cronet.framework",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@ -995,9 +1036,12 @@
files = (
);
inputPaths = (
"${SRCROOT}/Pods/Target Support Files/Pods-Tests/Pods-Tests-resources.sh",
$PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
"${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@ -1010,13 +1054,16 @@
files = (
);
inputPaths = (
"${PODS_PODFILE_DIR_PATH}/Podfile.lock",
"${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
"$(DERIVED_FILE_DIR)/Pods-RxLibraryUnitTests-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
C0F7B1FF6F88CC5FBF362F4C /* [CP] Check Pods Manifest.lock */ = {
@ -1025,13 +1072,16 @@
files = (
);
inputPaths = (
"${PODS_PODFILE_DIR_PATH}/Podfile.lock",
"${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
"$(DERIVED_FILE_DIR)/Pods-InteropTestsRemoteWithCronet-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
C2E09DC4BD239F71160F0CC1 /* [CP] Copy Pods Resources */ = {
@ -1040,9 +1090,12 @@
files = (
);
inputPaths = (
"${SRCROOT}/Pods/Target Support Files/Pods-InteropTestsRemote/Pods-InteropTestsRemote-resources.sh",
$PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
"${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@ -1070,9 +1123,12 @@
files = (
);
inputPaths = (
"${SRCROOT}/Pods/Target Support Files/Pods-RxLibraryUnitTests/Pods-RxLibraryUnitTests-resources.sh",
$PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
"${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@ -1085,9 +1141,12 @@
files = (
);
inputPaths = (
"${SRCROOT}/Pods/Target Support Files/Pods-InteropTestsRemoteWithCronet/Pods-InteropTestsRemoteWithCronet-resources.sh",
$PODS_CONFIGURATION_BUILD_DIR/gRPC/gRPCCertificates.bundle,
);
name = "[CP] Copy Pods Resources";
outputPaths = (
"${TARGET_BUILD_DIR}/${UNLOCALIZED_RESOURCES_FOLDER_PATH}",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@ -1100,9 +1159,12 @@
files = (
);
inputPaths = (
"${SRCROOT}/Pods/Target Support Files/Pods-CoreCronetEnd2EndTests/Pods-CoreCronetEnd2EndTests-frameworks.sh",
"${PODS_ROOT}/CronetFramework/Cronet.framework",
);
name = "[CP] Embed Pods Frameworks";
outputPaths = (
"${TARGET_BUILD_DIR}/${FRAMEWORKS_FOLDER_PATH}/Cronet.framework",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
@ -1115,13 +1177,16 @@
files = (
);
inputPaths = (
"${PODS_PODFILE_DIR_PATH}/Podfile.lock",
"${PODS_ROOT}/Manifest.lock",
);
name = "[CP] Check Pods Manifest.lock";
outputPaths = (
"$(DERIVED_FILE_DIR)/Pods-CoreCronetEnd2EndTests-checkManifestLockResult.txt",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
shellScript = "diff \"${PODS_ROOT}/../Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n";
shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n";
showEnvVarsInLog = 0;
};
/* End PBXShellScriptBuildPhase section */

@ -0,0 +1,27 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// This file is autogenerated from a template file. Please make
// modifications to
// `templates/src/objective-c/GRPCClient/private/version.h.template`
// instead. This file can be regenerated from the template by running
// `tools/buildgen/generate_projects.sh`.
#define GRPC_OBJC_VERSION_STRING @"1.8.0-dev"
#define GRPC_C_VERSION_STRING @"5.0.0-dev"

@ -218,7 +218,7 @@ class BaseStub
* (optional)
* @param array $options An array of options (optional)
*
* @return SimpleSurfaceActiveCall The active call object
* @return UnaryCall The active call object
*/
protected function _simpleRequest($method,
$argument,
@ -253,7 +253,7 @@ class BaseStub
* (optional)
* @param array $options An array of options (optional)
*
* @return ClientStreamingSurfaceActiveCall The active call object
* @return ClientStreamingCall The active call object
*/
protected function _clientStreamRequest($method,
$deserialize,
@ -288,7 +288,7 @@ class BaseStub
* (optional)
* @param array $options An array of options (optional)
*
* @return ServerStreamingSurfaceActiveCall The active call object
* @return ServerStreamingCall The active call object
*/
protected function _serverStreamRequest($method,
$argument,
@ -322,7 +322,7 @@ class BaseStub
* (optional)
* @param array $options An array of options (optional)
*
* @return BidiStreamingSurfaceActiveCall The active call object
* @return BidiStreamingCall The active call object
*/
protected function _bidiRequest($method,
$deserialize,

@ -41,6 +41,7 @@ LIB_DIRS = [
]
windows = RUBY_PLATFORM =~ /mingw|mswin/
bsd = RUBY_PLATFORM =~ /bsd/
grpc_root = File.expand_path(File.join(File.dirname(__FILE__), '../../../..'))
@ -70,7 +71,8 @@ unless windows
puts 'Building internal gRPC into ' + grpc_lib_dir
nproc = 4
nproc = Etc.nprocessors * 2 if Etc.respond_to? :nprocessors
system("make -j#{nproc} -C #{grpc_root} #{grpc_lib_dir}/libgrpc.a CONFIG=#{grpc_config} Q=")
make = bsd ? 'gmake' : 'make'
system("#{make} -j#{nproc} -C #{grpc_root} #{grpc_lib_dir}/libgrpc.a CONFIG=#{grpc_config} Q=")
exit 1 unless $? == 0
end

@ -0,0 +1,29 @@
%YAML 1.2
--- |
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// This file is autogenerated from a template file. Please make
// modifications to
// `templates/src/objective-c/GRPCClient/private/version.h.template`
// instead. This file can be regenerated from the template by running
// `tools/buildgen/generate_projects.sh`.
#define GRPC_OBJC_VERSION_STRING @"${settings.version}"
#define GRPC_C_VERSION_STRING @"${settings.core_version}"

@ -24,15 +24,15 @@ import hashlib
FixtureOptions = collections.namedtuple(
'FixtureOptions',
'fullstack includes_proxy dns_resolver name_resolution secure platforms ci_mac tracing exclude_configs exclude_iomgrs large_writes enables_compression supports_compression is_inproc is_http2 supports_proxy_auth')
'fullstack includes_proxy dns_resolver name_resolution secure platforms ci_mac tracing exclude_configs exclude_iomgrs large_writes enables_compression supports_compression is_inproc is_http2 supports_proxy_auth supports_write_buffering')
default_unsecure_fixture_options = FixtureOptions(
True, False, True, True, False, ['windows', 'linux', 'mac', 'posix'], True, False, [], [], True, False, True, False, True, False)
True, False, True, True, False, ['windows', 'linux', 'mac', 'posix'], True, False, [], [], True, False, True, False, True, False, True)
socketpair_unsecure_fixture_options = default_unsecure_fixture_options._replace(fullstack=False, dns_resolver=False)
default_secure_fixture_options = default_unsecure_fixture_options._replace(secure=True)
uds_fixture_options = default_unsecure_fixture_options._replace(dns_resolver=False, platforms=['linux', 'mac', 'posix'], exclude_iomgrs=['uv'])
fd_unsecure_fixture_options = default_unsecure_fixture_options._replace(
dns_resolver=False, fullstack=False, platforms=['linux', 'mac', 'posix'], exclude_iomgrs=['uv'])
inproc_fixture_options = default_unsecure_fixture_options._replace(dns_resolver=False, fullstack=False, name_resolution=False, supports_compression=False, is_inproc=True, is_http2=False)
inproc_fixture_options = default_unsecure_fixture_options._replace(dns_resolver=False, fullstack=False, name_resolution=False, supports_compression=False, is_inproc=True, is_http2=False, supports_write_buffering=False)
# maps fixture name to whether it requires the security library
END2END_FIXTURES = {
@ -68,8 +68,8 @@ END2END_FIXTURES = {
TestOptions = collections.namedtuple(
'TestOptions',
'needs_fullstack needs_dns needs_names proxyable secure traceable cpu_cost exclude_iomgrs large_writes flaky allows_compression needs_compression exclude_inproc needs_http2 needs_proxy_auth')
default_test_options = TestOptions(False, False, False, True, False, True, 1.0, [], False, False, True, False, False, False, False)
'needs_fullstack needs_dns needs_names proxyable secure traceable cpu_cost exclude_iomgrs large_writes flaky allows_compression needs_compression exclude_inproc needs_http2 needs_proxy_auth needs_write_buffering')
default_test_options = TestOptions(False, False, False, True, False, True, 1.0, [], False, False, True, False, False, False, False, False)
connectivity_test_options = default_test_options._replace(needs_fullstack=True)
LOWCPU = 0.1
@ -146,8 +146,10 @@ END2END_TESTS = {
'streaming_error_response': default_test_options._replace(cpu_cost=LOWCPU),
'trailing_metadata': default_test_options,
'workaround_cronet_compression': default_test_options,
'write_buffering': default_test_options._replace(cpu_cost=LOWCPU),
'write_buffering_at_end': default_test_options._replace(cpu_cost=LOWCPU),
'write_buffering': default_test_options._replace(cpu_cost=LOWCPU,
needs_write_buffering=True),
'write_buffering_at_end': default_test_options._replace(cpu_cost=LOWCPU,
needs_write_buffering=True),
}
@ -185,6 +187,9 @@ def compatible(f, t):
if END2END_TESTS[t].needs_proxy_auth:
if not END2END_FIXTURES[f].supports_proxy_auth:
return False
if END2END_TESTS[t].needs_write_buffering:
if not END2END_FIXTURES[f].supports_write_buffering:
return False
return True

@ -21,7 +21,8 @@ load("//bazel:grpc_build_system.bzl", "grpc_sh_test", "grpc_cc_binary", "grpc_cc
def fixture_options(fullstack=True, includes_proxy=False, dns_resolver=True,
name_resolution=True, secure=True, tracing=False,
platforms=['windows', 'linux', 'mac', 'posix'],
is_inproc=False, is_http2=True, supports_proxy_auth=False):
is_inproc=False, is_http2=True, supports_proxy_auth=False,
supports_write_buffering=True):
return struct(
fullstack=fullstack,
includes_proxy=includes_proxy,
@ -31,7 +32,8 @@ def fixture_options(fullstack=True, includes_proxy=False, dns_resolver=True,
tracing=tracing,
is_inproc=is_inproc,
is_http2=is_http2,
supports_proxy_auth=supports_proxy_auth
supports_proxy_auth=supports_proxy_auth,
supports_write_buffering=supports_write_buffering
#platforms=platforms
)
@ -61,14 +63,14 @@ END2END_FIXTURES = {
platforms=['linux', 'mac', 'posix']),
'inproc': fixture_options(fullstack=False, dns_resolver=False,
name_resolution=False, is_inproc=True,
is_http2=False),
is_http2=False, supports_write_buffering=False),
}
def test_options(needs_fullstack=False, needs_dns=False, needs_names=False,
proxyable=True, secure=False, traceable=False,
exclude_inproc=False, needs_http2=False,
needs_proxy_auth=False):
needs_proxy_auth=False, needs_write_buffering=False):
return struct(
needs_fullstack=needs_fullstack,
needs_dns=needs_dns,
@ -78,7 +80,8 @@ def test_options(needs_fullstack=False, needs_dns=False, needs_names=False,
traceable=traceable,
exclude_inproc=exclude_inproc,
needs_http2=needs_http2,
needs_proxy_auth=needs_proxy_auth
needs_proxy_auth=needs_proxy_auth,
needs_write_buffering=needs_write_buffering
)
@ -144,8 +147,8 @@ END2END_TESTS = {
'authority_not_supported': test_options(),
'filter_latency': test_options(),
'workaround_cronet_compression': test_options(),
'write_buffering': test_options(),
'write_buffering_at_end': test_options(),
'write_buffering': test_options(needs_write_buffering=True),
'write_buffering_at_end': test_options(needs_write_buffering=True),
}
@ -174,6 +177,9 @@ def compatible(fopt, topt):
if topt.needs_proxy_auth:
if not fopt.supports_proxy_auth:
return False
if topt.needs_write_buffering:
if not fopt.supports_write_buffering:
return False
return True

@ -183,7 +183,6 @@ static void test(grpc_end2end_test_config config, bool request_status_early) {
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(103), 1);
cq_verify(cqv);
if (!request_status_early) {
memset(ops, 0, sizeof(ops));

@ -1304,7 +1304,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
ServerTryCancelRequestPhase server_try_cancel) {
ResetStub();
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
@ -1315,31 +1314,24 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
// Initiate the 'RequestStream' call on client
CompletionQueue cli_cq;
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq, tag(1)));
// On the server, request to be notified of 'RequestStream' calls
// and receive the 'RequestStream' call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
std::thread t1([this, &cli_cq] {
Verifier(GetParam().disable_blocking).Expect(1, true).Verify(&cli_cq);
});
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends 3 messages (tags 3, 4 and 5)
for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
send_request.set_message("Ping " + grpc::to_string(tag_idx));
cli_stream->Write(send_request, tag(tag_idx));
Verifier(GetParam().disable_blocking)
.Expect(tag_idx, true)
.Verify(cq_.get());
}
cli_stream->WritesDone(tag(6));
Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
t1.join();
bool expected_server_cq_result = true;
bool ignore_cq_result = false;
bool want_done_tag = false;
bool expected_client_cq_result = true;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
@ -1347,10 +1339,36 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
EXPECT_TRUE(srv_ctx.IsCancelled());
// Since cancellation is done before server reads any results, we know
// for sure that all cq results will return false from this point forward
// for sure that all server cq results will return false from this
// point forward
expected_server_cq_result = false;
expected_client_cq_result = false;
}
bool ignore_client_cq_result =
(server_try_cancel == CANCEL_DURING_PROCESSING) ||
(server_try_cancel == CANCEL_BEFORE_PROCESSING);
std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
&ignore_client_cq_result, this] {
EchoRequest send_request;
// Client sends 3 messages (tags 3, 4 and 5)
for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
send_request.set_message("Ping " + grpc::to_string(tag_idx));
cli_stream->Write(send_request, tag(tag_idx));
Verifier(GetParam().disable_blocking)
.Expect(tag_idx, expected_client_cq_result)
.Verify(&cli_cq, ignore_client_cq_result);
}
cli_stream->WritesDone(tag(6));
// Ignore ok on WritesDone since cancel can affect it
Verifier(GetParam().disable_blocking)
.Expect(6, expected_client_cq_result)
.Verify(&cli_cq, ignore_client_cq_result);
});
bool ignore_cq_result = false;
bool want_done_tag = false;
std::thread* server_try_cancel_thd = nullptr;
auto verif = Verifier(GetParam().disable_blocking);
@ -1387,6 +1405,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
}
}
cli_thread.join();
if (server_try_cancel_thd != nullptr) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;
@ -1415,9 +1435,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(10, true).Verify(&cli_cq);
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
cli_cq.Shutdown();
void* dummy_tag;
bool dummy_ok;
while (cli_cq.Next(&dummy_tag, &dummy_ok)) {
}
}
// Helper for testing server-streaming RPCs which are cancelled on the server.
@ -1439,7 +1465,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
ServerContext srv_ctx;
@ -1447,20 +1472,29 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
send_request.set_message("Ping");
// Initiate the 'ResponseStream' call on the client
CompletionQueue cli_cq;
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq, tag(1)));
// On the server, request to be notified of 'ResponseStream' calls and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
std::thread t1([this, &cli_cq] {
Verifier(GetParam().disable_blocking).Expect(1, true).Verify(&cli_cq);
});
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
t1.join();
EXPECT_EQ(send_request.message(), recv_request.message());
bool expected_cq_result = true;
bool ignore_cq_result = false;
bool want_done_tag = false;
bool expected_client_cq_result = true;
bool ignore_client_cq_result =
(server_try_cancel != CANCEL_BEFORE_PROCESSING);
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
@ -1470,7 +1504,20 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// We know for sure that all cq results will be false from this point
// since the server cancelled the RPC
expected_cq_result = false;
expected_client_cq_result = false;
}
std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
&ignore_client_cq_result, this] {
// Client attempts to read the three messages from the server
for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
EchoResponse recv_response;
cli_stream->Read(&recv_response, tag(tag_idx));
Verifier(GetParam().disable_blocking)
.Expect(tag_idx, expected_client_cq_result)
.Verify(&cli_cq, ignore_client_cq_result);
}
});
std::thread* server_try_cancel_thd = nullptr;
@ -1519,10 +1566,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
srv_ctx.TryCancel();
want_done_tag = true;
verif.Expect(11, true);
// Client reads may fail bacause it is notified that the stream is
// cancelled.
ignore_cq_result = true;
}
if (want_done_tag) {
@ -1531,13 +1574,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
want_done_tag = false;
}
// Client attemts to read the three messages from the server
for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
cli_stream->Read(&recv_response, tag(tag_idx));
Verifier(GetParam().disable_blocking)
.Expect(tag_idx, expected_cq_result)
.Verify(cq_.get(), ignore_cq_result);
}
cli_thread.join();
// The RPC has been cancelled at this point for sure (i.e irrespective of
// the value of `server_try_cancel` is). So, from this point forward, we
@ -1549,9 +1586,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(10, true).Verify(&cli_cq);
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
cli_cq.Shutdown();
void* dummy_tag;
bool dummy_ok;
while (cli_cq.Next(&dummy_tag, &dummy_ok)) {
}
}
// Helper for testing bidirectinal-streaming RPCs which are cancelled on the
@ -1584,38 +1627,52 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Initiate the call from the client side
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of the 'BidiStream' call and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(1, true)
.Expect(2, true)
.Verify(cq_.get());
auto verif = Verifier(GetParam().disable_blocking);
// Client sends the first and the only message
send_request.set_message("Ping");
cli_stream->Write(send_request, tag(3));
Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
verif.Expect(3, true);
bool expected_cq_result = true;
bool ignore_cq_result = false;
bool want_done_tag = false;
int got_tag, got_tag2;
bool tag_3_done = false;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// We know for sure that all cq results will be false from this point
// since the server cancelled the RPC
verif.Expect(11, true);
// We know for sure that all server cq results will be false from
// this point since the server cancelled the RPC. However, we can't
// say for sure about the client
expected_cq_result = false;
ignore_cq_result = true;
do {
got_tag = verif.Next(cq_.get(), ignore_cq_result);
GPR_ASSERT(((got_tag == 3) && !tag_3_done) || (got_tag == 11));
if (got_tag == 3) {
tag_3_done = true;
}
} while (got_tag != 11);
EXPECT_TRUE(srv_ctx.IsCancelled());
}
std::thread* server_try_cancel_thd = nullptr;
auto verif = Verifier(GetParam().disable_blocking);
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
new std::thread(&ServerContext::TryCancel, &srv_ctx);
@ -1630,39 +1687,42 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
verif.Expect(11, true);
}
int got_tag;
srv_stream.Read(&recv_request, tag(4));
verif.Expect(4, expected_cq_result);
got_tag = verif.Next(cq_.get(), ignore_cq_result);
GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag));
if (got_tag == 11) {
got_tag = tag_3_done ? 3 : verif.Next(cq_.get(), ignore_cq_result);
got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
GPR_ASSERT((got_tag == 3) || (got_tag == 4) ||
(got_tag == 11 && want_done_tag));
GPR_ASSERT((got_tag2 == 3) || (got_tag2 == 4) ||
(got_tag2 == 11 && want_done_tag));
// If we get 3 and 4, we don't need to wait for 11, but if
// we get 11, we should also clear 3 and 4
if (got_tag + got_tag2 != 7) {
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
// Now get the other entry that we were waiting on
EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4);
got_tag = verif.Next(cq_.get(), ignore_cq_result);
GPR_ASSERT((got_tag == 3) || (got_tag == 4));
}
send_response.set_message("Pong");
srv_stream.Write(send_response, tag(5));
verif.Expect(5, expected_cq_result);
got_tag = verif.Next(cq_.get(), ignore_cq_result);
GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag));
if (got_tag == 11) {
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
// Now get the other entry that we were waiting on
EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5);
}
cli_stream->Read(&recv_response, tag(6));
verif.Expect(6, expected_cq_result);
got_tag = verif.Next(cq_.get(), ignore_cq_result);
GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag));
if (got_tag == 11) {
got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
GPR_ASSERT((got_tag == 5) || (got_tag == 6) ||
(got_tag == 11 && want_done_tag));
GPR_ASSERT((got_tag2 == 5) || (got_tag2 == 6) ||
(got_tag2 == 11 && want_done_tag));
// If we get 5 and 6, we don't need to wait for 11, but if
// we get 11, we should also clear 5 and 6
if (got_tag + got_tag2 != 11) {
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
// Now get the other entry that we were waiting on
EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6);
got_tag = verif.Next(cq_.get(), ignore_cq_result);
GPR_ASSERT((got_tag == 5) || (got_tag == 6));
}
// This is expected to succeed in all cases

@ -0,0 +1,502 @@
/* ares_config.h. Generated from ares_config.h.in by configure. */
/* ares_config.h.in. Generated from configure.ac by autoheader. */
/* Define if building universal (internal helper macro) */
/* #undef AC_APPLE_UNIVERSAL_BUILD */
/* define this if ares is built for a big endian system */
/* #undef ARES_BIG_ENDIAN */
/* when building as static part of libcurl */
/* #undef BUILDING_LIBCURL */
/* Defined for build that exposes internal static functions for testing. */
/* #undef CARES_EXPOSE_STATICS */
/* Defined for build with symbol hiding. */
#define CARES_SYMBOL_HIDING 1
/* Definition to make a library symbol externally visible. */
#define CARES_SYMBOL_SCOPE_EXTERN __attribute__ ((__visibility__ ("default")))
/* the signed version of size_t */
#define CARES_TYPEOF_ARES_SSIZE_T ssize_t
/* Use resolver library to configure cares */
/* #undef CARES_USE_LIBRESOLV */
/* if a /etc/inet dir is being used */
/* #undef ETC_INET */
/* Define to the type of arg 2 for gethostname. */
#define GETHOSTNAME_TYPE_ARG2 size_t
/* Define to the type qualifier of arg 1 for getnameinfo. */
#define GETNAMEINFO_QUAL_ARG1 const
/* Define to the type of arg 1 for getnameinfo. */
#define GETNAMEINFO_TYPE_ARG1 struct sockaddr *
/* Define to the type of arg 2 for getnameinfo. */
#define GETNAMEINFO_TYPE_ARG2 socklen_t
/* Define to the type of args 4 and 6 for getnameinfo. */
#define GETNAMEINFO_TYPE_ARG46 size_t
/* Define to the type of arg 7 for getnameinfo. */
#define GETNAMEINFO_TYPE_ARG7 int
/* Specifies the number of arguments to getservbyport_r */
#define GETSERVBYPORT_R_ARGS 4
/* Specifies the size of the buffer to pass to getservbyport_r */
#define GETSERVBYPORT_R_BUFSIZE sizeof(struct servent_data)
/* Define to 1 if you have AF_INET6. */
#define HAVE_AF_INET6 1
/* Define to 1 if you have the <arpa/inet.h> header file. */
#define HAVE_ARPA_INET_H 1
/* Define to 1 if you have the <arpa/nameser_compat.h> header file. */
/* #undef HAVE_ARPA_NAMESER_COMPAT_H */
/* Define to 1 if you have the <arpa/nameser.h> header file. */
#define HAVE_ARPA_NAMESER_H 1
/* Define to 1 if you have the <assert.h> header file. */
#define HAVE_ASSERT_H 1
/* Define to 1 if you have the `bitncmp' function. */
/* #undef HAVE_BITNCMP */
/* Define to 1 if bool is an available type. */
#define HAVE_BOOL_T 1
/* Define to 1 if you have the clock_gettime function and monotonic timer. */
#define HAVE_CLOCK_GETTIME_MONOTONIC 1
/* Define to 1 if you have the closesocket function. */
/* #undef HAVE_CLOSESOCKET */
/* Define to 1 if you have the CloseSocket camel case function. */
/* #undef HAVE_CLOSESOCKET_CAMEL */
/* Define to 1 if you have the connect function. */
#define HAVE_CONNECT 1
/* define if the compiler supports basic C++11 syntax */
/* #undef HAVE_CXX11 */
/* Define to 1 if you have the <dlfcn.h> header file. */
#define HAVE_DLFCN_H 1
/* Define to 1 if you have the <errno.h> header file. */
#define HAVE_ERRNO_H 1
/* Define to 1 if you have the fcntl function. */
#define HAVE_FCNTL 1
/* Define to 1 if you have the <fcntl.h> header file. */
#define HAVE_FCNTL_H 1
/* Define to 1 if you have a working fcntl O_NONBLOCK function. */
#define HAVE_FCNTL_O_NONBLOCK 1
/* Define to 1 if you have the freeaddrinfo function. */
#define HAVE_FREEADDRINFO 1
/* Define to 1 if you have a working getaddrinfo function. */
#define HAVE_GETADDRINFO 1
/* Define to 1 if the getaddrinfo function is threadsafe. */
/* #undef HAVE_GETADDRINFO_THREADSAFE */
/* Define to 1 if you have the getenv function. */
#define HAVE_GETENV 1
/* Define to 1 if you have the gethostbyaddr function. */
#define HAVE_GETHOSTBYADDR 1
/* Define to 1 if you have the gethostbyname function. */
#define HAVE_GETHOSTBYNAME 1
/* Define to 1 if you have the gethostname function. */
#define HAVE_GETHOSTNAME 1
/* Define to 1 if you have the getnameinfo function. */
#define HAVE_GETNAMEINFO 1
/* Define to 1 if you have the getservbyport_r function. */
#define HAVE_GETSERVBYPORT_R 1
/* Define to 1 if you have the `gettimeofday' function. */
#define HAVE_GETTIMEOFDAY 1
/* Define to 1 if you have the `if_indextoname' function. */
#define HAVE_IF_INDEXTONAME 1
/* Define to 1 if you have a IPv6 capable working inet_net_pton function. */
/* #undef HAVE_INET_NET_PTON */
/* Define to 1 if you have a IPv6 capable working inet_ntop function. */
#define HAVE_INET_NTOP 1
/* Define to 1 if you have a IPv6 capable working inet_pton function. */
#define HAVE_INET_PTON 1
/* Define to 1 if you have the <inttypes.h> header file. */
#define HAVE_INTTYPES_H 1
/* Define to 1 if you have the ioctl function. */
#define HAVE_IOCTL 1
/* Define to 1 if you have the ioctlsocket function. */
/* #undef HAVE_IOCTLSOCKET */
/* Define to 1 if you have the IoctlSocket camel case function. */
/* #undef HAVE_IOCTLSOCKET_CAMEL */
/* Define to 1 if you have a working IoctlSocket camel case FIONBIO function.
*/
/* #undef HAVE_IOCTLSOCKET_CAMEL_FIONBIO */
/* Define to 1 if you have a working ioctlsocket FIONBIO function. */
/* #undef HAVE_IOCTLSOCKET_FIONBIO */
/* Define to 1 if you have a working ioctl FIONBIO function. */
#define HAVE_IOCTL_FIONBIO 1
/* Define to 1 if you have a working ioctl SIOCGIFADDR function. */
#define HAVE_IOCTL_SIOCGIFADDR 1
/* Define to 1 if you have the `resolve' library (-lresolve). */
/* #undef HAVE_LIBRESOLVE */
/* Define to 1 if you have the <limits.h> header file. */
#define HAVE_LIMITS_H 1
/* if your compiler supports LL */
#define HAVE_LL 1
/* Define to 1 if the compiler supports the 'long long' data type. */
#define HAVE_LONGLONG 1
/* Define to 1 if you have the malloc.h header file. */
/* #undef HAVE_MALLOC_H */
/* Define to 1 if you have the memory.h header file. */
#define HAVE_MEMORY_H 1
/* Define to 1 if you have the MSG_NOSIGNAL flag. */
#define HAVE_MSG_NOSIGNAL 1
/* Define to 1 if you have the <netdb.h> header file. */
#define HAVE_NETDB_H 1
/* Define to 1 if you have the <netinet/in.h> header file. */
#define HAVE_NETINET_IN_H 1
/* Define to 1 if you have the <netinet/tcp.h> header file. */
#define HAVE_NETINET_TCP_H 1
/* Define to 1 if you have the <net/if.h> header file. */
#define HAVE_NET_IF_H 1
/* Define to 1 if you have PF_INET6. */
#define HAVE_PF_INET6 1
/* Define to 1 if you have the recv function. */
#define HAVE_RECV 1
/* Define to 1 if you have the recvfrom function. */
#define HAVE_RECVFROM 1
/* Define to 1 if you have the send function. */
#define HAVE_SEND 1
/* Define to 1 if you have the setsockopt function. */
#define HAVE_SETSOCKOPT 1
/* Define to 1 if you have a working setsockopt SO_NONBLOCK function. */
/* #undef HAVE_SETSOCKOPT_SO_NONBLOCK */
/* Define to 1 if you have the <signal.h> header file. */
#define HAVE_SIGNAL_H 1
/* Define to 1 if sig_atomic_t is an available typedef. */
#define HAVE_SIG_ATOMIC_T 1
/* Define to 1 if sig_atomic_t is already defined as volatile. */
/* #undef HAVE_SIG_ATOMIC_T_VOLATILE */
/* Define to 1 if your struct sockaddr_in6 has sin6_scope_id. */
#define HAVE_SOCKADDR_IN6_SIN6_SCOPE_ID 1
/* Define to 1 if you have the socket function. */
#define HAVE_SOCKET 1
/* Define to 1 if you have the <socket.h> header file. */
/* #undef HAVE_SOCKET_H */
/* Define to 1 if you have the <stdbool.h> header file. */
#define HAVE_STDBOOL_H 1
/* Define to 1 if you have the <stdint.h> header file. */
#define HAVE_STDINT_H 1
/* Define to 1 if you have the <stdlib.h> header file. */
#define HAVE_STDLIB_H 1
/* Define to 1 if you have the strcasecmp function. */
#define HAVE_STRCASECMP 1
/* Define to 1 if you have the strcmpi function. */
/* #undef HAVE_STRCMPI */
/* Define to 1 if you have the strdup function. */
#define HAVE_STRDUP 1
/* Define to 1 if you have the stricmp function. */
/* #undef HAVE_STRICMP */
/* Define to 1 if you have the <strings.h> header file. */
#define HAVE_STRINGS_H 1
/* Define to 1 if you have the <string.h> header file. */
#define HAVE_STRING_H 1
/* Define to 1 if you have the strncasecmp function. */
#define HAVE_STRNCASECMP 1
/* Define to 1 if you have the strncmpi function. */
/* #undef HAVE_STRNCMPI */
/* Define to 1 if you have the strnicmp function. */
/* #undef HAVE_STRNICMP */
/* Define to 1 if you have the <stropts.h> header file. */
/* #undef HAVE_STROPTS_H */
/* Define to 1 if you have struct addrinfo. */
#define HAVE_STRUCT_ADDRINFO 1
/* Define to 1 if you have struct in6_addr. */
#define HAVE_STRUCT_IN6_ADDR 1
/* Define to 1 if you have struct sockaddr_in6. */
#define HAVE_STRUCT_SOCKADDR_IN6 1
/* if struct sockaddr_storage is defined */
#define HAVE_STRUCT_SOCKADDR_STORAGE 1
/* Define to 1 if you have the timeval struct. */
#define HAVE_STRUCT_TIMEVAL 1
/* Define to 1 if you have the <sys/ioctl.h> header file. */
#define HAVE_SYS_IOCTL_H 1
/* Define to 1 if you have the <sys/param.h> header file. */
#define HAVE_SYS_PARAM_H 1
/* Define to 1 if you have the <sys/select.h> header file. */
#define HAVE_SYS_SELECT_H 1
/* Define to 1 if you have the <sys/socket.h> header file. */
#define HAVE_SYS_SOCKET_H 1
/* Define to 1 if you have the <sys/stat.h> header file. */
#define HAVE_SYS_STAT_H 1
/* Define to 1 if you have the <sys/time.h> header file. */
#define HAVE_SYS_TIME_H 1
/* Define to 1 if you have the <sys/types.h> header file. */
#define HAVE_SYS_TYPES_H 1
/* Define to 1 if you have the <sys/uio.h> header file. */
#define HAVE_SYS_UIO_H 1
/* Define to 1 if you have the <time.h> header file. */
#define HAVE_TIME_H 1
/* Define to 1 if you have the <unistd.h> header file. */
#define HAVE_UNISTD_H 1
/* Define to 1 if you have the windows.h header file. */
/* #undef HAVE_WINDOWS_H */
/* Define to 1 if you have the winsock2.h header file. */
/* #undef HAVE_WINSOCK2_H */
/* Define to 1 if you have the winsock.h header file. */
/* #undef HAVE_WINSOCK_H */
/* Define to 1 if you have the writev function. */
#define HAVE_WRITEV 1
/* Define to 1 if you have the ws2tcpip.h header file. */
/* #undef HAVE_WS2TCPIP_H */
/* Define to the sub-directory where libtool stores uninstalled libraries. */
#define LT_OBJDIR ".libs/"
/* Define to 1 if you need the malloc.h header file even with stdlib.h */
/* #undef NEED_MALLOC_H */
/* Define to 1 if you need the memory.h header file even with stdlib.h */
/* #undef NEED_MEMORY_H */
/* Define to 1 if _REENTRANT preprocessor symbol must be defined. */
/* #undef NEED_REENTRANT */
/* Define to 1 if _THREAD_SAFE preprocessor symbol must be defined. */
/* #undef NEED_THREAD_SAFE */
/* cpu-machine-OS */
#define OS "x86_64-unknown-openbsd6.2"
/* Name of package */
#define PACKAGE "c-ares"
/* Define to the address where bug reports for this package should be sent. */
#define PACKAGE_BUGREPORT "c-ares mailing list: http://cool.haxx.se/mailman/listinfo/c-ares"
/* Define to the full name of this package. */
#define PACKAGE_NAME "c-ares"
/* Define to the full name and version of this package. */
#define PACKAGE_STRING "c-ares 1.13.0"
/* Define to the one symbol short name of this package. */
#define PACKAGE_TARNAME "c-ares"
/* Define to the home page for this package. */
#define PACKAGE_URL ""
/* Define to the version of this package. */
#define PACKAGE_VERSION "1.13.0"
/* a suitable file/device to read random data from */
#define RANDOM_FILE "/dev/urandom"
/* Define to the type qualifier pointed by arg 5 for recvfrom. */
#define RECVFROM_QUAL_ARG5
/* Define to the type of arg 1 for recvfrom. */
#define RECVFROM_TYPE_ARG1 int
/* Define to the type pointed by arg 2 for recvfrom. */
#define RECVFROM_TYPE_ARG2 void
/* Define to 1 if the type pointed by arg 2 for recvfrom is void. */
#define RECVFROM_TYPE_ARG2_IS_VOID 1
/* Define to the type of arg 3 for recvfrom. */
#define RECVFROM_TYPE_ARG3 size_t
/* Define to the type of arg 4 for recvfrom. */
#define RECVFROM_TYPE_ARG4 int
/* Define to the type pointed by arg 5 for recvfrom. */
#define RECVFROM_TYPE_ARG5 struct sockaddr
/* Define to 1 if the type pointed by arg 5 for recvfrom is void. */
/* #undef RECVFROM_TYPE_ARG5_IS_VOID */
/* Define to the type pointed by arg 6 for recvfrom. */
#define RECVFROM_TYPE_ARG6 socklen_t
/* Define to 1 if the type pointed by arg 6 for recvfrom is void. */
/* #undef RECVFROM_TYPE_ARG6_IS_VOID */
/* Define to the function return type for recvfrom. */
#define RECVFROM_TYPE_RETV ssize_t
/* Define to the type of arg 1 for recv. */
#define RECV_TYPE_ARG1 int
/* Define to the type of arg 2 for recv. */
#define RECV_TYPE_ARG2 void *
/* Define to the type of arg 3 for recv. */
#define RECV_TYPE_ARG3 size_t
/* Define to the type of arg 4 for recv. */
#define RECV_TYPE_ARG4 int
/* Define to the function return type for recv. */
#define RECV_TYPE_RETV ssize_t
/* Define as the return type of signal handlers (`int' or `void'). */
#define RETSIGTYPE void
/* Define to the type qualifier of arg 2 for send. */
#define SEND_QUAL_ARG2 const
/* Define to the type of arg 1 for send. */
#define SEND_TYPE_ARG1 int
/* Define to the type of arg 2 for send. */
#define SEND_TYPE_ARG2 void *
/* Define to the type of arg 3 for send. */
#define SEND_TYPE_ARG3 size_t
/* Define to the type of arg 4 for send. */
#define SEND_TYPE_ARG4 int
/* Define to the function return type for send. */
#define SEND_TYPE_RETV ssize_t
/* Define to 1 if you have the ANSI C header files. */
#define STDC_HEADERS 1
/* Define to 1 if you can safely include both <sys/time.h> and <time.h>. */
#define TIME_WITH_SYS_TIME 1
/* Define to disable non-blocking sockets. */
/* #undef USE_BLOCKING_SOCKETS */
/* Version number of package */
#define VERSION "1.13.0"
/* Define to avoid automatic inclusion of winsock.h */
/* #undef WIN32_LEAN_AND_MEAN */
/* Define WORDS_BIGENDIAN to 1 if your processor stores words with the most
significant byte first (like Motorola and SPARC, unlike Intel). */
#if defined AC_APPLE_UNIVERSAL_BUILD
# if defined __BIG_ENDIAN__
# define WORDS_BIGENDIAN 1
# endif
#else
# ifndef WORDS_BIGENDIAN
/* # undef WORDS_BIGENDIAN */
# endif
#endif
/* Define to 1 if OS is AIX. */
#ifndef _ALL_SOURCE
/* # undef _ALL_SOURCE */
#endif
/* Enable large inode numbers on Mac OS X 10.5. */
#ifndef _DARWIN_USE_64_BIT_INODE
# define _DARWIN_USE_64_BIT_INODE 1
#endif
/* Number of bits in a file offset, on hosts where this is settable. */
/* #undef _FILE_OFFSET_BITS */
/* Define for large files, on AIX-style hosts. */
/* #undef _LARGE_FILES */
/* Define to empty if `const' does not conform to ANSI C. */
/* #undef const */
/* Type to use in place of in_addr_t when system does not provide it. */
/* #undef in_addr_t */
/* Define to `unsigned int' if <sys/types.h> does not define. */
/* #undef size_t */

@ -22,4 +22,4 @@ cd $(dirname $0)/../../..
source tools/internal_ci/helper_scripts/prepare_build_linux_rc
tools/interop_matrix/run_interop_matrix_tests.py --language=all --release=all --report_file=sponge_log.xml $@
tools/interop_matrix/run_interop_matrix_tests.py --language=all --release=all --report_file=sponge_log.xml --bq_result_table interop_results $@

@ -36,6 +36,7 @@ sys.path.append(python_util_dir)
import dockerjob
import jobset
import report_utils
import upload_test_results
_LANGUAGES = client_matrix.LANG_RUNTIME_MATRIX.keys()
# All gRPC release tags, flattened, deduped and sorted.
@ -74,6 +75,11 @@ argp.add_argument('--allow_flakes',
const=True,
help=('Allow flaky tests to show as passing (re-runs failed '
'tests up to five times)'))
argp.add_argument('--bq_result_table',
default='',
type=str,
nargs='?',
help='Upload test results to a specified BQ table.')
args = argp.parse_args()
@ -117,7 +123,7 @@ def find_all_images_for_lang(lang):
# caches test cases (list of JobSpec) loaded from file. Keyed by lang and runtime.
_loaded_testcases = {}
def find_test_cases(lang, release):
def find_test_cases(lang, release, suite_name):
"""Returns the list of test cases from testcase files per lang/release."""
file_tmpl = os.path.join(os.path.dirname(__file__), 'testcases/%s__%s')
if not os.path.exists(file_tmpl % (lang, release)):
@ -134,8 +140,12 @@ def find_test_cases(lang, release):
if line.startswith('docker run'):
m = re.search('--test_case=(.*)"', line)
shortname = m.group(1) if m else 'unknown_test'
m = re.search('--server_host_override=(.*).sandbox.googleapis.com',
line)
server = m.group(1) if m else 'unknown_server'
spec = jobset.JobSpec(cmdline=line,
shortname=shortname,
shortname='%s:%s:%s:%s' % (suite_name, lang,
server, shortname),
timeout_seconds=_TEST_TIMEOUT,
shell=True,
flake_retries=5 if args.allow_flakes else 0)
@ -163,11 +173,15 @@ def run_tests_for_lang(lang, runtime, images):
# Download the docker image before running each test case.
subprocess.check_call(['gcloud', 'docker', '--', 'pull', image])
_docker_images_cleanup.append(image)
job_spec_list = find_test_cases(lang,release)
suite_name = '%s__%s_%s' % (lang, runtime, release)
job_spec_list = find_test_cases(lang, release, suite_name)
num_failures, resultset = jobset.run(job_spec_list,
newline_on_success=True,
add_env={'docker_image':image},
maxjobs=args.jobs)
if args.bq_result_table and resultset:
upload_test_results.upload_interop_results_to_bq(
resultset, args.bq_result_table, args)
if num_failures:
jobset.message('FAILED', 'Some tests failed', do_newline=True)
total_num_failures += num_failures
@ -178,7 +192,7 @@ def run_tests_for_lang(lang, runtime, images):
_xml_report_tree,
resultset,
'grpc_interop_matrix',
'%s__%s %s'%(lang,runtime,release),
suite_name,
str(uuid.uuid4()))
return total_num_failures

@ -7485,7 +7485,9 @@
"third_party/cares/cares/config-win32.h",
"third_party/cares/cares/setup_once.h",
"third_party/cares/config_darwin/ares_config.h",
"third_party/cares/config_linux/ares_config.h"
"third_party/cares/config_freebsd/ares_config.h",
"third_party/cares/config_linux/ares_config.h",
"third_party/cares/config_openbsd/ares_config.h"
],
"is_filegroup": false,
"language": "c",

@ -29976,52 +29976,6 @@
"posix"
]
},
{
"args": [
"write_buffering"
],
"ci_platforms": [
"windows",
"linux",
"mac",
"posix"
],
"cpu_cost": 0.1,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"language": "c",
"name": "inproc_test",
"platforms": [
"windows",
"linux",
"mac",
"posix"
]
},
{
"args": [
"write_buffering_at_end"
],
"ci_platforms": [
"windows",
"linux",
"mac",
"posix"
],
"cpu_cost": 0.1,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"language": "c",
"name": "inproc_test",
"platforms": [
"windows",
"linux",
"mac",
"posix"
]
},
{
"args": [
"authority_not_supported"
@ -48359,52 +48313,6 @@
"posix"
]
},
{
"args": [
"write_buffering"
],
"ci_platforms": [
"windows",
"linux",
"mac",
"posix"
],
"cpu_cost": 0.1,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"language": "c",
"name": "inproc_nosec_test",
"platforms": [
"windows",
"linux",
"mac",
"posix"
]
},
{
"args": [
"write_buffering_at_end"
],
"ci_platforms": [
"windows",
"linux",
"mac",
"posix"
],
"cpu_cost": 0.1,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"language": "c",
"name": "inproc_nosec_test",
"platforms": [
"windows",
"linux",
"mac",
"posix"
]
},
{
"args": [
"--scenarios_json",

@ -101,6 +101,10 @@ def massage_qps_stats(scenario_result):
stats["core_combiner_locks_scheduled_items"] = massage_qps_stats_helpers.counter(core_stats, "combiner_locks_scheduled_items")
stats["core_combiner_locks_scheduled_final_items"] = massage_qps_stats_helpers.counter(core_stats, "combiner_locks_scheduled_final_items")
stats["core_combiner_locks_offloaded"] = massage_qps_stats_helpers.counter(core_stats, "combiner_locks_offloaded")
stats["core_call_combiner_locks_initiated"] = massage_qps_stats_helpers.counter(core_stats, "call_combiner_locks_initiated")
stats["core_call_combiner_locks_scheduled_items"] = massage_qps_stats_helpers.counter(core_stats, "call_combiner_locks_scheduled_items")
stats["core_call_combiner_set_notify_on_cancel"] = massage_qps_stats_helpers.counter(core_stats, "call_combiner_set_notify_on_cancel")
stats["core_call_combiner_cancelled"] = massage_qps_stats_helpers.counter(core_stats, "call_combiner_cancelled")
stats["core_executor_scheduled_short_items"] = massage_qps_stats_helpers.counter(core_stats, "executor_scheduled_short_items")
stats["core_executor_scheduled_long_items"] = massage_qps_stats_helpers.counter(core_stats, "executor_scheduled_long_items")
stats["core_executor_scheduled_to_self"] = massage_qps_stats_helpers.counter(core_stats, "executor_scheduled_to_self")
@ -109,6 +113,9 @@ def massage_qps_stats(scenario_result):
stats["core_executor_push_retries"] = massage_qps_stats_helpers.counter(core_stats, "executor_push_retries")
stats["core_server_requested_calls"] = massage_qps_stats_helpers.counter(core_stats, "server_requested_calls")
stats["core_server_slowpath_requests_queued"] = massage_qps_stats_helpers.counter(core_stats, "server_slowpath_requests_queued")
stats["core_cq_ev_queue_trylock_failures"] = massage_qps_stats_helpers.counter(core_stats, "cq_ev_queue_trylock_failures")
stats["core_cq_ev_queue_trylock_successes"] = massage_qps_stats_helpers.counter(core_stats, "cq_ev_queue_trylock_successes")
stats["core_cq_ev_queue_transient_pop_failures"] = massage_qps_stats_helpers.counter(core_stats, "cq_ev_queue_transient_pop_failures")
h = massage_qps_stats_helpers.histogram(core_stats, "call_initial_size")
stats["core_call_initial_size"] = ",".join("%f" % x for x in h.buckets)
stats["core_call_initial_size_bkts"] = ",".join("%f" % x for x in h.boundaries)

@ -515,6 +515,26 @@
"name": "core_combiner_locks_offloaded",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_call_combiner_locks_initiated",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_call_combiner_locks_scheduled_items",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_call_combiner_set_notify_on_cancel",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_call_combiner_cancelled",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_executor_scheduled_short_items",
@ -555,6 +575,21 @@
"name": "core_server_slowpath_requests_queued",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_cq_ev_queue_trylock_failures",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_cq_ev_queue_trylock_successes",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_cq_ev_queue_transient_pop_failures",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_call_initial_size",
@ -1312,6 +1347,26 @@
"name": "core_combiner_locks_offloaded",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_call_combiner_locks_initiated",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_call_combiner_locks_scheduled_items",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_call_combiner_set_notify_on_cancel",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_call_combiner_cancelled",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_executor_scheduled_short_items",
@ -1352,6 +1407,21 @@
"name": "core_server_slowpath_requests_queued",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_cq_ev_queue_trylock_failures",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_cq_ev_queue_trylock_successes",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_cq_ev_queue_transient_pop_failures",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "core_call_initial_size",

@ -677,7 +677,7 @@ def cloud_to_prod_jobspec(language, test_case, server_host_name,
cmdline=cmdline,
cwd=cwd,
environ=environ,
shortname='%s:%s:%s:%s' % (suite_name, server_host_name, language,
shortname='%s:%s:%s:%s' % (suite_name, language, server_host_name,
test_case),
timeout_seconds=_TEST_TIMEOUT,
flake_retries=5 if args.allow_flakes else 0,

@ -286,7 +286,7 @@ class CLanguage(object):
continue
polling_strategies = (_POLLING_STRATEGIES.get(self.platform, ['all'])
if target.get('uses_polling', True)
else ['all'])
else ['none'])
if self.args.iomgr_platform == 'uv':
polling_strategies = ['all']
for polling_strategy in polling_strategies:

Loading…
Cancel
Save