Merge remote-tracking branch 'upstream/master' into interop-client-additional-metadata

pull/18156/head
Michael Behr 6 years ago
commit ddf7666fdf
  1. 1
      CMakeLists.txt
  2. 5
      Makefile
  3. 11
      bazel/generate_cc.bzl
  4. 8
      bazel/grpc_deps.bzl
  5. 1
      build.yaml
  6. 3
      doc/environment_variables.md
  7. 4
      doc/server_reflection_tutorial.md
  8. 15
      include/grpcpp/impl/codegen/byte_buffer.h
  9. 4
      setup.py
  10. 18
      src/compiler/cpp_generator.cc
  11. 2
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  12. 23
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
  13. 2
      src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
  14. 11
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  15. 2
      src/core/ext/transport/chttp2/transport/hpack_parser.cc
  16. 5
      src/core/ext/transport/inproc/inproc_transport.cc
  17. 12
      src/core/lib/gprpp/atomic.h
  18. 44
      src/core/lib/slice/slice.cc
  19. 280
      src/core/lib/surface/completion_queue.cc
  20. 15
      src/cpp/server/server_context.cc
  21. 14
      src/cpp/util/byte_buffer_cc.cc
  22. 69
      src/python/grpcio/commands.py
  23. 5
      src/python/grpcio/grpc/_server.py
  24. 2
      src/python/grpcio_tests/tests/unit/thread_pool.py
  25. 2
      templates/Makefile.template
  26. 2
      test/core/end2end/bad_server_response_test.cc
  27. 2
      test/core/end2end/tests/bad_ping.cc
  28. 2
      test/core/tsi/alts/handshaker/alts_tsi_utils_test.cc
  29. 4
      test/cpp/codegen/compiler_test_golden
  30. 1
      test/cpp/end2end/BUILD
  31. 745
      test/cpp/end2end/client_callback_end2end_test.cc
  32. 15
      test/cpp/end2end/test_service_impl.cc
  33. 2
      test/cpp/interop/metrics_client.cc
  34. 11
      test/cpp/microbenchmarks/helpers.cc
  35. 8
      test/cpp/microbenchmarks/helpers.h
  36. 2
      test/cpp/naming/address_sorting_test.cc
  37. 3
      tools/run_tests/generated/sources_and_headers.json

@ -12441,6 +12441,7 @@ if (gRPC_BUILD_TESTS)
add_executable(client_callback_end2end_test
test/cpp/end2end/client_callback_end2end_test.cc
test/cpp/end2end/interceptors_util.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)

@ -809,7 +809,7 @@ ifeq ($(HAS_SYSTEM_PROTOBUF),true)
ifeq ($(HAS_PKG_CONFIG),true)
PROTOBUF_PKG_CONFIG = true
PC_REQUIRES_GRPCXX = protobuf
CPPFLAGS := $(shell $(PKG_CONFIG) --cflags protobuf) $(CPPFLAGS)
CPPFLAGS := $(CPPFLAGS) $(shell $(PKG_CONFIG) --cflags protobuf)
LDFLAGS_PROTOBUF_PKG_CONFIG = $(shell $(PKG_CONFIG) --libs-only-L protobuf)
ifeq ($(SYSTEM),Linux)
ifneq ($(LDFLAGS_PROTOBUF_PKG_CONFIG),)
@ -17464,6 +17464,7 @@ endif
CLIENT_CALLBACK_END2END_TEST_SRC = \
test/cpp/end2end/client_callback_end2end_test.cc \
test/cpp/end2end/interceptors_util.cc \
CLIENT_CALLBACK_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CLIENT_CALLBACK_END2END_TEST_SRC))))
ifeq ($(NO_SECURE),true)
@ -17496,6 +17497,8 @@ endif
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/client_callback_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/interceptors_util.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_client_callback_end2end_test: $(CLIENT_CALLBACK_END2END_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)

@ -28,7 +28,7 @@ def generate_cc_impl(ctx):
else:
outs += [proto.path[label_len:-len(".proto")] + ".pb.h" for proto in protos]
outs += [proto.path[label_len:-len(".proto")] + ".pb.cc" for proto in protos]
out_files = [ctx.new_file(out) for out in outs]
out_files = [ctx.actions.declare_file(out) for out in outs]
dir_out = str(ctx.genfiles_dir.path + proto_root)
arguments = []
@ -38,10 +38,10 @@ def generate_cc_impl(ctx):
if ctx.attr.generate_mocks:
flags.append("generate_mock_code=true")
arguments += ["--PLUGIN_out=" + ",".join(flags) + ":" + dir_out]
additional_input = [ctx.executable.plugin]
tools = [ctx.executable.plugin]
else:
arguments += ["--cpp_out=" + ",".join(ctx.attr.flags) + ":" + dir_out]
additional_input = []
tools = []
# Import protos relative to their workspace root so that protoc prints the
# right include paths.
@ -70,8 +70,9 @@ def generate_cc_impl(ctx):
arguments += ["-I{0}".format(f + "/../..")]
well_known_proto_files = [f for f in ctx.attr.well_known_protos.files]
ctx.action(
inputs = protos + includes + additional_input + well_known_proto_files,
ctx.actions.run(
inputs = protos + includes + well_known_proto_files,
tools = tools,
outputs = out_files,
executable = ctx.executable._protoc,
arguments = arguments,

@ -170,8 +170,8 @@ def grpc_deps():
if "com_google_absl" not in native.existing_rules():
http_archive(
name = "com_google_absl",
strip_prefix = "abseil-cpp-cd95e71df6eaf8f2a282b1da556c2cf1c9b09207",
url = "https://github.com/abseil/abseil-cpp/archive/cd95e71df6eaf8f2a282b1da556c2cf1c9b09207.tar.gz",
strip_prefix = "abseil-cpp-308ce31528a7edfa39f5f6d36142278a0ae1bf45",
url = "https://github.com/abseil/abseil-cpp/archive/308ce31528a7edfa39f5f6d36142278a0ae1bf45.tar.gz",
)
if "com_github_bazelbuild_bazeltoolchains" not in native.existing_rules():
@ -196,8 +196,8 @@ def grpc_deps():
if "io_opencensus_cpp" not in native.existing_rules():
http_archive(
name = "io_opencensus_cpp",
strip_prefix = "opencensus-cpp-fdf0f308b1631bb4a942e32ba5d22536a6170274",
url = "https://github.com/census-instrumentation/opencensus-cpp/archive/fdf0f308b1631bb4a942e32ba5d22536a6170274.tar.gz",
strip_prefix = "opencensus-cpp-03dff0352522983ffdee48cedbf87cbe37f1bb7f",
url = "https://github.com/census-instrumentation/opencensus-cpp/archive/03dff0352522983ffdee48cedbf87cbe37f1bb7f.tar.gz",
)
if "upb" not in native.existing_rules():

@ -4468,6 +4468,7 @@ targets:
language: c++
src:
- test/cpp/end2end/client_callback_end2end_test.cc
- test/cpp/end2end/interceptors_util.cc
deps:
- grpc++_test_util
- grpc_test_util

@ -41,6 +41,9 @@ some configuration as environment variables that can be set.
- bdp_estimator - traces behavior of bdp estimation logic
- call_combiner - traces call combiner state
- call_error - traces the possible errors contributing to final call status
- cares_resolver - traces operations of the c-ares based DNS resolver
- cares_address_sorting - traces operations of the c-ares based DNS
resolver's resolved address sorter
- channel - traces operations on the C core channel stack
- client_channel - traces client channel activity, including resolver
and load balancing policy interaction

@ -15,8 +15,8 @@ server reflection, you can link this library to your server binary.
Some platforms (e.g. Ubuntu 11.10 onwards) only link in libraries that directly
contain symbols used by the application. On these platforms, LD flag
`--no-as-needed` is needed for for dynamic linking and `--whole-archive` is
needed for for static linking.
`--no-as-needed` is needed for dynamic linking and `--whole-archive` is
needed for static linking.
This [Makefile](../examples/cpp/helloworld/Makefile#L37#L45) demonstrates
enabling c++ server reflection on Linux and MacOS.

@ -96,7 +96,7 @@ class ByteBuffer final {
/// \a buf. Wrapper of core function grpc_byte_buffer_copy . This is not
/// a deep copy; it is just a referencing. As a result, its performance is
/// size-independent.
ByteBuffer(const ByteBuffer& buf);
ByteBuffer(const ByteBuffer& buf) : buffer_(nullptr) { operator=(buf); }
~ByteBuffer() {
if (buffer_) {
@ -107,7 +107,16 @@ class ByteBuffer final {
/// Wrapper of core function grpc_byte_buffer_copy . This is not
/// a deep copy; it is just a referencing. As a result, its performance is
/// size-independent.
ByteBuffer& operator=(const ByteBuffer&);
ByteBuffer& operator=(const ByteBuffer& buf) {
if (this != &buf) {
Clear(); // first remove existing data
}
if (buf.buffer_) {
// then copy
buffer_ = g_core_codegen_interface->grpc_byte_buffer_copy(buf.buffer_);
}
return *this;
}
/// Dump (read) the buffer contents into \a slices.
Status Dump(std::vector<Slice>* slices) const;
@ -215,7 +224,7 @@ class SerializationTraits<ByteBuffer, void> {
bool* own_buffer) {
*buffer = source;
*own_buffer = true;
return Status::OK;
return g_core_codegen_interface->ok();
}
};

@ -159,7 +159,7 @@ if EXTRA_ENV_COMPILE_ARGS is None:
elif "linux" in sys.platform:
EXTRA_ENV_COMPILE_ARGS += ' -std=gnu99 -fvisibility=hidden -fno-wrapv -fno-exceptions'
elif "darwin" in sys.platform:
EXTRA_ENV_COMPILE_ARGS += ' -fvisibility=hidden -fno-wrapv -fno-exceptions'
EXTRA_ENV_COMPILE_ARGS += ' -stdlib=libc++ -fvisibility=hidden -fno-wrapv -fno-exceptions'
EXTRA_ENV_COMPILE_ARGS += ' -DPB_FIELD_32BIT'
if EXTRA_ENV_LINK_ARGS is None:
@ -265,7 +265,7 @@ def cython_extensions_and_necessity():
for name in CYTHON_EXTENSION_MODULE_NAMES]
config = os.environ.get('CONFIG', 'opt')
prefix = 'libs/' + config + '/'
if "darwin" in sys.platform or USE_PREBUILT_GRPC_CORE:
if USE_PREBUILT_GRPC_CORE:
extra_objects = [prefix + 'libares.a',
prefix + 'libboringssl.a',
prefix + 'libgpr.a',

@ -580,6 +580,10 @@ void PrintHeaderClientMethodCallbackInterfaces(
"virtual void $Method$(::grpc::ClientContext* context, "
"const $Request$* request, $Response$* response, "
"std::function<void(::grpc::Status)>) = 0;\n");
printer->Print(*vars,
"virtual void $Method$(::grpc::ClientContext* context, "
"const ::grpc::ByteBuffer* request, $Response$* response, "
"std::function<void(::grpc::Status)>) = 0;\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
"virtual void $Method$(::grpc::ClientContext* context, "
@ -642,6 +646,10 @@ void PrintHeaderClientMethodCallback(grpc_generator::Printer* printer,
"void $Method$(::grpc::ClientContext* context, "
"const $Request$* request, $Response$* response, "
"std::function<void(::grpc::Status)>) override;\n");
printer->Print(*vars,
"void $Method$(::grpc::ClientContext* context, "
"const ::grpc::ByteBuffer* request, $Response$* response, "
"std::function<void(::grpc::Status)>) override;\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
"void $Method$(::grpc::ClientContext* context, "
@ -1643,6 +1651,16 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer,
"(stub_->channel_.get(), stub_->rpcmethod_$Method$_, "
"context, request, response, std::move(f));\n}\n\n");
printer->Print(*vars,
"void $ns$$Service$::Stub::experimental_async::$Method$("
"::grpc::ClientContext* context, "
"const ::grpc::ByteBuffer* request, $Response$* response, "
"std::function<void(::grpc::Status)> f) {\n");
printer->Print(*vars,
" return ::grpc::internal::CallbackUnaryCall"
"(stub_->channel_.get(), stub_->rpcmethod_$Method$_, "
"context, request, response, std::move(f));\n}\n\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;

@ -489,7 +489,7 @@ void grpc_resolver_dns_ares_init() {
address_sorting_init();
grpc_error* error = grpc_ares_init();
if (error != GRPC_ERROR_NONE) {
GRPC_LOG_IF_ERROR("ares_library_init() failed", error);
GRPC_LOG_IF_ERROR("grpc_ares_init() failed", error);
return;
}
if (default_resolver == nullptr) {

@ -47,9 +47,6 @@
using grpc_core::ServerAddress;
using grpc_core::ServerAddressList;
static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
grpc_core::TraceFlag grpc_trace_cares_address_sorting(false,
"cares_address_sorting");
@ -89,8 +86,6 @@ typedef struct grpc_ares_hostbyname_request {
bool is_balancer;
} grpc_ares_hostbyname_request;
static void do_basic_init(void) { gpr_mu_init(&g_init_mu); }
static void log_address_sorting_list(const ServerAddressList& addresses,
const char* input_output_str) {
for (size_t i = 0; i < addresses.size(); i++) {
@ -588,12 +583,12 @@ static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {
void (*grpc_cancel_ares_request_locked)(grpc_ares_request* r) =
grpc_cancel_ares_request_locked_impl;
// ares_library_init and ares_library_cleanup are currently no-op except under
// Windows. Calling them may cause race conditions when other parts of the
// binary calls these functions concurrently.
#ifdef GPR_WINDOWS
grpc_error* grpc_ares_init(void) {
gpr_once_init(&g_basic_init, do_basic_init);
gpr_mu_lock(&g_init_mu);
int status = ares_library_init(ARES_LIB_INIT_ALL);
gpr_mu_unlock(&g_init_mu);
if (status != ARES_SUCCESS) {
char* error_msg;
gpr_asprintf(&error_msg, "ares_library_init failed: %s",
@ -605,11 +600,11 @@ grpc_error* grpc_ares_init(void) {
return GRPC_ERROR_NONE;
}
void grpc_ares_cleanup(void) {
gpr_mu_lock(&g_init_mu);
ares_library_cleanup();
gpr_mu_unlock(&g_init_mu);
}
void grpc_ares_cleanup(void) { ares_library_cleanup(); }
#else
grpc_error* grpc_ares_init(void) { return GRPC_ERROR_NONE; }
void grpc_ares_cleanup(void) {}
#endif // GPR_WINDOWS
/*
* grpc_resolve_address_ares related structs and functions

@ -30,7 +30,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr_posix.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/slice/slice_internal.h"

@ -1062,12 +1062,15 @@ static void write_action_end_locked(void* tp, grpc_error* error) {
GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
bool closed = false;
if (error != GRPC_ERROR_NONE) {
close_transport_locked(t, GRPC_ERROR_REF(error));
closed = true;
}
if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
closed = true;
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
close_transport_locked(
t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
@ -1086,6 +1089,14 @@ static void write_action_end_locked(void* tp, grpc_error* error) {
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
t->is_first_write_in_batch = false;
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
// If the transport is closed, we will retry writing on the endpoint
// and next write may contain part of the currently serialized frames.
// So, we should only call the run_after_write callbacks when the next
// write finishes, or the callbacks will be invoked when the stream is
// closed.
if (!closed) {
GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
}
GRPC_CLOSURE_RUN(
GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
write_action_begin_locked, t,

@ -1452,7 +1452,7 @@ static grpc_error* begin_parse_string(grpc_chttp2_hpack_parser* p,
uint8_t binary,
grpc_chttp2_hpack_parser_string* str) {
if (!p->huff && binary == NOT_BINARY &&
(end - cur) >= static_cast<intptr_t>(p->strlen) &&
static_cast<uint32_t>(end - cur) >= p->strlen &&
p->current_slice_refcount != nullptr) {
GRPC_STATS_INC_HPACK_RECV_UNCOMPRESSED();
str->copied = false;

@ -1032,6 +1032,11 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
}
} else {
if (error != GRPC_ERROR_NONE) {
// Consume any send message that was sent here but that we are not pushing
// to the other side
if (op->send_message) {
op->payload->send_message.send_message.reset();
}
// Schedule op's closures that we didn't push to op state machine
if (op->recv_initial_metadata) {
if (op->payload->recv_initial_metadata.trailing_metadata_available !=

@ -51,8 +51,9 @@ class Atomic {
bool CompareExchangeWeak(T* expected, T desired, MemoryOrder success,
MemoryOrder failure) {
return GPR_ATM_INC_CAS_THEN(
storage_.compare_exchange_weak(*expected, desired, success, failure));
return GPR_ATM_INC_CAS_THEN(storage_.compare_exchange_weak(
*expected, desired, static_cast<std::memory_order>(success),
static_cast<std::memory_order>(failure)));
}
bool CompareExchangeStrong(T* expected, T desired, MemoryOrder success,
@ -76,7 +77,7 @@ class Atomic {
// Atomically increment a counter only if the counter value is not zero.
// Returns true if increment took place; false if counter is zero.
bool IncrementIfNonzero(MemoryOrder load_order = MemoryOrder::ACQ_REL) {
bool IncrementIfNonzero(MemoryOrder load_order = MemoryOrder::ACQUIRE) {
T count = storage_.load(static_cast<std::memory_order>(load_order));
do {
// If zero, we are done (without an increment). If not, we must do a CAS
@ -85,9 +86,8 @@ class Atomic {
if (count == 0) {
return false;
}
} while (!storage_.AtomicCompareExchangeWeak(
&count, count + 1, static_cast<std::memory_order>(MemoryOrder::ACQ_REL),
static_cast<std::memory_order>(load_order)));
} while (!CompareExchangeWeak(&count, count + 1, MemoryOrder::ACQ_REL,
load_order));
return true;
}

@ -26,6 +26,7 @@
#include <string.h>
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/iomgr/exec_ctx.h"
char* grpc_slice_to_c_string(grpc_slice slice) {
@ -213,21 +214,35 @@ grpc_slice grpc_slice_from_copied_string(const char* source) {
return grpc_slice_from_copied_buffer(source, strlen(source));
}
typedef struct {
namespace {
struct MallocRefCount {
MallocRefCount(const grpc_slice_refcount_vtable* vtable) {
base.vtable = vtable;
base.sub_refcount = &base;
}
void Ref() { refs.Ref(); }
void Unref() {
if (refs.Unref()) {
gpr_free(this);
}
}
grpc_slice_refcount base;
gpr_refcount refs;
} malloc_refcount;
grpc_core::RefCount refs;
};
} // namespace
static void malloc_ref(void* p) {
malloc_refcount* r = static_cast<malloc_refcount*>(p);
gpr_ref(&r->refs);
MallocRefCount* r = static_cast<MallocRefCount*>(p);
r->Ref();
}
static void malloc_unref(void* p) {
malloc_refcount* r = static_cast<malloc_refcount*>(p);
if (gpr_unref(&r->refs)) {
gpr_free(r);
}
MallocRefCount* r = static_cast<MallocRefCount*>(p);
r->Unref();
}
static const grpc_slice_refcount_vtable malloc_vtable = {
@ -246,15 +261,10 @@ grpc_slice grpc_slice_malloc_large(size_t length) {
refcount is a malloc_refcount
bytes is an array of bytes of the requested length
Both parts are placed in the same allocation returned from gpr_malloc */
malloc_refcount* rc = static_cast<malloc_refcount*>(
gpr_malloc(sizeof(malloc_refcount) + length));
/* Initial refcount on rc is 1 - and it's up to the caller to release
this reference. */
gpr_ref_init(&rc->refs, 1);
void* data =
static_cast<MallocRefCount*>(gpr_malloc(sizeof(MallocRefCount) + length));
rc->base.vtable = &malloc_vtable;
rc->base.sub_refcount = &rc->base;
auto* rc = new (data) MallocRefCount(&malloc_vtable);
/* Build up the slice to be returned. */
/* The slices refcount points back to the allocated block. */

@ -33,6 +33,7 @@
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
@ -44,6 +45,8 @@ grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure");
grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags");
grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount");
namespace {
// Specifies a cq thread local cache.
// The first event that occurs on a thread
// with a cq cache will go into that cache, and
@ -84,24 +87,22 @@ typedef struct {
grpc_closure* shutdown;
} non_polling_poller;
static size_t non_polling_poller_size(void) {
return sizeof(non_polling_poller);
}
size_t non_polling_poller_size(void) { return sizeof(non_polling_poller); }
static void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
gpr_mu_init(&npp->mu);
*mu = &npp->mu;
}
static void non_polling_poller_destroy(grpc_pollset* pollset) {
void non_polling_poller_destroy(grpc_pollset* pollset) {
non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
gpr_mu_destroy(&npp->mu);
}
static grpc_error* non_polling_poller_work(grpc_pollset* pollset,
grpc_pollset_worker** worker,
grpc_millis deadline) {
grpc_error* non_polling_poller_work(grpc_pollset* pollset,
grpc_pollset_worker** worker,
grpc_millis deadline) {
non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
if (npp->shutdown) return GRPC_ERROR_NONE;
if (npp->kicked_without_poller) {
@ -141,8 +142,8 @@ static grpc_error* non_polling_poller_work(grpc_pollset* pollset,
return GRPC_ERROR_NONE;
}
static grpc_error* non_polling_poller_kick(
grpc_pollset* pollset, grpc_pollset_worker* specific_worker) {
grpc_error* non_polling_poller_kick(grpc_pollset* pollset,
grpc_pollset_worker* specific_worker) {
non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
if (specific_worker == nullptr)
specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root);
@ -159,8 +160,7 @@ static grpc_error* non_polling_poller_kick(
return GRPC_ERROR_NONE;
}
static void non_polling_poller_shutdown(grpc_pollset* pollset,
grpc_closure* closure) {
void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
GPR_ASSERT(closure != nullptr);
p->shutdown = closure;
@ -175,7 +175,7 @@ static void non_polling_poller_shutdown(grpc_pollset* pollset,
}
}
static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
/* GRPC_CQ_DEFAULT_POLLING */
{true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
@ -188,7 +188,9 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
non_polling_poller_shutdown, non_polling_poller_destroy},
};
typedef struct cq_vtable {
} // namespace
struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
size_t data_size;
void (*init)(void* data,
@ -203,80 +205,116 @@ typedef struct cq_vtable {
void* reserved);
grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved);
} cq_vtable;
};
namespace {
/* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue
* (a lockfree multiproducer single consumer queue). It uses a queue_lock
* to support multiple consumers.
* Only used in completion queues whose completion_type is GRPC_CQ_NEXT */
typedef struct grpc_cq_event_queue {
class CqEventQueue {
public:
CqEventQueue() { gpr_mpscq_init(&queue_); }
~CqEventQueue() { gpr_mpscq_destroy(&queue_); }
/* Note: The counter is not incremented/decremented atomically with push/pop.
* The count is only eventually consistent */
intptr_t num_items() const {
return num_queue_items_.Load(grpc_core::MemoryOrder::RELAXED);
}
bool Push(grpc_cq_completion* c);
grpc_cq_completion* Pop();
private:
/* Spinlock to serialize consumers i.e pop() operations */
gpr_spinlock queue_lock;
gpr_spinlock queue_lock_ = GPR_SPINLOCK_INITIALIZER;
gpr_mpscq queue;
gpr_mpscq queue_;
/* A lazy counter of number of items in the queue. This is NOT atomically
incremented/decremented along with push/pop operations and hence is only
eventually consistent */
gpr_atm num_queue_items;
} grpc_cq_event_queue;
grpc_core::Atomic<intptr_t> num_queue_items_{0};
};
struct cq_next_data {
~cq_next_data() { GPR_ASSERT(queue.num_items() == 0); }
typedef struct cq_next_data {
/** Completed events for completion-queues of type GRPC_CQ_NEXT */
grpc_cq_event_queue queue;
CqEventQueue queue;
/** Counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
gpr_atm things_queued_ever;
grpc_core::Atomic<intptr_t> things_queued_ever{0};
/* Number of outstanding events (+1 if not shut down) */
gpr_atm pending_events;
/** Number of outstanding events (+1 if not shut down)
Initial count is dropped by grpc_completion_queue_shutdown */
grpc_core::Atomic<intptr_t> pending_events{1};
/** 0 initially. 1 once we initiated shutdown */
bool shutdown_called;
} cq_next_data;
bool shutdown_called = false;
};
struct cq_pluck_data {
cq_pluck_data() {
completed_tail = &completed_head;
completed_head.next = reinterpret_cast<uintptr_t>(completed_tail);
}
~cq_pluck_data() {
GPR_ASSERT(completed_head.next ==
reinterpret_cast<uintptr_t>(&completed_head));
}
typedef struct cq_pluck_data {
/** Completed events for completion-queues of type GRPC_CQ_PLUCK */
grpc_cq_completion completed_head;
grpc_cq_completion* completed_tail;
/** Number of pending events (+1 if we're not shutdown) */
gpr_atm pending_events;
/** Number of pending events (+1 if we're not shutdown).
Initial count is dropped by grpc_completion_queue_shutdown. */
grpc_core::Atomic<intptr_t> pending_events{1};
/** Counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
gpr_atm things_queued_ever;
grpc_core::Atomic<intptr_t> things_queued_ever{0};
/** 0 initially. 1 once we completed shutting */
/* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
* (pending_events == 0). So consider removing this in future and use
* pending_events */
gpr_atm shutdown;
grpc_core::Atomic<bool> shutdown{false};
/** 0 initially. 1 once we initiated shutdown */
bool shutdown_called;
bool shutdown_called = false;
int num_pluckers;
int num_pluckers = 0;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
} cq_pluck_data;
};
typedef struct cq_callback_data {
struct cq_callback_data {
cq_callback_data(
grpc_experimental_completion_queue_functor* shutdown_callback)
: shutdown_callback(shutdown_callback) {}
/** No actual completed events queue, unlike other types */
/** Number of pending events (+1 if we're not shutdown) */
gpr_atm pending_events;
/** Number of pending events (+1 if we're not shutdown).
Initial count is dropped by grpc_completion_queue_shutdown. */
grpc_core::Atomic<intptr_t> pending_events{1};
/** Counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
gpr_atm things_queued_ever;
grpc_core::Atomic<intptr_t> things_queued_ever{0};
/** 0 initially. 1 once we initiated shutdown */
bool shutdown_called;
bool shutdown_called = false;
/** A callback that gets invoked when the CQ completes shutdown */
grpc_experimental_completion_queue_functor* shutdown_callback;
} cq_callback_data;
};
} // namespace
/* Completion queue structure */
struct grpc_completion_queue {
@ -408,7 +446,7 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
storage->done(storage->done_arg, storage);
ret = 1;
cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
cq_finish_shutdown_next(cq);
@ -422,31 +460,21 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
return ret;
}
static void cq_event_queue_init(grpc_cq_event_queue* q) {
gpr_mpscq_init(&q->queue);
q->queue_lock = GPR_SPINLOCK_INITIALIZER;
gpr_atm_no_barrier_store(&q->num_queue_items, 0);
bool CqEventQueue::Push(grpc_cq_completion* c) {
gpr_mpscq_push(&queue_, reinterpret_cast<gpr_mpscq_node*>(c));
return num_queue_items_.FetchAdd(1, grpc_core::MemoryOrder::RELAXED) == 0;
}
static void cq_event_queue_destroy(grpc_cq_event_queue* q) {
gpr_mpscq_destroy(&q->queue);
}
static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) {
gpr_mpscq_push(&q->queue, reinterpret_cast<gpr_mpscq_node*>(c));
return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0;
}
static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) {
grpc_cq_completion* CqEventQueue::Pop() {
grpc_cq_completion* c = nullptr;
if (gpr_spinlock_trylock(&q->queue_lock)) {
if (gpr_spinlock_trylock(&queue_lock_)) {
GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES();
bool is_empty = false;
c = reinterpret_cast<grpc_cq_completion*>(
gpr_mpscq_pop_and_check_end(&q->queue, &is_empty));
gpr_spinlock_unlock(&q->queue_lock);
gpr_mpscq_pop_and_check_end(&queue_, &is_empty));
gpr_spinlock_unlock(&queue_lock_);
if (c == nullptr && !is_empty) {
GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES();
@ -456,18 +484,12 @@ static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) {
}
if (c) {
gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
num_queue_items_.FetchSub(1, grpc_core::MemoryOrder::RELAXED);
}
return c;
}
/* Note: The counter is not incremented/decremented atomically with push/pop.
* The count is only eventually consistent */
static long cq_event_queue_num_items(grpc_cq_event_queue* q) {
return static_cast<long>(gpr_atm_no_barrier_load(&q->num_queue_items));
}
grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
grpc_experimental_completion_queue_functor* shutdown_callback) {
@ -507,49 +529,33 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
static void cq_init_next(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_next_data* cqd = static_cast<cq_next_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->shutdown_called = false;
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
cq_event_queue_init(&cqd->queue);
new (data) cq_next_data();
}
static void cq_destroy_next(void* data) {
cq_next_data* cqd = static_cast<cq_next_data*>(data);
GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0);
cq_event_queue_destroy(&cqd->queue);
cqd->~cq_next_data();
}
static void cq_init_pluck(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->completed_tail = &cqd->completed_head;
cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
gpr_atm_no_barrier_store(&cqd->shutdown, 0);
cqd->shutdown_called = false;
cqd->num_pluckers = 0;
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
new (data) cq_pluck_data();
}
static void cq_destroy_pluck(void* data) {
cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
cqd->~cq_pluck_data();
}
static void cq_init_callback(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->shutdown_called = false;
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
cqd->shutdown_callback = shutdown_callback;
new (data) cq_callback_data(shutdown_callback);
}
static void cq_destroy_callback(void* data) {}
static void cq_destroy_callback(void* data) {
cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
cqd->~cq_callback_data();
}
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
return cq->vtable->cq_completion_type;
@ -632,37 +638,19 @@ static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {}
#endif
/* Atomically increments a counter only if the counter is not zero. Returns
* true if the increment was successful; false if the counter is zero */
static bool atm_inc_if_nonzero(gpr_atm* counter) {
while (true) {
gpr_atm count = gpr_atm_acq_load(counter);
/* If zero, we are done. If not, we must to a CAS (instead of an atomic
* increment) to maintain the contract: do not increment the counter if it
* is zero. */
if (count == 0) {
return false;
} else if (gpr_atm_full_cas(counter, count, count + 1)) {
break;
}
}
return true;
}
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag) {
cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
return atm_inc_if_nonzero(&cqd->pending_events);
return cqd->pending_events.IncrementIfNonzero();
}
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag) {
cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
return atm_inc_if_nonzero(&cqd->pending_events);
return cqd->pending_events.IncrementIfNonzero();
}
static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag) {
cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
return atm_inc_if_nonzero(&cqd->pending_events);
return cqd->pending_events.IncrementIfNonzero();
}
bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
@ -716,17 +704,14 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
gpr_tls_set(&g_cached_event, (intptr_t)storage);
} else {
/* Add the completion to the queue */
bool is_first = cq_event_queue_push(&cqd->queue, storage);
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
bool is_first = cqd->queue.Push(storage);
cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
/* Since we do not hold the cq lock here, it is important to do an 'acquire'
load here (instead of a 'no_barrier' load) to match with the release
store
(done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next
(done via pending_events.FetchSub(1, ACQ_REL)) in cq_shutdown_next
*/
bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1;
if (!will_definitely_shutdown) {
if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 1) {
/* Only kick if this is the first item queued */
if (is_first) {
gpr_mu_lock(cq->mu);
@ -740,7 +725,8 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
GRPC_ERROR_UNREF(kick_error);
}
}
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) ==
1) {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
cq_finish_shutdown_next(cq);
@ -749,7 +735,7 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
}
} else {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_atm_rel_store(&cqd->pending_events, 0);
cqd->pending_events.Store(0, grpc_core::MemoryOrder::RELEASE);
gpr_mu_lock(cq->mu);
cq_finish_shutdown_next(cq);
gpr_mu_unlock(cq->mu);
@ -795,12 +781,12 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
cq_check_tag(cq, tag, false); /* Used in debug builds only */
/* Add to the list of completions */
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
cqd->completed_tail->next =
((uintptr_t)storage) | (1u & cqd->completed_tail->next);
cqd->completed_tail = storage;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
cq_finish_shutdown_pluck(cq);
gpr_mu_unlock(cq->mu);
} else {
@ -856,8 +842,8 @@ static void cq_end_op_for_callback(
cq_check_tag(cq, tag, true); /* Used in debug builds only */
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
cq_finish_shutdown_callback(cq);
}
@ -893,20 +879,20 @@ class ExecCtxNext : public grpc_core::ExecCtx {
cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == nullptr);
gpr_atm current_last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
intptr_t current_last_seen_things_queued_ever =
cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
if (current_last_seen_things_queued_ever !=
a->last_seen_things_queued_ever) {
a->last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
/* Pop a cq_completion from the queue. Returns NULL if the queue is empty
* might return NULL in some cases even if the queue is not empty; but
* that
* is ok and doesn't affect correctness. Might effect the tail latencies a
* bit) */
a->stolen_completion = cq_event_queue_pop(&cqd->queue);
a->stolen_completion = cqd->queue.Pop();
if (a->stolen_completion != nullptr) {
return true;
}
@ -965,7 +951,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
cq_is_finished_arg is_finished_arg = {
gpr_atm_no_barrier_load(&cqd->things_queued_ever),
cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED),
cq,
deadline_millis,
nullptr,
@ -985,7 +971,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
break;
}
grpc_cq_completion* c = cq_event_queue_pop(&cqd->queue);
grpc_cq_completion* c = cqd->queue.Pop();
if (c != nullptr) {
ret.type = GRPC_OP_COMPLETE;
@ -999,16 +985,16 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
so that the thread comes back quickly from poll to make a second
attempt at popping. Not doing this can potentially deadlock this
thread forever (if the deadline is infinity) */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
if (cqd->queue.num_items() > 0) {
iteration_deadline = 0;
}
}
if (gpr_atm_acq_load(&cqd->pending_events) == 0) {
if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) == 0) {
/* Before returning, check if the queue has any items left over (since
gpr_mpscq_pop() can sometimes return NULL even if the queue is not
empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
if (cqd->queue.num_items() > 0) {
/* Go to the beginning of the loop. No point doing a poll because
(cq->shutdown == true) is only possible when there is no pending
work (i.e cq->pending_events == 0) and any outstanding completion
@ -1049,8 +1035,8 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
is_finished_arg.first_loop = false;
}
if (cq_event_queue_num_items(&cqd->queue) > 0 &&
gpr_atm_acq_load(&cqd->pending_events) > 0) {
if (cqd->queue.num_items() > 0 &&
cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) > 0) {
gpr_mu_lock(cq->mu);
cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
gpr_mu_unlock(cq->mu);
@ -1074,7 +1060,7 @@ static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
GPR_ASSERT(cqd->pending_events.Load(grpc_core::MemoryOrder::RELAXED) == 0);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}
@ -1096,10 +1082,10 @@ static void cq_shutdown_next(grpc_completion_queue* cq) {
return;
}
cqd->shutdown_called = true;
/* Doing a full_fetch_add (i.e acq/release) here to match with
/* Doing acq/release FetchSub here to match with
* cq_begin_op_for_next and cq_end_op_for_next functions which read/write
* on this counter without necessarily holding a lock on cq */
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
cq_finish_shutdown_next(cq);
}
gpr_mu_unlock(cq->mu);
@ -1148,12 +1134,12 @@ class ExecCtxPluck : public grpc_core::ExecCtx {
GPR_ASSERT(a->stolen_completion == nullptr);
gpr_atm current_last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
if (current_last_seen_things_queued_ever !=
a->last_seen_things_queued_ever) {
gpr_mu_lock(cq->mu);
a->last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
grpc_cq_completion* c;
grpc_cq_completion* prev = &cqd->completed_head;
while ((c = (grpc_cq_completion*)(prev->next &
@ -1209,7 +1195,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_mu_lock(cq->mu);
grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
cq_is_finished_arg is_finished_arg = {
gpr_atm_no_barrier_load(&cqd->things_queued_ever),
cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED),
cq,
deadline_millis,
nullptr,
@ -1246,7 +1232,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
}
prev = c;
}
if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
if (cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED)) {
gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_SHUTDOWN;
@ -1309,8 +1295,8 @@ static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
gpr_atm_no_barrier_store(&cqd->shutdown, 1);
GPR_ASSERT(!cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED));
cqd->shutdown.Store(1, grpc_core::MemoryOrder::RELAXED);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}
@ -1334,7 +1320,7 @@ static void cq_shutdown_pluck(grpc_completion_queue* cq) {
return;
}
cqd->shutdown_called = true;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
cq_finish_shutdown_pluck(cq);
}
gpr_mu_unlock(cq->mu);
@ -1368,7 +1354,7 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
return;
}
cqd->shutdown_called = true;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
gpr_mu_unlock(cq->mu);
cq_finish_shutdown_callback(cq);
} else {

@ -138,7 +138,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
}
internal::Call call_;
internal::ServerReactor* reactor_;
internal::ServerReactor* const reactor_;
bool has_tag_;
void* tag_;
void* core_cq_tag_;
@ -200,12 +200,17 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
cancelled_ = 1;
}
if (cancelled_ && (reactor_ != nullptr)) {
// Decide whether to call the cancel callback before releasing the lock
bool call_cancel = (cancelled_ != 0);
// Release the lock since we are going to be calling a callback and
// interceptors now
lock.unlock();
if (call_cancel && (reactor_ != nullptr)) {
reactor_->OnCancel();
}
/* Release the lock since we are going to be running through interceptors now
*/
lock.unlock();
/* Add interception point and run through interceptors */
interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_CLOSE);

@ -43,18 +43,4 @@ Status ByteBuffer::Dump(std::vector<Slice>* slices) const {
return Status::OK;
}
ByteBuffer::ByteBuffer(const ByteBuffer& buf) : buffer_(nullptr) {
operator=(buf);
}
ByteBuffer& ByteBuffer::operator=(const ByteBuffer& buf) {
if (this != &buf) {
Clear(); // first remove existing data
}
if (buf.buffer_) {
buffer_ = grpc_byte_buffer_copy(buf.buffer_); // then copy
}
return *this;
}
} // namespace grpc

@ -212,50 +212,39 @@ class BuildExt(build_ext.build_ext):
LINK_OPTIONS = {}
def build_extensions(self):
def compiler_ok_with_extra_std():
"""Test if default compiler is okay with specifying c++ version
when invoked in C mode. GCC is okay with this, while clang is not.
"""
cc_test = subprocess.Popen(
['cc', '-x', 'c', '-std=c++11', '-'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
_, cc_err = cc_test.communicate(input='int main(){return 0;}')
return not 'invalid argument' in str(cc_err)
# This special conditioning is here due to difference of compiler
# behavior in gcc and clang. The clang doesn't take --stdc++11
# flags but gcc does. Since the setuptools of Python only support
# all C or all C++ compilation, the mix of C and C++ will crash.
# *By default*, the macOS use clang and Linux use gcc, that's why
# the special condition here is checking platform.
if "darwin" in sys.platform:
config = os.environ.get('CONFIG', 'opt')
target_path = os.path.abspath(
os.path.join(
os.path.dirname(os.path.realpath(__file__)), '..', '..',
'..', 'libs', config))
targets = [
os.path.join(target_path, 'libboringssl.a'),
os.path.join(target_path, 'libares.a'),
os.path.join(target_path, 'libgpr.a'),
os.path.join(target_path, 'libgrpc.a')
]
# Running make separately for Mac means we lose all
# Extension.define_macros configured in setup.py. Re-add the macro
# for gRPC Core's fork handlers.
# TODO(ericgribkoff) Decide what to do about the other missing core
# macros, including GRPC_ENABLE_FORK_SUPPORT, which defaults to 1
# on Linux but remains unset on Mac.
extra_defines = [
'EXTRA_DEFINES="GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK=1"'
]
# Ensure the BoringSSL are built instead of using system provided
# libraries. It prevents dependency issues while distributing to
# Mac users who use MacPorts to manage their libraries. #17002
mod_env = dict(os.environ)
mod_env['REQUIRE_CUSTOM_LIBRARIES_opt'] = '1'
make_process = subprocess.Popen(
['make'] + extra_defines + targets,
env=mod_env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
make_out, make_err = make_process.communicate()
if make_out and make_process.returncode != 0:
sys.stdout.write(str(make_out) + '\n')
if make_err:
sys.stderr.write(str(make_err) + '\n')
if make_process.returncode != 0:
raise Exception("make command failed!")
# *By default*, macOS and FreBSD use clang and Linux use gcc
#
# If we are not using a permissive compiler that's OK with being
# passed wrong std flags, swap out compile function by adding a filter
# for it.
if not compiler_ok_with_extra_std():
old_compile = self.compiler._compile
def new_compile(obj, src, ext, cc_args, extra_postargs, pp_opts):
if src[-2:] == '.c':
extra_postargs = [
arg for arg in extra_postargs if not '-std=c++' in arg
]
return old_compile(obj, src, ext, cc_args, extra_postargs,
pp_opts)
self.compiler._compile = new_compile
compiler = self.compiler.compiler_type
if compiler in BuildExt.C_OPTIONS:

@ -19,6 +19,7 @@ import logging
import threading
import time
from concurrent import futures
import six
import grpc
@ -565,8 +566,8 @@ def _send_message_callback_to_blocking_iterator_adapter(
def _select_thread_pool_for_behavior(behavior, default_thread_pool):
if hasattr(behavior, 'experimental_thread_pool'
) and behavior.experimental_thread_pool is not None:
if hasattr(behavior, 'experimental_thread_pool') and isinstance(
behavior.experimental_thread_pool, futures.ThreadPoolExecutor):
return behavior.experimental_thread_pool
else:
return default_thread_pool

@ -16,7 +16,7 @@ import threading
from concurrent import futures
class RecordingThreadPool(futures.Executor):
class RecordingThreadPool(futures.ThreadPoolExecutor):
"""A thread pool that records if used."""
def __init__(self, max_workers):

@ -715,7 +715,7 @@
ifeq ($(HAS_PKG_CONFIG),true)
PROTOBUF_PKG_CONFIG = true
PC_REQUIRES_GRPCXX = protobuf
CPPFLAGS := $(shell $(PKG_CONFIG) --cflags protobuf) $(CPPFLAGS)
CPPFLAGS := $(CPPFLAGS) $(shell $(PKG_CONFIG) --cflags protobuf)
LDFLAGS_PROTOBUF_PKG_CONFIG = $(shell $(PKG_CONFIG) --libs-only-L protobuf)
ifeq ($(SYSTEM),Linux)
ifneq ($(LDFLAGS_PROTOBUF_PKG_CONFIG),)

@ -65,7 +65,7 @@
#define HTTP1_DETAIL_MSG "Trying to connect an http1.x server"
/* TODO(zyc) Check the content of incomming data instead of using this length */
/* TODO(zyc) Check the content of incoming data instead of using this length */
/* The 'bad' server will start sending responses after reading this amount of
* data from the client. */
#define SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD (size_t)200

@ -312,7 +312,7 @@ static void test_pings_without_data(grpc_end2end_test_config config) {
CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
cq_verify(cqv);
// Send too many pings to the server similar to the prevous test case.
// Send too many pings to the server similar to the previous test case.
// However, since we set the MAX_PINGS_WITHOUT_DATA at the client side, only
// MAX_PING_STRIKES will actually be sent and the rpc will still succeed.
int i;

@ -51,7 +51,7 @@ static void deserialize_response_test() {
GPR_ASSERT(grpc_gcp_handshaker_resp_equals(resp, decoded_resp));
grpc_byte_buffer_destroy(buffer);
/* Invalid serializaiton. */
/* Invalid serialization. */
grpc_slice bad_slice =
grpc_slice_split_head(&slice, GRPC_SLICE_LENGTH(slice) - 1);
buffer = grpc_raw_byte_buffer_create(&bad_slice, 1 /* number of slices */);

@ -113,6 +113,7 @@ class ServiceA final {
virtual ~experimental_async_interface() {}
// MethodA1 leading comment 1
virtual void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) = 0;
virtual void MethodA1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) = 0;
// MethodA1 trailing comment 1
// MethodA2 detached leading comment 1
//
@ -182,6 +183,7 @@ class ServiceA final {
public StubInterface::experimental_async_interface {
public:
void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) override;
void MethodA1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) override;
void MethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::experimental::ClientWriteReactor< ::grpc::testing::Request>* reactor) override;
void MethodA3(::grpc::ClientContext* context, ::grpc::testing::Request* request, ::grpc::experimental::ClientReadReactor< ::grpc::testing::Response>* reactor) override;
void MethodA4(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::grpc::testing::Request,::grpc::testing::Response>* reactor) override;
@ -714,6 +716,7 @@ class ServiceB final {
virtual ~experimental_async_interface() {}
// MethodB1 leading comment 1
virtual void MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) = 0;
virtual void MethodB1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) = 0;
// MethodB1 trailing comment 1
};
virtual class experimental_async_interface* experimental_async() { return nullptr; }
@ -735,6 +738,7 @@ class ServiceB final {
public StubInterface::experimental_async_interface {
public:
void MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) override;
void MethodB1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) override;
private:
friend class Stub;
explicit experimental_async(Stub* stub): stub_(stub) { }

@ -150,6 +150,7 @@ grpc_cc_test(
"gtest",
],
deps = [
":interceptors_util",
":test_service_impl",
"//:gpr",
"//:grpc",

@ -16,6 +16,7 @@
*
*/
#include <algorithm>
#include <functional>
#include <mutex>
#include <sstream>
@ -35,9 +36,11 @@
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/interceptors_util.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/byte_buffer_proto_helper.h"
#include "test/cpp/util/string_ref_helper.h"
#include "test/cpp/util/test_credentials_provider.h"
#include <gtest/gtest.h>
@ -60,11 +63,17 @@ enum class Protocol { INPROC, TCP };
class TestScenario {
public:
TestScenario(bool serve_callback, Protocol protocol)
: callback_server(serve_callback), protocol(protocol) {}
TestScenario(bool serve_callback, Protocol protocol, bool intercept,
const grpc::string& creds_type)
: callback_server(serve_callback),
protocol(protocol),
use_interceptors(intercept),
credentials_type(creds_type) {}
void Log() const;
bool callback_server;
Protocol protocol;
bool use_interceptors;
const grpc::string credentials_type;
};
static std::ostream& operator<<(std::ostream& out,
@ -87,15 +96,18 @@ class ClientCallbackEnd2endTest
void SetUp() override {
ServerBuilder builder;
auto server_creds = GetCredentialsProvider()->GetServerCredentials(
GetParam().credentials_type);
// TODO(vjpai): Support testing of AuthMetadataProcessor
if (GetParam().protocol == Protocol::TCP) {
if (!grpc_iomgr_run_in_background()) {
do_not_test_ = true;
return;
}
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
builder.AddListeningPort(server_address_.str(),
InsecureServerCredentials());
picked_port_ = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << picked_port_;
builder.AddListeningPort(server_address_.str(), server_creds);
}
if (!GetParam().callback_server) {
builder.RegisterService(&service_);
@ -103,31 +115,61 @@ class ClientCallbackEnd2endTest
builder.RegisterService(&callback_service_);
}
if (GetParam().use_interceptors) {
std::vector<
std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
creators;
// Add 20 dummy server interceptors
creators.reserve(20);
for (auto i = 0; i < 20; i++) {
creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
new DummyInterceptorFactory()));
}
builder.experimental().SetInterceptorCreators(std::move(creators));
}
server_ = builder.BuildAndStart();
is_server_started_ = true;
}
void ResetStub() {
ChannelArguments args;
auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
GetParam().credentials_type, &args);
switch (GetParam().protocol) {
case Protocol::TCP:
channel_ =
CreateChannel(server_address_.str(), InsecureChannelCredentials());
if (!GetParam().use_interceptors) {
channel_ =
CreateCustomChannel(server_address_.str(), channel_creds, args);
} else {
channel_ = CreateCustomChannelWithInterceptors(
server_address_.str(), channel_creds, args,
CreateDummyClientInterceptors());
}
break;
case Protocol::INPROC:
channel_ = server_->InProcessChannel(args);
if (!GetParam().use_interceptors) {
channel_ = server_->InProcessChannel(args);
} else {
channel_ = server_->experimental().InProcessChannelWithInterceptors(
args, CreateDummyClientInterceptors());
}
break;
default:
assert(false);
}
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
generic_stub_.reset(new GenericStub(channel_));
DummyInterceptor::Reset();
}
void TearDown() override {
if (is_server_started_) {
server_->Shutdown();
}
if (picked_port_ > 0) {
grpc_recycle_unused_port(picked_port_);
}
}
void SendRpcs(int num_rpcs, bool with_binary_metadata) {
@ -178,6 +220,36 @@ class ClientCallbackEnd2endTest
}
}
void SendRpcsRawReq(int num_rpcs) {
grpc::string test_string("Hello raw world.");
EchoRequest request;
request.set_message(test_string);
std::unique_ptr<ByteBuffer> send_buf = SerializeToByteBuffer(&request);
for (int i = 0; i < num_rpcs; i++) {
EchoResponse response;
ClientContext cli_ctx;
std::mutex mu;
std::condition_variable cv;
bool done = false;
stub_->experimental_async()->Echo(
&cli_ctx, send_buf.get(), &response,
[&request, &response, &done, &mu, &cv](Status s) {
GPR_ASSERT(s.ok());
EXPECT_EQ(request.message(), response.message());
std::lock_guard<std::mutex> l(mu);
done = true;
cv.notify_one();
});
std::unique_lock<std::mutex> l(mu);
while (!done) {
cv.wait(l);
}
}
}
void SendRpcsGeneric(int num_rpcs, bool maybe_except) {
const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
grpc::string test_string("");
@ -283,6 +355,7 @@ class ClientCallbackEnd2endTest
}
bool do_not_test_{false};
bool is_server_started_{false};
int picked_port_{0};
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<grpc::GenericStub> generic_stub_;
@ -304,6 +377,12 @@ TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) {
SendRpcs(10, false);
}
TEST_P(ClientCallbackEnd2endTest, SequentialRpcsRawReq) {
MAYBE_SKIP_TEST;
ResetStub();
SendRpcsRawReq(10);
}
TEST_P(ClientCallbackEnd2endTest, SendClientInitialMetadata) {
MAYBE_SKIP_TEST;
ResetStub();
@ -419,136 +498,569 @@ TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) {
while (!done) {
cv.wait(l);
}
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, RequestStream) {
TEST_P(ClientCallbackEnd2endTest, RequestEchoServerCancel) {
MAYBE_SKIP_TEST;
ResetStub();
class Client : public grpc::experimental::ClientWriteReactor<EchoRequest> {
public:
explicit Client(grpc::testing::EchoTestService::Stub* stub) {
context_.set_initial_metadata_corked(true);
stub->experimental_async()->RequestStream(&context_, &response_, this);
StartCall();
request_.set_message("Hello server.");
StartWrite(&request_);
EchoRequest request;
EchoResponse response;
ClientContext context;
request.set_message("hello");
context.AddMetadata(kServerTryCancelRequest,
grpc::to_string(CANCEL_BEFORE_PROCESSING));
std::mutex mu;
std::condition_variable cv;
bool done = false;
stub_->experimental_async()->Echo(
&context, &request, &response, [&done, &mu, &cv](Status s) {
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
std::lock_guard<std::mutex> l(mu);
done = true;
cv.notify_one();
});
std::unique_lock<std::mutex> l(mu);
while (!done) {
cv.wait(l);
}
}
struct ClientCancelInfo {
bool cancel{false};
int ops_before_cancel;
ClientCancelInfo() : cancel{false} {}
// Allow the single-op version to be non-explicit for ease of use
ClientCancelInfo(int ops) : cancel{true}, ops_before_cancel{ops} {}
};
class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> {
public:
WriteClient(grpc::testing::EchoTestService::Stub* stub,
ServerTryCancelRequestPhase server_try_cancel,
int num_msgs_to_send, ClientCancelInfo client_cancel = {})
: server_try_cancel_(server_try_cancel),
num_msgs_to_send_(num_msgs_to_send),
client_cancel_{client_cancel} {
grpc::string msg{"Hello server."};
for (int i = 0; i < num_msgs_to_send; i++) {
desired_ += msg;
}
void OnWriteDone(bool ok) override {
writes_left_--;
if (writes_left_ > 1) {
StartWrite(&request_);
} else if (writes_left_ == 1) {
StartWriteLast(&request_, WriteOptions());
}
if (server_try_cancel != DO_NOT_CANCEL) {
// Send server_try_cancel value in the client metadata
context_.AddMetadata(kServerTryCancelRequest,
grpc::to_string(server_try_cancel));
}
void OnDone(const Status& s) override {
context_.set_initial_metadata_corked(true);
stub->experimental_async()->RequestStream(&context_, &response_, this);
StartCall();
request_.set_message(msg);
MaybeWrite();
}
void OnWriteDone(bool ok) override {
if (ok) {
num_msgs_sent_++;
MaybeWrite();
}
}
void OnDone(const Status& s) override {
gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent_);
int num_to_send =
(client_cancel_.cancel)
? std::min(num_msgs_to_send_, client_cancel_.ops_before_cancel)
: num_msgs_to_send_;
switch (server_try_cancel_) {
case CANCEL_BEFORE_PROCESSING:
case CANCEL_DURING_PROCESSING:
// If the RPC is canceled by server before / during messages from the
// client, it means that the client most likely did not get a chance to
// send all the messages it wanted to send. i.e num_msgs_sent <=
// num_msgs_to_send
EXPECT_LE(num_msgs_sent_, num_to_send);
break;
case DO_NOT_CANCEL:
case CANCEL_AFTER_PROCESSING:
// If the RPC was not canceled or canceled after all messages were read
// by the server, the client did get a chance to send all its messages
EXPECT_EQ(num_msgs_sent_, num_to_send);
break;
default:
assert(false);
break;
}
if ((server_try_cancel_ == DO_NOT_CANCEL) && !client_cancel_.cancel) {
EXPECT_TRUE(s.ok());
EXPECT_EQ(response_.message(), "Hello server.Hello server.Hello server.");
std::unique_lock<std::mutex> l(mu_);
done_ = true;
cv_.notify_one();
EXPECT_EQ(response_.message(), desired_);
} else {
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
}
void Await() {
std::unique_lock<std::mutex> l(mu_);
while (!done_) {
cv_.wait(l);
}
std::unique_lock<std::mutex> l(mu_);
done_ = true;
cv_.notify_one();
}
void Await() {
std::unique_lock<std::mutex> l(mu_);
while (!done_) {
cv_.wait(l);
}
}
private:
EchoRequest request_;
EchoResponse response_;
ClientContext context_;
int writes_left_{3};
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
} test{stub_.get()};
private:
void MaybeWrite() {
if (client_cancel_.cancel &&
num_msgs_sent_ == client_cancel_.ops_before_cancel) {
context_.TryCancel();
} else if (num_msgs_to_send_ > num_msgs_sent_ + 1) {
StartWrite(&request_);
} else if (num_msgs_to_send_ == num_msgs_sent_ + 1) {
StartWriteLast(&request_, WriteOptions());
}
}
EchoRequest request_;
EchoResponse response_;
ClientContext context_;
const ServerTryCancelRequestPhase server_try_cancel_;
int num_msgs_sent_{0};
const int num_msgs_to_send_;
grpc::string desired_;
const ClientCancelInfo client_cancel_;
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
};
TEST_P(ClientCallbackEnd2endTest, RequestStream) {
MAYBE_SKIP_TEST;
ResetStub();
WriteClient test{stub_.get(), DO_NOT_CANCEL, 3};
test.Await();
// Make sure that the server interceptors were not notified to cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
TEST_P(ClientCallbackEnd2endTest, ClientCancelsRequestStream) {
MAYBE_SKIP_TEST;
ResetStub();
class Client : public grpc::experimental::ClientReadReactor<EchoResponse> {
public:
explicit Client(grpc::testing::EchoTestService::Stub* stub) {
request_.set_message("Hello client ");
stub->experimental_async()->ResponseStream(&context_, &request_, this);
StartCall();
StartRead(&response_);
WriteClient test{stub_.get(), DO_NOT_CANCEL, 3, {2}};
test.Await();
// Make sure that the server interceptors got the cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel before doing reading the request
TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelBeforeReads) {
MAYBE_SKIP_TEST;
ResetStub();
WriteClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 1};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel while reading a request from the stream in parallel
TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelDuringRead) {
MAYBE_SKIP_TEST;
ResetStub();
WriteClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel after reading all the requests but before returning to the
// client
TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelAfterReads) {
MAYBE_SKIP_TEST;
ResetStub();
WriteClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 4};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
public:
ReadClient(grpc::testing::EchoTestService::Stub* stub,
ServerTryCancelRequestPhase server_try_cancel,
ClientCancelInfo client_cancel = {})
: server_try_cancel_(server_try_cancel), client_cancel_{client_cancel} {
if (server_try_cancel_ != DO_NOT_CANCEL) {
// Send server_try_cancel value in the client metadata
context_.AddMetadata(kServerTryCancelRequest,
grpc::to_string(server_try_cancel));
}
void OnReadDone(bool ok) override {
if (!ok) {
request_.set_message("Hello client ");
stub->experimental_async()->ResponseStream(&context_, &request_, this);
if (client_cancel_.cancel &&
reads_complete_ == client_cancel_.ops_before_cancel) {
context_.TryCancel();
}
// Even if we cancel, read until failure because there might be responses
// pending
StartRead(&response_);
StartCall();
}
void OnReadDone(bool ok) override {
if (!ok) {
if (server_try_cancel_ == DO_NOT_CANCEL && !client_cancel_.cancel) {
EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
} else {
EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
EXPECT_EQ(response_.message(),
request_.message() + grpc::to_string(reads_complete_));
reads_complete_++;
StartRead(&response_);
}
} else {
EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
EXPECT_EQ(response_.message(),
request_.message() + grpc::to_string(reads_complete_));
reads_complete_++;
if (client_cancel_.cancel &&
reads_complete_ == client_cancel_.ops_before_cancel) {
context_.TryCancel();
}
// Even if we cancel, read until failure because there might be responses
// pending
StartRead(&response_);
}
void OnDone(const Status& s) override {
EXPECT_TRUE(s.ok());
std::unique_lock<std::mutex> l(mu_);
done_ = true;
cv_.notify_one();
}
void OnDone(const Status& s) override {
gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
switch (server_try_cancel_) {
case DO_NOT_CANCEL:
if (!client_cancel_.cancel || client_cancel_.ops_before_cancel >
kServerDefaultResponseStreamsToSend) {
EXPECT_TRUE(s.ok());
EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
} else {
EXPECT_GE(reads_complete_, client_cancel_.ops_before_cancel);
EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
// Status might be ok or cancelled depending on whether server
// sent status before client cancel went through
if (!s.ok()) {
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
}
}
break;
case CANCEL_BEFORE_PROCESSING:
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
EXPECT_EQ(reads_complete_, 0);
break;
case CANCEL_DURING_PROCESSING:
case CANCEL_AFTER_PROCESSING:
// If server canceled while writing messages, client must have read
// less than or equal to the expected number of messages. Even if the
// server canceled after writing all messages, the RPC may be canceled
// before the Client got a chance to read all the messages.
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
break;
default:
assert(false);
}
void Await() {
std::unique_lock<std::mutex> l(mu_);
while (!done_) {
cv_.wait(l);
}
std::unique_lock<std::mutex> l(mu_);
done_ = true;
cv_.notify_one();
}
void Await() {
std::unique_lock<std::mutex> l(mu_);
while (!done_) {
cv_.wait(l);
}
}
private:
EchoRequest request_;
EchoResponse response_;
ClientContext context_;
int reads_complete_{0};
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
} test{stub_.get()};
private:
EchoRequest request_;
EchoResponse response_;
ClientContext context_;
const ServerTryCancelRequestPhase server_try_cancel_;
int reads_complete_{0};
const ClientCancelInfo client_cancel_;
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
};
TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
MAYBE_SKIP_TEST;
ResetStub();
ReadClient test{stub_.get(), DO_NOT_CANCEL};
test.Await();
// Make sure that the server interceptors were not notified of a cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, ClientCancelsResponseStream) {
MAYBE_SKIP_TEST;
ResetStub();
ReadClient test{stub_.get(), DO_NOT_CANCEL, 2};
test.Await();
// Because cancel in this case races with server finish, we can't be sure that
// server interceptors even see cancellation
}
// Server to cancel before sending any response messages
TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelBefore) {
MAYBE_SKIP_TEST;
ResetStub();
ReadClient test{stub_.get(), CANCEL_BEFORE_PROCESSING};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel while writing a response to the stream in parallel
TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelDuring) {
MAYBE_SKIP_TEST;
ResetStub();
ReadClient test{stub_.get(), CANCEL_DURING_PROCESSING};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel after writing all the respones to the stream but before
// returning to the client
TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelAfter) {
MAYBE_SKIP_TEST;
ResetStub();
ReadClient test{stub_.get(), CANCEL_AFTER_PROCESSING};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
class BidiClient
: public grpc::experimental::ClientBidiReactor<EchoRequest, EchoResponse> {
public:
BidiClient(grpc::testing::EchoTestService::Stub* stub,
ServerTryCancelRequestPhase server_try_cancel,
int num_msgs_to_send, ClientCancelInfo client_cancel = {})
: server_try_cancel_(server_try_cancel),
msgs_to_send_{num_msgs_to_send},
client_cancel_{client_cancel} {
if (server_try_cancel_ != DO_NOT_CANCEL) {
// Send server_try_cancel value in the client metadata
context_.AddMetadata(kServerTryCancelRequest,
grpc::to_string(server_try_cancel));
}
request_.set_message("Hello fren ");
stub->experimental_async()->BidiStream(&context_, this);
MaybeWrite();
StartRead(&response_);
StartCall();
}
void OnReadDone(bool ok) override {
if (!ok) {
if (server_try_cancel_ == DO_NOT_CANCEL) {
if (!client_cancel_.cancel) {
EXPECT_EQ(reads_complete_, msgs_to_send_);
} else {
EXPECT_LE(reads_complete_, writes_complete_);
}
}
} else {
EXPECT_LE(reads_complete_, msgs_to_send_);
EXPECT_EQ(response_.message(), request_.message());
reads_complete_++;
StartRead(&response_);
}
}
void OnWriteDone(bool ok) override {
if (server_try_cancel_ == DO_NOT_CANCEL) {
EXPECT_TRUE(ok);
} else if (!ok) {
return;
}
writes_complete_++;
MaybeWrite();
}
void OnDone(const Status& s) override {
gpr_log(GPR_INFO, "Sent %d messages", writes_complete_);
gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
switch (server_try_cancel_) {
case DO_NOT_CANCEL:
if (!client_cancel_.cancel ||
client_cancel_.ops_before_cancel > msgs_to_send_) {
EXPECT_TRUE(s.ok());
EXPECT_EQ(writes_complete_, msgs_to_send_);
EXPECT_EQ(reads_complete_, writes_complete_);
} else {
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
EXPECT_EQ(writes_complete_, client_cancel_.ops_before_cancel);
EXPECT_LE(reads_complete_, writes_complete_);
}
break;
case CANCEL_BEFORE_PROCESSING:
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
// The RPC is canceled before the server did any work or returned any
// reads, but it's possible that some writes took place first from the
// client
EXPECT_LE(writes_complete_, msgs_to_send_);
EXPECT_EQ(reads_complete_, 0);
break;
case CANCEL_DURING_PROCESSING:
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
EXPECT_LE(writes_complete_, msgs_to_send_);
EXPECT_LE(reads_complete_, writes_complete_);
break;
case CANCEL_AFTER_PROCESSING:
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
EXPECT_EQ(writes_complete_, msgs_to_send_);
// The Server canceled after reading the last message and after writing
// the message to the client. However, the RPC cancellation might have
// taken effect before the client actually read the response.
EXPECT_LE(reads_complete_, writes_complete_);
break;
default:
assert(false);
}
std::unique_lock<std::mutex> l(mu_);
done_ = true;
cv_.notify_one();
}
void Await() {
std::unique_lock<std::mutex> l(mu_);
while (!done_) {
cv_.wait(l);
}
}
private:
void MaybeWrite() {
if (client_cancel_.cancel &&
writes_complete_ == client_cancel_.ops_before_cancel) {
context_.TryCancel();
} else if (writes_complete_ == msgs_to_send_) {
StartWritesDone();
} else {
StartWrite(&request_);
}
}
EchoRequest request_;
EchoResponse response_;
ClientContext context_;
const ServerTryCancelRequestPhase server_try_cancel_;
int reads_complete_{0};
int writes_complete_{0};
const int msgs_to_send_;
const ClientCancelInfo client_cancel_;
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
};
TEST_P(ClientCallbackEnd2endTest, BidiStream) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend};
test.Await();
// Make sure that the server interceptors were not notified of a cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend, 2};
test.Await();
// Make sure that the server interceptors were notified of a cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel before reading/writing any requests/responses on the stream
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 2};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel while reading/writing requests/responses on the stream in
// parallel
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel after reading/writing all requests/responses on the stream
// but before returning to the client
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 5};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, SimultaneousReadAndWritesDone) {
MAYBE_SKIP_TEST;
ResetStub();
class Client : public grpc::experimental::ClientBidiReactor<EchoRequest,
EchoResponse> {
public:
explicit Client(grpc::testing::EchoTestService::Stub* stub) {
request_.set_message("Hello fren ");
Client(grpc::testing::EchoTestService::Stub* stub) {
request_.set_message("Hello bidi ");
stub->experimental_async()->BidiStream(&context_, this);
StartCall();
StartRead(&response_);
StartWrite(&request_);
StartCall();
}
void OnReadDone(bool ok) override {
if (!ok) {
EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
} else {
EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
EXPECT_EQ(response_.message(), request_.message());
reads_complete_++;
StartRead(&response_);
}
EXPECT_TRUE(ok);
EXPECT_EQ(response_.message(), request_.message());
}
void OnWriteDone(bool ok) override {
EXPECT_TRUE(ok);
if (++writes_complete_ == kServerDefaultResponseStreamsToSend) {
StartWritesDone();
} else {
StartWrite(&request_);
}
// Now send out the simultaneous Read and WritesDone
StartWritesDone();
StartRead(&response_);
}
void OnDone(const Status& s) override {
EXPECT_TRUE(s.ok());
EXPECT_EQ(response_.message(), request_.message());
std::unique_lock<std::mutex> l(mu_);
done_ = true;
cv_.notify_one();
@ -564,8 +1076,6 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) {
EchoRequest request_;
EchoResponse response_;
ClientContext context_;
int reads_complete_{0};
int writes_complete_{0};
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
@ -574,13 +1084,42 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) {
test.Await();
}
TestScenario scenarios[]{{false, Protocol::INPROC},
{false, Protocol::TCP},
{true, Protocol::INPROC},
{true, Protocol::TCP}};
std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
std::vector<TestScenario> scenarios;
std::vector<grpc::string> credentials_types{
GetCredentialsProvider()->GetSecureCredentialsTypeList()};
auto insec_ok = [] {
// Only allow insecure credentials type when it is registered with the
// provider. User may create providers that do not have insecure.
return GetCredentialsProvider()->GetChannelCredentials(
kInsecureCredentialsType, nullptr) != nullptr;
};
if (test_insecure && insec_ok()) {
credentials_types.push_back(kInsecureCredentialsType);
}
GPR_ASSERT(!credentials_types.empty());
bool barr[]{false, true};
Protocol parr[]{Protocol::INPROC, Protocol::TCP};
for (Protocol p : parr) {
for (const auto& cred : credentials_types) {
// TODO(vjpai): Test inproc with secure credentials when feasible
if (p == Protocol::INPROC &&
(cred != kInsecureCredentialsType || !insec_ok())) {
continue;
}
for (bool callback_server : barr) {
for (bool use_interceptors : barr) {
scenarios.emplace_back(callback_server, p, use_interceptors, cred);
}
}
}
}
return scenarios;
}
INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest,
::testing::ValuesIn(scenarios));
::testing::ValuesIn(CreateTestScenarios(true)));
} // namespace
} // namespace testing

@ -583,7 +583,10 @@ CallbackTestServiceImpl::RequestStream() {
StartRead(&request_);
}
void OnDone() override { delete this; }
void OnCancel() override { FinishOnce(Status::CANCELLED); }
void OnCancel() override {
EXPECT_TRUE(ctx_->IsCancelled());
FinishOnce(Status::CANCELLED);
}
void OnReadDone(bool ok) override {
if (ok) {
response_->mutable_message()->append(request_.message());
@ -666,7 +669,10 @@ CallbackTestServiceImpl::ResponseStream() {
}
}
void OnDone() override { delete this; }
void OnCancel() override { FinishOnce(Status::CANCELLED); }
void OnCancel() override {
EXPECT_TRUE(ctx_->IsCancelled());
FinishOnce(Status::CANCELLED);
}
void OnWriteDone(bool ok) override {
if (num_msgs_sent_ < server_responses_to_send_) {
NextWrite();
@ -749,7 +755,10 @@ CallbackTestServiceImpl::BidiStream() {
StartRead(&request_);
}
void OnDone() override { delete this; }
void OnCancel() override { FinishOnce(Status::CANCELLED); }
void OnCancel() override {
EXPECT_TRUE(ctx_->IsCancelled());
FinishOnce(Status::CANCELLED);
}
void OnReadDone(bool ok) override {
if (ok) {
num_msgs_read_++;

@ -88,7 +88,7 @@ bool PrintMetrics(std::unique_ptr<MetricsService::Stub> stub, bool total_only,
int main(int argc, char** argv) {
grpc::testing::InitTest(&argc, &argv, true);
// The output of metrics client is in some cases programatically parsed (for
// The output of metrics client is in some cases programmatically parsed (for
// example by the stress test framework). So, we do not want any of the log
// from the grpc library appearing on stdout.
gpr_set_log_function(BlackholeLogger);

@ -20,6 +20,17 @@
#include "test/cpp/microbenchmarks/helpers.h"
static grpc::internal::GrpcLibraryInitializer g_gli_initializer;
Library::Library() {
g_gli_initializer.summon();
#ifdef GPR_LOW_LEVEL_COUNTERS
grpc_memory_counters_init();
#endif
init_lib_.init();
rq_ = grpc_resource_quota_create("bm");
}
void TrackCounters::Finish(benchmark::State& state) {
std::ostringstream out;
for (const auto& l : labels_) {

@ -39,13 +39,7 @@ class Library {
grpc_resource_quota* rq() { return rq_; }
private:
Library() {
#ifdef GPR_LOW_LEVEL_COUNTERS
grpc_memory_counters_init();
#endif
init_lib_.init();
rq_ = grpc_resource_quota_create("bm");
}
Library();
~Library() { init_lib_.shutdown(); }

@ -790,7 +790,7 @@ TEST_F(AddressSortingTest, TestPrefersIpv6LoopbackInputsFlipped) {
/* Try to rule out false positives in the above two tests in which
* the sorter might think that neither ipv6 or ipv4 loopback is
* available, but ipv6 loopback is still preferred only due
* to precedance table lookups. */
* to precedence table lookups. */
TEST_F(AddressSortingTest, TestSorterKnowsIpv6LoopbackIsAvailable) {
sockaddr_in6 ipv6_loopback;
memset(&ipv6_loopback, 0, sizeof(ipv6_loopback));

@ -3300,7 +3300,8 @@
"language": "c++",
"name": "client_callback_end2end_test",
"src": [
"test/cpp/end2end/client_callback_end2end_test.cc"
"test/cpp/end2end/client_callback_end2end_test.cc",
"test/cpp/end2end/interceptors_util.cc"
],
"third_party": false,
"type": "target"

Loading…
Cancel
Save