diff --git a/CMakeLists.txt b/CMakeLists.txt index 2575a328225..1bab5e6cba2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12441,6 +12441,7 @@ if (gRPC_BUILD_TESTS) add_executable(client_callback_end2end_test test/cpp/end2end/client_callback_end2end_test.cc + test/cpp/end2end/interceptors_util.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc ) diff --git a/Makefile b/Makefile index 9d0b37b687a..2822623a221 100644 --- a/Makefile +++ b/Makefile @@ -809,7 +809,7 @@ ifeq ($(HAS_SYSTEM_PROTOBUF),true) ifeq ($(HAS_PKG_CONFIG),true) PROTOBUF_PKG_CONFIG = true PC_REQUIRES_GRPCXX = protobuf -CPPFLAGS := $(shell $(PKG_CONFIG) --cflags protobuf) $(CPPFLAGS) +CPPFLAGS := $(CPPFLAGS) $(shell $(PKG_CONFIG) --cflags protobuf) LDFLAGS_PROTOBUF_PKG_CONFIG = $(shell $(PKG_CONFIG) --libs-only-L protobuf) ifeq ($(SYSTEM),Linux) ifneq ($(LDFLAGS_PROTOBUF_PKG_CONFIG),) @@ -17464,6 +17464,7 @@ endif CLIENT_CALLBACK_END2END_TEST_SRC = \ test/cpp/end2end/client_callback_end2end_test.cc \ + test/cpp/end2end/interceptors_util.cc \ CLIENT_CALLBACK_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CLIENT_CALLBACK_END2END_TEST_SRC)))) ifeq ($(NO_SECURE),true) @@ -17496,6 +17497,8 @@ endif $(OBJDIR)/$(CONFIG)/test/cpp/end2end/client_callback_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a +$(OBJDIR)/$(CONFIG)/test/cpp/end2end/interceptors_util.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a + deps_client_callback_end2end_test: $(CLIENT_CALLBACK_END2END_TEST_OBJS:.o=.dep) ifneq ($(NO_SECURE),true) diff --git a/bazel/generate_cc.bzl b/bazel/generate_cc.bzl index 2f14071f92d..8f30c84f6b9 100644 --- a/bazel/generate_cc.bzl +++ 
b/bazel/generate_cc.bzl @@ -28,7 +28,7 @@ def generate_cc_impl(ctx): else: outs += [proto.path[label_len:-len(".proto")] + ".pb.h" for proto in protos] outs += [proto.path[label_len:-len(".proto")] + ".pb.cc" for proto in protos] - out_files = [ctx.new_file(out) for out in outs] + out_files = [ctx.actions.declare_file(out) for out in outs] dir_out = str(ctx.genfiles_dir.path + proto_root) arguments = [] @@ -38,10 +38,10 @@ def generate_cc_impl(ctx): if ctx.attr.generate_mocks: flags.append("generate_mock_code=true") arguments += ["--PLUGIN_out=" + ",".join(flags) + ":" + dir_out] - additional_input = [ctx.executable.plugin] + tools = [ctx.executable.plugin] else: arguments += ["--cpp_out=" + ",".join(ctx.attr.flags) + ":" + dir_out] - additional_input = [] + tools = [] # Import protos relative to their workspace root so that protoc prints the # right include paths. @@ -70,8 +70,9 @@ def generate_cc_impl(ctx): arguments += ["-I{0}".format(f + "/../..")] well_known_proto_files = [f for f in ctx.attr.well_known_protos.files] - ctx.action( - inputs = protos + includes + additional_input + well_known_proto_files, + ctx.actions.run( + inputs = protos + includes + well_known_proto_files, + tools = tools, outputs = out_files, executable = ctx.executable._protoc, arguments = arguments, diff --git a/bazel/grpc_deps.bzl b/bazel/grpc_deps.bzl index 61a46e1ee5c..e2e47292242 100644 --- a/bazel/grpc_deps.bzl +++ b/bazel/grpc_deps.bzl @@ -170,8 +170,8 @@ def grpc_deps(): if "com_google_absl" not in native.existing_rules(): http_archive( name = "com_google_absl", - strip_prefix = "abseil-cpp-cd95e71df6eaf8f2a282b1da556c2cf1c9b09207", - url = "https://github.com/abseil/abseil-cpp/archive/cd95e71df6eaf8f2a282b1da556c2cf1c9b09207.tar.gz", + strip_prefix = "abseil-cpp-308ce31528a7edfa39f5f6d36142278a0ae1bf45", + url = "https://github.com/abseil/abseil-cpp/archive/308ce31528a7edfa39f5f6d36142278a0ae1bf45.tar.gz", ) if "com_github_bazelbuild_bazeltoolchains" not in 
native.existing_rules(): @@ -196,8 +196,8 @@ def grpc_deps(): if "io_opencensus_cpp" not in native.existing_rules(): http_archive( name = "io_opencensus_cpp", - strip_prefix = "opencensus-cpp-fdf0f308b1631bb4a942e32ba5d22536a6170274", - url = "https://github.com/census-instrumentation/opencensus-cpp/archive/fdf0f308b1631bb4a942e32ba5d22536a6170274.tar.gz", + strip_prefix = "opencensus-cpp-03dff0352522983ffdee48cedbf87cbe37f1bb7f", + url = "https://github.com/census-instrumentation/opencensus-cpp/archive/03dff0352522983ffdee48cedbf87cbe37f1bb7f.tar.gz", ) if "upb" not in native.existing_rules(): diff --git a/build.yaml b/build.yaml index 855be0ae79f..c18630ecdd3 100644 --- a/build.yaml +++ b/build.yaml @@ -4468,6 +4468,7 @@ targets: language: c++ src: - test/cpp/end2end/client_callback_end2end_test.cc + - test/cpp/end2end/interceptors_util.cc deps: - grpc++_test_util - grpc_test_util diff --git a/doc/environment_variables.md b/doc/environment_variables.md index 132de81a7bd..435edbcfdb4 100644 --- a/doc/environment_variables.md +++ b/doc/environment_variables.md @@ -41,6 +41,9 @@ some configuration as environment variables that can be set. - bdp_estimator - traces behavior of bdp estimation logic - call_combiner - traces call combiner state - call_error - traces the possible errors contributing to final call status + - cares_resolver - traces operations of the c-ares based DNS resolver + - cares_address_sorting - traces operations of the c-ares based DNS + resolver's resolved address sorter - channel - traces operations on the C core channel stack - client_channel - traces client channel activity, including resolver and load balancing policy interaction diff --git a/doc/server_reflection_tutorial.md b/doc/server_reflection_tutorial.md index 06a257c1e87..acbb4a6ab2d 100644 --- a/doc/server_reflection_tutorial.md +++ b/doc/server_reflection_tutorial.md @@ -15,8 +15,8 @@ server reflection, you can link this library to your server binary. Some platforms (e.g. 
Ubuntu 11.10 onwards) only link in libraries that directly contain symbols used by the application. On these platforms, LD flag -`--no-as-needed` is needed for for dynamic linking and `--whole-archive` is -needed for for static linking. +`--no-as-needed` is needed for dynamic linking and `--whole-archive` is +needed for static linking. This [Makefile](../examples/cpp/helloworld/Makefile#L37#L45) demonstrates enabling c++ server reflection on Linux and MacOS. diff --git a/include/grpcpp/impl/codegen/byte_buffer.h b/include/grpcpp/impl/codegen/byte_buffer.h index a77e36dfc50..7b82f49a84e 100644 --- a/include/grpcpp/impl/codegen/byte_buffer.h +++ b/include/grpcpp/impl/codegen/byte_buffer.h @@ -96,7 +96,7 @@ class ByteBuffer final { /// \a buf. Wrapper of core function grpc_byte_buffer_copy . This is not /// a deep copy; it is just a referencing. As a result, its performance is /// size-independent. - ByteBuffer(const ByteBuffer& buf); + ByteBuffer(const ByteBuffer& buf) : buffer_(nullptr) { operator=(buf); } ~ByteBuffer() { if (buffer_) { @@ -107,7 +107,16 @@ class ByteBuffer final { /// Wrapper of core function grpc_byte_buffer_copy . This is not /// a deep copy; it is just a referencing. As a result, its performance is /// size-independent. - ByteBuffer& operator=(const ByteBuffer&); + ByteBuffer& operator=(const ByteBuffer& buf) { + if (this != &buf) { + Clear(); // first remove existing data + } + if (buf.buffer_) { + // then copy + buffer_ = g_core_codegen_interface->grpc_byte_buffer_copy(buf.buffer_); + } + return *this; + } /// Dump (read) the buffer contents into \a slices. 
Status Dump(std::vector* slices) const; @@ -215,7 +224,7 @@ class SerializationTraits { bool* own_buffer) { *buffer = source; *own_buffer = true; - return Status::OK; + return g_core_codegen_interface->ok(); } }; diff --git a/setup.py b/setup.py index f533e7b77cf..1e205bdf91d 100644 --- a/setup.py +++ b/setup.py @@ -159,7 +159,7 @@ if EXTRA_ENV_COMPILE_ARGS is None: elif "linux" in sys.platform: EXTRA_ENV_COMPILE_ARGS += ' -std=gnu99 -fvisibility=hidden -fno-wrapv -fno-exceptions' elif "darwin" in sys.platform: - EXTRA_ENV_COMPILE_ARGS += ' -fvisibility=hidden -fno-wrapv -fno-exceptions' + EXTRA_ENV_COMPILE_ARGS += ' -stdlib=libc++ -fvisibility=hidden -fno-wrapv -fno-exceptions' EXTRA_ENV_COMPILE_ARGS += ' -DPB_FIELD_32BIT' if EXTRA_ENV_LINK_ARGS is None: @@ -265,7 +265,7 @@ def cython_extensions_and_necessity(): for name in CYTHON_EXTENSION_MODULE_NAMES] config = os.environ.get('CONFIG', 'opt') prefix = 'libs/' + config + '/' - if "darwin" in sys.platform or USE_PREBUILT_GRPC_CORE: + if USE_PREBUILT_GRPC_CORE: extra_objects = [prefix + 'libares.a', prefix + 'libboringssl.a', prefix + 'libgpr.a', diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index b0046872502..96e9ab8dcfb 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -580,6 +580,10 @@ void PrintHeaderClientMethodCallbackInterfaces( "virtual void $Method$(::grpc::ClientContext* context, " "const $Request$* request, $Response$* response, " "std::function) = 0;\n"); + printer->Print(*vars, + "virtual void $Method$(::grpc::ClientContext* context, " + "const ::grpc::ByteBuffer* request, $Response$* response, " + "std::function) = 0;\n"); } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, "virtual void $Method$(::grpc::ClientContext* context, " @@ -642,6 +646,10 @@ void PrintHeaderClientMethodCallback(grpc_generator::Printer* printer, "void $Method$(::grpc::ClientContext* context, " "const $Request$* request, $Response$* response, " 
"std::function) override;\n"); + printer->Print(*vars, + "void $Method$(::grpc::ClientContext* context, " + "const ::grpc::ByteBuffer* request, $Response$* response, " + "std::function) override;\n"); } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, "void $Method$(::grpc::ClientContext* context, " @@ -1643,6 +1651,16 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer, "(stub_->channel_.get(), stub_->rpcmethod_$Method$_, " "context, request, response, std::move(f));\n}\n\n"); + printer->Print(*vars, + "void $ns$$Service$::Stub::experimental_async::$Method$(" + "::grpc::ClientContext* context, " + "const ::grpc::ByteBuffer* request, $Response$* response, " + "std::function f) {\n"); + printer->Print(*vars, + " return ::grpc::internal::CallbackUnaryCall" + "(stub_->channel_.get(), stub_->rpcmethod_$Method$_, " + "context, request, response, std::move(f));\n}\n\n"); + for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncStart"] = async_prefix.start; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 69d4ee24368..c99943ab2f1 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -489,7 +489,7 @@ void grpc_resolver_dns_ares_init() { address_sorting_init(); grpc_error* error = grpc_ares_init(); if (error != GRPC_ERROR_NONE) { - GRPC_LOG_IF_ERROR("ares_library_init() failed", error); + GRPC_LOG_IF_ERROR("grpc_ares_init() failed", error); return; } if (default_resolver == nullptr) { diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index d41c8238f1c..986af89454f 100644 --- 
a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -47,9 +47,6 @@ using grpc_core::ServerAddress; using grpc_core::ServerAddressList; -static gpr_once g_basic_init = GPR_ONCE_INIT; -static gpr_mu g_init_mu; - grpc_core::TraceFlag grpc_trace_cares_address_sorting(false, "cares_address_sorting"); @@ -89,8 +86,6 @@ typedef struct grpc_ares_hostbyname_request { bool is_balancer; } grpc_ares_hostbyname_request; -static void do_basic_init(void) { gpr_mu_init(&g_init_mu); } - static void log_address_sorting_list(const ServerAddressList& addresses, const char* input_output_str) { for (size_t i = 0; i < addresses.size(); i++) { @@ -588,12 +583,12 @@ static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) { void (*grpc_cancel_ares_request_locked)(grpc_ares_request* r) = grpc_cancel_ares_request_locked_impl; +// ares_library_init and ares_library_cleanup are currently no-op except under +// Windows. Calling them may cause race conditions when other parts of the +// binary call these functions concurrently.
+#ifdef GPR_WINDOWS grpc_error* grpc_ares_init(void) { - gpr_once_init(&g_basic_init, do_basic_init); - gpr_mu_lock(&g_init_mu); int status = ares_library_init(ARES_LIB_INIT_ALL); - gpr_mu_unlock(&g_init_mu); - if (status != ARES_SUCCESS) { char* error_msg; gpr_asprintf(&error_msg, "ares_library_init failed: %s", @@ -605,11 +600,11 @@ grpc_error* grpc_ares_init(void) { return GRPC_ERROR_NONE; } -void grpc_ares_cleanup(void) { - gpr_mu_lock(&g_init_mu); - ares_library_cleanup(); - gpr_mu_unlock(&g_init_mu); -} +void grpc_ares_cleanup(void) { ares_library_cleanup(); } +#else +grpc_error* grpc_ares_init(void) { return GRPC_ERROR_NONE; } +void grpc_ares_cleanup(void) {} +#endif // GPR_WINDOWS /* * grpc_resolve_address_ares related structs and functions diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc index d7fd73fd6b2..1d373c5b994 100644 --- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc +++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc @@ -30,7 +30,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/iomgr/resolve_address.h" -#include "src/core/lib/iomgr/sockaddr_posix.h" +#include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/socket_utils.h" #include "src/core/lib/security/context/security_context.h" #include "src/core/lib/slice/slice_internal.h" diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 1c9b37dada2..970c71b663d 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1062,12 +1062,15 @@ static void write_action_end_locked(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("terminate_writing_with_lock", 0); grpc_chttp2_transport* t = static_cast(tp); + bool closed = 
false; if (error != GRPC_ERROR_NONE) { close_transport_locked(t, GRPC_ERROR_REF(error)); + closed = true; } if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; + closed = true; if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { close_transport_locked( t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent")); @@ -1086,6 +1089,14 @@ static void write_action_end_locked(void* tp, grpc_error* error) { set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing"); t->is_first_write_in_batch = false; GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); + // If the transport is closed, we will retry writing on the endpoint + // and next write may contain part of the currently serialized frames. + // So, we should only call the run_after_write callbacks when the next + // write finishes, or the callbacks will be invoked when the stream is + // closed. + if (!closed) { + GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); + } GRPC_CLOSURE_RUN( GRPC_CLOSURE_INIT(&t->write_action_begin_locked, write_action_begin_locked, t, diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc index ccf2256974a..7b47c9bc18e 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc @@ -1452,7 +1452,7 @@ static grpc_error* begin_parse_string(grpc_chttp2_hpack_parser* p, uint8_t binary, grpc_chttp2_hpack_parser_string* str) { if (!p->huff && binary == NOT_BINARY && - (end - cur) >= static_cast(p->strlen) && + static_cast(end - cur) >= p->strlen && p->current_slice_refcount != nullptr) { GRPC_STATS_INC_HPACK_RECV_UNCOMPRESSED(); str->copied = false; diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index b0f93eb63f4..d46c24a1de0 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ 
b/src/core/ext/transport/inproc/inproc_transport.cc @@ -1032,6 +1032,11 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs, } } else { if (error != GRPC_ERROR_NONE) { + // Consume any send message that was sent here but that we are not pushing + // to the other side + if (op->send_message) { + op->payload->send_message.send_message.reset(); + } // Schedule op's closures that we didn't push to op state machine if (op->recv_initial_metadata) { if (op->payload->recv_initial_metadata.trailing_metadata_available != diff --git a/src/core/lib/gprpp/atomic.h b/src/core/lib/gprpp/atomic.h index 8cb9e9342ec..aec283c50dc 100644 --- a/src/core/lib/gprpp/atomic.h +++ b/src/core/lib/gprpp/atomic.h @@ -51,8 +51,9 @@ class Atomic { bool CompareExchangeWeak(T* expected, T desired, MemoryOrder success, MemoryOrder failure) { - return GPR_ATM_INC_CAS_THEN( - storage_.compare_exchange_weak(*expected, desired, success, failure)); + return GPR_ATM_INC_CAS_THEN(storage_.compare_exchange_weak( + *expected, desired, static_cast(success), + static_cast(failure))); } bool CompareExchangeStrong(T* expected, T desired, MemoryOrder success, @@ -76,7 +77,7 @@ class Atomic { // Atomically increment a counter only if the counter value is not zero. // Returns true if increment took place; false if counter is zero. - bool IncrementIfNonzero(MemoryOrder load_order = MemoryOrder::ACQ_REL) { + bool IncrementIfNonzero(MemoryOrder load_order = MemoryOrder::ACQUIRE) { T count = storage_.load(static_cast(load_order)); do { // If zero, we are done (without an increment). 
If not, we must do a CAS @@ -85,9 +86,8 @@ class Atomic { if (count == 0) { return false; } - } while (!storage_.AtomicCompareExchangeWeak( - &count, count + 1, static_cast(MemoryOrder::ACQ_REL), - static_cast(load_order))); + } while (!CompareExchangeWeak(&count, count + 1, MemoryOrder::ACQ_REL, + load_order)); return true; } diff --git a/src/core/lib/slice/slice.cc b/src/core/lib/slice/slice.cc index e842d84f11f..31437aa4600 100644 --- a/src/core/lib/slice/slice.cc +++ b/src/core/lib/slice/slice.cc @@ -26,6 +26,7 @@ #include +#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/iomgr/exec_ctx.h" char* grpc_slice_to_c_string(grpc_slice slice) { @@ -213,21 +214,35 @@ grpc_slice grpc_slice_from_copied_string(const char* source) { return grpc_slice_from_copied_buffer(source, strlen(source)); } -typedef struct { +namespace { + +struct MallocRefCount { + MallocRefCount(const grpc_slice_refcount_vtable* vtable) { + base.vtable = vtable; + base.sub_refcount = &base; + } + + void Ref() { refs.Ref(); } + void Unref() { + if (refs.Unref()) { + gpr_free(this); + } + } + grpc_slice_refcount base; - gpr_refcount refs; -} malloc_refcount; + grpc_core::RefCount refs; +}; + +} // namespace static void malloc_ref(void* p) { - malloc_refcount* r = static_cast(p); - gpr_ref(&r->refs); + MallocRefCount* r = static_cast(p); + r->Ref(); } static void malloc_unref(void* p) { - malloc_refcount* r = static_cast(p); - if (gpr_unref(&r->refs)) { - gpr_free(r); - } + MallocRefCount* r = static_cast(p); + r->Unref(); } static const grpc_slice_refcount_vtable malloc_vtable = { @@ -246,15 +261,10 @@ grpc_slice grpc_slice_malloc_large(size_t length) { refcount is a malloc_refcount bytes is an array of bytes of the requested length Both parts are placed in the same allocation returned from gpr_malloc */ - malloc_refcount* rc = static_cast( - gpr_malloc(sizeof(malloc_refcount) + length)); - - /* Initial refcount on rc is 1 - and it's up to the caller to release - this reference. 
*/ - gpr_ref_init(&rc->refs, 1); + void* data = + static_cast(gpr_malloc(sizeof(MallocRefCount) + length)); - rc->base.vtable = &malloc_vtable; - rc->base.sub_refcount = &rc->base; + auto* rc = new (data) MallocRefCount(&malloc_vtable); /* Build up the slice to be returned. */ /* The slices refcount points back to the allocated block. */ diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index bfd8445f70e..7d679204bac 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -33,6 +33,7 @@ #include "src/core/lib/gpr/spinlock.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/tls.h" +#include "src/core/lib/gprpp/atomic.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" @@ -44,6 +45,8 @@ grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure"); grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags"); grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount"); +namespace { + // Specifies a cq thread local cache. 
// The first event that occurs on a thread // with a cq cache will go into that cache, and @@ -84,24 +87,22 @@ typedef struct { grpc_closure* shutdown; } non_polling_poller; -static size_t non_polling_poller_size(void) { - return sizeof(non_polling_poller); -} +size_t non_polling_poller_size(void) { return sizeof(non_polling_poller); } -static void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) { +void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) { non_polling_poller* npp = reinterpret_cast(pollset); gpr_mu_init(&npp->mu); *mu = &npp->mu; } -static void non_polling_poller_destroy(grpc_pollset* pollset) { +void non_polling_poller_destroy(grpc_pollset* pollset) { non_polling_poller* npp = reinterpret_cast(pollset); gpr_mu_destroy(&npp->mu); } -static grpc_error* non_polling_poller_work(grpc_pollset* pollset, - grpc_pollset_worker** worker, - grpc_millis deadline) { +grpc_error* non_polling_poller_work(grpc_pollset* pollset, + grpc_pollset_worker** worker, + grpc_millis deadline) { non_polling_poller* npp = reinterpret_cast(pollset); if (npp->shutdown) return GRPC_ERROR_NONE; if (npp->kicked_without_poller) { @@ -141,8 +142,8 @@ static grpc_error* non_polling_poller_work(grpc_pollset* pollset, return GRPC_ERROR_NONE; } -static grpc_error* non_polling_poller_kick( - grpc_pollset* pollset, grpc_pollset_worker* specific_worker) { +grpc_error* non_polling_poller_kick(grpc_pollset* pollset, + grpc_pollset_worker* specific_worker) { non_polling_poller* p = reinterpret_cast(pollset); if (specific_worker == nullptr) specific_worker = reinterpret_cast(p->root); @@ -159,8 +160,7 @@ static grpc_error* non_polling_poller_kick( return GRPC_ERROR_NONE; } -static void non_polling_poller_shutdown(grpc_pollset* pollset, - grpc_closure* closure) { +void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) { non_polling_poller* p = reinterpret_cast(pollset); GPR_ASSERT(closure != nullptr); p->shutdown = closure; @@ -175,7 +175,7 @@ static 
void non_polling_poller_shutdown(grpc_pollset* pollset, } } -static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { +const cq_poller_vtable g_poller_vtable_by_poller_type[] = { /* GRPC_CQ_DEFAULT_POLLING */ {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick, grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy}, @@ -188,7 +188,9 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { non_polling_poller_shutdown, non_polling_poller_destroy}, }; -typedef struct cq_vtable { +} // namespace + +struct cq_vtable { grpc_cq_completion_type cq_completion_type; size_t data_size; void (*init)(void* data, @@ -203,80 +205,116 @@ typedef struct cq_vtable { void* reserved); grpc_event (*pluck)(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved); -} cq_vtable; +}; + +namespace { /* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue * (a lockfree multiproducer single consumer queue). It uses a queue_lock * to support multiple consumers. * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */ -typedef struct grpc_cq_event_queue { +class CqEventQueue { + public: + CqEventQueue() { gpr_mpscq_init(&queue_); } + ~CqEventQueue() { gpr_mpscq_destroy(&queue_); } + + /* Note: The counter is not incremented/decremented atomically with push/pop. + * The count is only eventually consistent */ + intptr_t num_items() const { + return num_queue_items_.Load(grpc_core::MemoryOrder::RELAXED); + } + + bool Push(grpc_cq_completion* c); + grpc_cq_completion* Pop(); + + private: /* Spinlock to serialize consumers i.e pop() operations */ - gpr_spinlock queue_lock; + gpr_spinlock queue_lock_ = GPR_SPINLOCK_INITIALIZER; - gpr_mpscq queue; + gpr_mpscq queue_; /* A lazy counter of number of items in the queue. 
This is NOT atomically incremented/decremented along with push/pop operations and hence is only eventually consistent */ - gpr_atm num_queue_items; -} grpc_cq_event_queue; + grpc_core::Atomic num_queue_items_{0}; +}; + +struct cq_next_data { + ~cq_next_data() { GPR_ASSERT(queue.num_items() == 0); } -typedef struct cq_next_data { /** Completed events for completion-queues of type GRPC_CQ_NEXT */ - grpc_cq_event_queue queue; + CqEventQueue queue; /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ - gpr_atm things_queued_ever; + grpc_core::Atomic things_queued_ever{0}; - /* Number of outstanding events (+1 if not shut down) */ - gpr_atm pending_events; + /** Number of outstanding events (+1 if not shut down) + Initial count is dropped by grpc_completion_queue_shutdown */ + grpc_core::Atomic pending_events{1}; /** 0 initially. 1 once we initiated shutdown */ - bool shutdown_called; -} cq_next_data; + bool shutdown_called = false; +}; + +struct cq_pluck_data { + cq_pluck_data() { + completed_tail = &completed_head; + completed_head.next = reinterpret_cast(completed_tail); + } + + ~cq_pluck_data() { + GPR_ASSERT(completed_head.next == + reinterpret_cast(&completed_head)); + } -typedef struct cq_pluck_data { /** Completed events for completion-queues of type GRPC_CQ_PLUCK */ grpc_cq_completion completed_head; grpc_cq_completion* completed_tail; - /** Number of pending events (+1 if we're not shutdown) */ - gpr_atm pending_events; + /** Number of pending events (+1 if we're not shutdown). + Initial count is dropped by grpc_completion_queue_shutdown. */ + grpc_core::Atomic pending_events{1}; /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ - gpr_atm things_queued_ever; + grpc_core::Atomic things_queued_ever{0}; /** 0 initially. 
1 once we completed shutting */ /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if * (pending_events == 0). So consider removing this in future and use * pending_events */ - gpr_atm shutdown; + grpc_core::Atomic shutdown{false}; /** 0 initially. 1 once we initiated shutdown */ - bool shutdown_called; + bool shutdown_called = false; - int num_pluckers; + int num_pluckers = 0; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; -} cq_pluck_data; +}; -typedef struct cq_callback_data { +struct cq_callback_data { + cq_callback_data( + grpc_experimental_completion_queue_functor* shutdown_callback) + : shutdown_callback(shutdown_callback) {} /** No actual completed events queue, unlike other types */ - /** Number of pending events (+1 if we're not shutdown) */ - gpr_atm pending_events; + /** Number of pending events (+1 if we're not shutdown). + Initial count is dropped by grpc_completion_queue_shutdown. */ + grpc_core::Atomic pending_events{1}; /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ - gpr_atm things_queued_ever; + grpc_core::Atomic things_queued_ever{0}; /** 0 initially. 
1 once we initiated shutdown */ - bool shutdown_called; + bool shutdown_called = false; /** A callback that gets invoked when the CQ completes shutdown */ grpc_experimental_completion_queue_functor* shutdown_callback; -} cq_callback_data; +}; + +} // namespace /* Completion queue structure */ struct grpc_completion_queue { @@ -408,7 +446,7 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq, storage->done(storage->done_arg, storage); ret = 1; cq_next_data* cqd = static_cast DATA_FROM_CQ(cq); - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); gpr_mu_lock(cq->mu); cq_finish_shutdown_next(cq); @@ -422,31 +460,21 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq, return ret; } -static void cq_event_queue_init(grpc_cq_event_queue* q) { - gpr_mpscq_init(&q->queue); - q->queue_lock = GPR_SPINLOCK_INITIALIZER; - gpr_atm_no_barrier_store(&q->num_queue_items, 0); +bool CqEventQueue::Push(grpc_cq_completion* c) { + gpr_mpscq_push(&queue_, reinterpret_cast(c)); + return num_queue_items_.FetchAdd(1, grpc_core::MemoryOrder::RELAXED) == 0; } -static void cq_event_queue_destroy(grpc_cq_event_queue* q) { - gpr_mpscq_destroy(&q->queue); -} - -static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) { - gpr_mpscq_push(&q->queue, reinterpret_cast(c)); - return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0; -} - -static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) { +grpc_cq_completion* CqEventQueue::Pop() { grpc_cq_completion* c = nullptr; - if (gpr_spinlock_trylock(&q->queue_lock)) { + if (gpr_spinlock_trylock(&queue_lock_)) { GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(); bool is_empty = false; c = reinterpret_cast( - gpr_mpscq_pop_and_check_end(&q->queue, &is_empty)); - gpr_spinlock_unlock(&q->queue_lock); + 
gpr_mpscq_pop_and_check_end(&queue_, &is_empty)); + gpr_spinlock_unlock(&queue_lock_); if (c == nullptr && !is_empty) { GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(); @@ -456,18 +484,12 @@ static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) { } if (c) { - gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1); + num_queue_items_.FetchSub(1, grpc_core::MemoryOrder::RELAXED); } return c; } -/* Note: The counter is not incremented/decremented atomically with push/pop. - * The count is only eventually consistent */ -static long cq_event_queue_num_items(grpc_cq_event_queue* q) { - return static_cast(gpr_atm_no_barrier_load(&q->num_queue_items)); -} - grpc_completion_queue* grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, grpc_experimental_completion_queue_functor* shutdown_callback) { @@ -507,49 +529,33 @@ grpc_completion_queue* grpc_completion_queue_create_internal( static void cq_init_next( void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { - cq_next_data* cqd = static_cast(data); - /* Initial count is dropped by grpc_completion_queue_shutdown */ - gpr_atm_no_barrier_store(&cqd->pending_events, 1); - cqd->shutdown_called = false; - gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); - cq_event_queue_init(&cqd->queue); + new (data) cq_next_data(); } static void cq_destroy_next(void* data) { cq_next_data* cqd = static_cast(data); - GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0); - cq_event_queue_destroy(&cqd->queue); + cqd->~cq_next_data(); } static void cq_init_pluck( void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { - cq_pluck_data* cqd = static_cast(data); - /* Initial count is dropped by grpc_completion_queue_shutdown */ - gpr_atm_no_barrier_store(&cqd->pending_events, 1); - cqd->completed_tail = &cqd->completed_head; - cqd->completed_head.next = (uintptr_t)cqd->completed_tail; - 
gpr_atm_no_barrier_store(&cqd->shutdown, 0); - cqd->shutdown_called = false; - cqd->num_pluckers = 0; - gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); + new (data) cq_pluck_data(); } static void cq_destroy_pluck(void* data) { cq_pluck_data* cqd = static_cast(data); - GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); + cqd->~cq_pluck_data(); } static void cq_init_callback( void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { - cq_callback_data* cqd = static_cast(data); - /* Initial count is dropped by grpc_completion_queue_shutdown */ - gpr_atm_no_barrier_store(&cqd->pending_events, 1); - cqd->shutdown_called = false; - gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); - cqd->shutdown_callback = shutdown_callback; + new (data) cq_callback_data(shutdown_callback); } -static void cq_destroy_callback(void* data) {} +static void cq_destroy_callback(void* data) { + cq_callback_data* cqd = static_cast(data); + cqd->~cq_callback_data(); +} grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) { return cq->vtable->cq_completion_type; @@ -632,37 +638,19 @@ static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) { static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {} #endif -/* Atomically increments a counter only if the counter is not zero. Returns - * true if the increment was successful; false if the counter is zero */ -static bool atm_inc_if_nonzero(gpr_atm* counter) { - while (true) { - gpr_atm count = gpr_atm_acq_load(counter); - /* If zero, we are done. If not, we must to a CAS (instead of an atomic - * increment) to maintain the contract: do not increment the counter if it - * is zero. 
*/ - if (count == 0) { - return false; - } else if (gpr_atm_full_cas(counter, count, count + 1)) { - break; - } - } - - return true; -} - static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag) { cq_next_data* cqd = static_cast DATA_FROM_CQ(cq); - return atm_inc_if_nonzero(&cqd->pending_events); + return cqd->pending_events.IncrementIfNonzero(); } static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag) { cq_pluck_data* cqd = static_cast DATA_FROM_CQ(cq); - return atm_inc_if_nonzero(&cqd->pending_events); + return cqd->pending_events.IncrementIfNonzero(); } static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag) { cq_callback_data* cqd = static_cast DATA_FROM_CQ(cq); - return atm_inc_if_nonzero(&cqd->pending_events); + return cqd->pending_events.IncrementIfNonzero(); } bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { @@ -716,17 +704,14 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, gpr_tls_set(&g_cached_event, (intptr_t)storage); } else { /* Add the completion to the queue */ - bool is_first = cq_event_queue_push(&cqd->queue, storage); - gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - + bool is_first = cqd->queue.Push(storage); + cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED); /* Since we do not hold the cq lock here, it is important to do an 'acquire' load here (instead of a 'no_barrier' load) to match with the release store - (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next + (done via pending_events.FetchSub(1, ACQ_REL)) in cq_shutdown_next */ - bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; - - if (!will_definitely_shutdown) { + if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 1) { /* Only kick if this is the first item queued */ if (is_first) { gpr_mu_lock(cq->mu); @@ -740,7 +725,8 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, 
GRPC_ERROR_UNREF(kick_error); } } - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == + 1) { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); gpr_mu_lock(cq->mu); cq_finish_shutdown_next(cq); @@ -749,7 +735,7 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, } } else { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); - gpr_atm_rel_store(&cqd->pending_events, 0); + cqd->pending_events.Store(0, grpc_core::MemoryOrder::RELEASE); gpr_mu_lock(cq->mu); cq_finish_shutdown_next(cq); gpr_mu_unlock(cq->mu); @@ -795,12 +781,12 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, cq_check_tag(cq, tag, false); /* Used in debug builds only */ /* Add to the list of completions */ - gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); + cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED); cqd->completed_tail->next = ((uintptr_t)storage) | (1u & cqd->completed_tail->next); cqd->completed_tail = storage; - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { cq_finish_shutdown_pluck(cq); gpr_mu_unlock(cq->mu); } else { @@ -856,8 +842,8 @@ static void cq_end_op_for_callback( cq_check_tag(cq, tag, true); /* Used in debug builds only */ - gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED); + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { cq_finish_shutdown_callback(cq); } @@ -893,20 +879,20 @@ class ExecCtxNext : public grpc_core::ExecCtx { cq_next_data* cqd = static_cast DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == nullptr); - gpr_atm current_last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cqd->things_queued_ever); + intptr_t current_last_seen_things_queued_ever = + 
cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED); if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { a->last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cqd->things_queued_ever); + cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED); /* Pop a cq_completion from the queue. Returns NULL if the queue is empty * might return NULL in some cases even if the queue is not empty; but * that * is ok and doesn't affect correctness. Might effect the tail latencies a * bit) */ - a->stolen_completion = cq_event_queue_pop(&cqd->queue); + a->stolen_completion = cqd->queue.Pop(); if (a->stolen_completion != nullptr) { return true; } @@ -965,7 +951,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline); cq_is_finished_arg is_finished_arg = { - gpr_atm_no_barrier_load(&cqd->things_queued_ever), + cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED), cq, deadline_millis, nullptr, @@ -985,7 +971,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, break; } - grpc_cq_completion* c = cq_event_queue_pop(&cqd->queue); + grpc_cq_completion* c = cqd->queue.Pop(); if (c != nullptr) { ret.type = GRPC_OP_COMPLETE; @@ -999,16 +985,16 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, so that the thread comes back quickly from poll to make a second attempt at popping. Not doing this can potentially deadlock this thread forever (if the deadline is infinity) */ - if (cq_event_queue_num_items(&cqd->queue) > 0) { + if (cqd->queue.num_items() > 0) { iteration_deadline = 0; } } - if (gpr_atm_acq_load(&cqd->pending_events) == 0) { + if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) == 0) { /* Before returning, check if the queue has any items left over (since gpr_mpscq_pop() can sometimes return NULL even if the queue is not empty. 
If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ - if (cq_event_queue_num_items(&cqd->queue) > 0) { + if (cqd->queue.num_items() > 0) { /* Go to the beginning of the loop. No point doing a poll because (cq->shutdown == true) is only possible when there is no pending work (i.e cq->pending_events == 0) and any outstanding completion @@ -1049,8 +1035,8 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, is_finished_arg.first_loop = false; } - if (cq_event_queue_num_items(&cqd->queue) > 0 && - gpr_atm_acq_load(&cqd->pending_events) > 0) { + if (cqd->queue.num_items() > 0 && + cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) > 0) { gpr_mu_lock(cq->mu); cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr); gpr_mu_unlock(cq->mu); @@ -1074,7 +1060,7 @@ static void cq_finish_shutdown_next(grpc_completion_queue* cq) { cq_next_data* cqd = static_cast DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); - GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); + GPR_ASSERT(cqd->pending_events.Load(grpc_core::MemoryOrder::RELAXED) == 0); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); } @@ -1096,10 +1082,10 @@ static void cq_shutdown_next(grpc_completion_queue* cq) { return; } cqd->shutdown_called = true; - /* Doing a full_fetch_add (i.e acq/release) here to match with + /* Doing acq/release FetchSub here to match with * cq_begin_op_for_next and cq_end_op_for_next functions which read/write * on this counter without necessarily holding a lock on cq */ - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { cq_finish_shutdown_next(cq); } gpr_mu_unlock(cq->mu); @@ -1148,12 +1134,12 @@ class ExecCtxPluck : public grpc_core::ExecCtx { GPR_ASSERT(a->stolen_completion == nullptr); gpr_atm current_last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cqd->things_queued_ever); + 
cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED); if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { gpr_mu_lock(cq->mu); a->last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cqd->things_queued_ever); + cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED); grpc_cq_completion* c; grpc_cq_completion* prev = &cqd->completed_head; while ((c = (grpc_cq_completion*)(prev->next & @@ -1209,7 +1195,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, gpr_mu_lock(cq->mu); grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline); cq_is_finished_arg is_finished_arg = { - gpr_atm_no_barrier_load(&cqd->things_queued_ever), + cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED), cq, deadline_millis, nullptr, @@ -1246,7 +1232,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, } prev = c; } - if (gpr_atm_no_barrier_load(&cqd->shutdown)) { + if (cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED)) { gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; @@ -1309,8 +1295,8 @@ static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) { cq_pluck_data* cqd = static_cast DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); - GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); - gpr_atm_no_barrier_store(&cqd->shutdown, 1); + GPR_ASSERT(!cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED)); + cqd->shutdown.Store(1, grpc_core::MemoryOrder::RELAXED); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); } @@ -1334,7 +1320,7 @@ static void cq_shutdown_pluck(grpc_completion_queue* cq) { return; } cqd->shutdown_called = true; - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { cq_finish_shutdown_pluck(cq); } gpr_mu_unlock(cq->mu); @@ -1368,7 +1354,7 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) { return; } 
cqd->shutdown_called = true; - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { gpr_mu_unlock(cq->mu); cq_finish_shutdown_callback(cq); } else { diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 1b524bc3e83..e116cf6e7d2 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -138,7 +138,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { } internal::Call call_; - internal::ServerReactor* reactor_; + internal::ServerReactor* const reactor_; bool has_tag_; void* tag_; void* core_cq_tag_; @@ -200,12 +200,17 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { cancelled_ = 1; } - if (cancelled_ && (reactor_ != nullptr)) { + // Decide whether to call the cancel callback before releasing the lock + bool call_cancel = (cancelled_ != 0); + + // Release the lock since we are going to be calling a callback and + // interceptors now + lock.unlock(); + + if (call_cancel && (reactor_ != nullptr)) { reactor_->OnCancel(); } - /* Release the lock since we are going to be running through interceptors now - */ - lock.unlock(); + /* Add interception point and run through interceptors */ interceptor_methods_.AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_CLOSE); diff --git a/src/cpp/util/byte_buffer_cc.cc b/src/cpp/util/byte_buffer_cc.cc index a7e16454352..fb705906455 100644 --- a/src/cpp/util/byte_buffer_cc.cc +++ b/src/cpp/util/byte_buffer_cc.cc @@ -43,18 +43,4 @@ Status ByteBuffer::Dump(std::vector* slices) const { return Status::OK; } -ByteBuffer::ByteBuffer(const ByteBuffer& buf) : buffer_(nullptr) { - operator=(buf); -} - -ByteBuffer& ByteBuffer::operator=(const ByteBuffer& buf) { - if (this != &buf) { - Clear(); // first remove existing data - } - if (buf.buffer_) { - buffer_ = grpc_byte_buffer_copy(buf.buffer_); // then copy - } - 
return *this; -} - } // namespace grpc diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py index b805f4277b0..27b98362c11 100644 --- a/src/python/grpcio/commands.py +++ b/src/python/grpcio/commands.py @@ -212,50 +212,39 @@ class BuildExt(build_ext.build_ext): LINK_OPTIONS = {} def build_extensions(self): + + def compiler_ok_with_extra_std(): + """Test if default compiler is okay with specifying c++ version + when invoked in C mode. GCC is okay with this, while clang is not. + """ + cc_test = subprocess.Popen( + ['cc', '-x', 'c', '-std=c++11', '-'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + _, cc_err = cc_test.communicate(input='int main(){return 0;}') + return not 'invalid argument' in str(cc_err) + # This special conditioning is here due to difference of compiler # behavior in gcc and clang. The clang doesn't take --stdc++11 # flags but gcc does. Since the setuptools of Python only support # all C or all C++ compilation, the mix of C and C++ will crash. - # *By default*, the macOS use clang and Linux use gcc, that's why - # the special condition here is checking platform. - if "darwin" in sys.platform: - config = os.environ.get('CONFIG', 'opt') - target_path = os.path.abspath( - os.path.join( - os.path.dirname(os.path.realpath(__file__)), '..', '..', - '..', 'libs', config)) - targets = [ - os.path.join(target_path, 'libboringssl.a'), - os.path.join(target_path, 'libares.a'), - os.path.join(target_path, 'libgpr.a'), - os.path.join(target_path, 'libgrpc.a') - ] - # Running make separately for Mac means we lose all - # Extension.define_macros configured in setup.py. Re-add the macro - # for gRPC Core's fork handlers. - # TODO(ericgribkoff) Decide what to do about the other missing core - # macros, including GRPC_ENABLE_FORK_SUPPORT, which defaults to 1 - # on Linux but remains unset on Mac. 
- extra_defines = [ - 'EXTRA_DEFINES="GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK=1"' - ] - # Ensure the BoringSSL are built instead of using system provided - # libraries. It prevents dependency issues while distributing to - # Mac users who use MacPorts to manage their libraries. #17002 - mod_env = dict(os.environ) - mod_env['REQUIRE_CUSTOM_LIBRARIES_opt'] = '1' - make_process = subprocess.Popen( - ['make'] + extra_defines + targets, - env=mod_env, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - make_out, make_err = make_process.communicate() - if make_out and make_process.returncode != 0: - sys.stdout.write(str(make_out) + '\n') - if make_err: - sys.stderr.write(str(make_err) + '\n') - if make_process.returncode != 0: - raise Exception("make command failed!") + # *By default*, macOS and FreBSD use clang and Linux use gcc + # + # If we are not using a permissive compiler that's OK with being + # passed wrong std flags, swap out compile function by adding a filter + # for it. + if not compiler_ok_with_extra_std(): + old_compile = self.compiler._compile + + def new_compile(obj, src, ext, cc_args, extra_postargs, pp_opts): + if src[-2:] == '.c': + extra_postargs = [ + arg for arg in extra_postargs if not '-std=c++' in arg + ] + return old_compile(obj, src, ext, cc_args, extra_postargs, + pp_opts) + + self.compiler._compile = new_compile compiler = self.compiler.compiler_type if compiler in BuildExt.C_OPTIONS: diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 6e300ee6c5d..9224b2ac672 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -19,6 +19,7 @@ import logging import threading import time +from concurrent import futures import six import grpc @@ -565,8 +566,8 @@ def _send_message_callback_to_blocking_iterator_adapter( def _select_thread_pool_for_behavior(behavior, default_thread_pool): - if hasattr(behavior, 'experimental_thread_pool' - ) and behavior.experimental_thread_pool is not None: 
+ if hasattr(behavior, 'experimental_thread_pool') and isinstance( + behavior.experimental_thread_pool, futures.ThreadPoolExecutor): return behavior.experimental_thread_pool else: return default_thread_pool diff --git a/src/python/grpcio_tests/tests/unit/thread_pool.py b/src/python/grpcio_tests/tests/unit/thread_pool.py index e99efc3e927..bc0f0e523bc 100644 --- a/src/python/grpcio_tests/tests/unit/thread_pool.py +++ b/src/python/grpcio_tests/tests/unit/thread_pool.py @@ -16,7 +16,7 @@ import threading from concurrent import futures -class RecordingThreadPool(futures.Executor): +class RecordingThreadPool(futures.ThreadPoolExecutor): """A thread pool that records if used.""" def __init__(self, max_workers): diff --git a/templates/Makefile.template b/templates/Makefile.template index 8bb06176bf8..31cf14a71c1 100644 --- a/templates/Makefile.template +++ b/templates/Makefile.template @@ -715,7 +715,7 @@ ifeq ($(HAS_PKG_CONFIG),true) PROTOBUF_PKG_CONFIG = true PC_REQUIRES_GRPCXX = protobuf - CPPFLAGS := $(shell $(PKG_CONFIG) --cflags protobuf) $(CPPFLAGS) + CPPFLAGS := $(CPPFLAGS) $(shell $(PKG_CONFIG) --cflags protobuf) LDFLAGS_PROTOBUF_PKG_CONFIG = $(shell $(PKG_CONFIG) --libs-only-L protobuf) ifeq ($(SYSTEM),Linux) ifneq ($(LDFLAGS_PROTOBUF_PKG_CONFIG),) diff --git a/test/core/end2end/bad_server_response_test.cc b/test/core/end2end/bad_server_response_test.cc index f8ffb551805..99cfec7adf6 100644 --- a/test/core/end2end/bad_server_response_test.cc +++ b/test/core/end2end/bad_server_response_test.cc @@ -65,7 +65,7 @@ #define HTTP1_DETAIL_MSG "Trying to connect an http1.x server" -/* TODO(zyc) Check the content of incomming data instead of using this length */ +/* TODO(zyc) Check the content of incoming data instead of using this length */ /* The 'bad' server will start sending responses after reading this amount of * data from the client. 
*/ #define SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD (size_t)200 diff --git a/test/core/end2end/tests/bad_ping.cc b/test/core/end2end/tests/bad_ping.cc index 98d893f64d9..a07bf16876a 100644 --- a/test/core/end2end/tests/bad_ping.cc +++ b/test/core/end2end/tests/bad_ping.cc @@ -312,7 +312,7 @@ static void test_pings_without_data(grpc_end2end_test_config config) { CQ_EXPECT_COMPLETION(cqv, tag(101), 1); cq_verify(cqv); - // Send too many pings to the server similar to the prevous test case. + // Send too many pings to the server similar to the previous test case. // However, since we set the MAX_PINGS_WITHOUT_DATA at the client side, only // MAX_PING_STRIKES will actually be sent and the rpc will still succeed. int i; diff --git a/test/core/tsi/alts/handshaker/alts_tsi_utils_test.cc b/test/core/tsi/alts/handshaker/alts_tsi_utils_test.cc index 98c5d236415..8d75d35368d 100644 --- a/test/core/tsi/alts/handshaker/alts_tsi_utils_test.cc +++ b/test/core/tsi/alts/handshaker/alts_tsi_utils_test.cc @@ -51,7 +51,7 @@ static void deserialize_response_test() { GPR_ASSERT(grpc_gcp_handshaker_resp_equals(resp, decoded_resp)); grpc_byte_buffer_destroy(buffer); - /* Invalid serializaiton. */ + /* Invalid serialization. 
*/ grpc_slice bad_slice = grpc_slice_split_head(&slice, GRPC_SLICE_LENGTH(slice) - 1); buffer = grpc_raw_byte_buffer_create(&bad_slice, 1 /* number of slices */); diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index 1871e1375ed..7f9fd29026e 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -113,6 +113,7 @@ class ServiceA final { virtual ~experimental_async_interface() {} // MethodA1 leading comment 1 virtual void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function) = 0; + virtual void MethodA1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, std::function) = 0; // MethodA1 trailing comment 1 // MethodA2 detached leading comment 1 // @@ -182,6 +183,7 @@ class ServiceA final { public StubInterface::experimental_async_interface { public: void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function) override; + void MethodA1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, std::function) override; void MethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::experimental::ClientWriteReactor< ::grpc::testing::Request>* reactor) override; void MethodA3(::grpc::ClientContext* context, ::grpc::testing::Request* request, ::grpc::experimental::ClientReadReactor< ::grpc::testing::Response>* reactor) override; void MethodA4(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::grpc::testing::Request,::grpc::testing::Response>* reactor) override; @@ -714,6 +716,7 @@ class ServiceB final { virtual ~experimental_async_interface() {} // MethodB1 leading comment 1 virtual void MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, 
::grpc::testing::Response* response, std::function) = 0; + virtual void MethodB1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, std::function) = 0; // MethodB1 trailing comment 1 }; virtual class experimental_async_interface* experimental_async() { return nullptr; } @@ -735,6 +738,7 @@ class ServiceB final { public StubInterface::experimental_async_interface { public: void MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function) override; + void MethodB1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, std::function) override; private: friend class Stub; explicit experimental_async(Stub* stub): stub_(stub) { } diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index a9db19dfe8e..d80fa33a83a 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -150,6 +150,7 @@ grpc_cc_test( "gtest", ], deps = [ + ":interceptors_util", ":test_service_impl", "//:gpr", "//:grpc", diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 30db5b8c01c..893d009392d 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -16,6 +16,7 @@ * */ +#include #include #include #include @@ -35,9 +36,11 @@ #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" +#include "test/cpp/end2end/interceptors_util.h" #include "test/cpp/end2end/test_service_impl.h" #include "test/cpp/util/byte_buffer_proto_helper.h" #include "test/cpp/util/string_ref_helper.h" +#include "test/cpp/util/test_credentials_provider.h" #include @@ -60,11 +63,17 @@ enum class Protocol { INPROC, TCP }; class TestScenario { public: - TestScenario(bool serve_callback, Protocol protocol) - : callback_server(serve_callback), 
protocol(protocol) {} + TestScenario(bool serve_callback, Protocol protocol, bool intercept, + const grpc::string& creds_type) + : callback_server(serve_callback), + protocol(protocol), + use_interceptors(intercept), + credentials_type(creds_type) {} void Log() const; bool callback_server; Protocol protocol; + bool use_interceptors; + const grpc::string credentials_type; }; static std::ostream& operator<<(std::ostream& out, @@ -87,15 +96,18 @@ class ClientCallbackEnd2endTest void SetUp() override { ServerBuilder builder; + auto server_creds = GetCredentialsProvider()->GetServerCredentials( + GetParam().credentials_type); + // TODO(vjpai): Support testing of AuthMetadataProcessor + if (GetParam().protocol == Protocol::TCP) { if (!grpc_iomgr_run_in_background()) { do_not_test_ = true; return; } - int port = grpc_pick_unused_port_or_die(); - server_address_ << "localhost:" << port; - builder.AddListeningPort(server_address_.str(), - InsecureServerCredentials()); + picked_port_ = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << picked_port_; + builder.AddListeningPort(server_address_.str(), server_creds); } if (!GetParam().callback_server) { builder.RegisterService(&service_); @@ -103,31 +115,61 @@ class ClientCallbackEnd2endTest builder.RegisterService(&callback_service_); } + if (GetParam().use_interceptors) { + std::vector< + std::unique_ptr> + creators; + // Add 20 dummy server interceptors + creators.reserve(20); + for (auto i = 0; i < 20; i++) { + creators.push_back(std::unique_ptr( + new DummyInterceptorFactory())); + } + builder.experimental().SetInterceptorCreators(std::move(creators)); + } + server_ = builder.BuildAndStart(); is_server_started_ = true; } void ResetStub() { ChannelArguments args; + auto channel_creds = GetCredentialsProvider()->GetChannelCredentials( + GetParam().credentials_type, &args); switch (GetParam().protocol) { case Protocol::TCP: - channel_ = - CreateChannel(server_address_.str(), InsecureChannelCredentials()); + 
if (!GetParam().use_interceptors) { + channel_ = + CreateCustomChannel(server_address_.str(), channel_creds, args); + } else { + channel_ = CreateCustomChannelWithInterceptors( + server_address_.str(), channel_creds, args, + CreateDummyClientInterceptors()); + } break; case Protocol::INPROC: - channel_ = server_->InProcessChannel(args); + if (!GetParam().use_interceptors) { + channel_ = server_->InProcessChannel(args); + } else { + channel_ = server_->experimental().InProcessChannelWithInterceptors( + args, CreateDummyClientInterceptors()); + } break; default: assert(false); } stub_ = grpc::testing::EchoTestService::NewStub(channel_); generic_stub_.reset(new GenericStub(channel_)); + DummyInterceptor::Reset(); } void TearDown() override { if (is_server_started_) { server_->Shutdown(); } + if (picked_port_ > 0) { + grpc_recycle_unused_port(picked_port_); + } } void SendRpcs(int num_rpcs, bool with_binary_metadata) { @@ -178,6 +220,36 @@ class ClientCallbackEnd2endTest } } + void SendRpcsRawReq(int num_rpcs) { + grpc::string test_string("Hello raw world."); + EchoRequest request; + request.set_message(test_string); + std::unique_ptr send_buf = SerializeToByteBuffer(&request); + + for (int i = 0; i < num_rpcs; i++) { + EchoResponse response; + ClientContext cli_ctx; + + std::mutex mu; + std::condition_variable cv; + bool done = false; + stub_->experimental_async()->Echo( + &cli_ctx, send_buf.get(), &response, + [&request, &response, &done, &mu, &cv](Status s) { + GPR_ASSERT(s.ok()); + + EXPECT_EQ(request.message(), response.message()); + std::lock_guard l(mu); + done = true; + cv.notify_one(); + }); + std::unique_lock l(mu); + while (!done) { + cv.wait(l); + } + } + } + void SendRpcsGeneric(int num_rpcs, bool maybe_except) { const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo"); grpc::string test_string(""); @@ -283,6 +355,7 @@ class ClientCallbackEnd2endTest } bool do_not_test_{false}; bool is_server_started_{false}; + int picked_port_{0}; 
std::shared_ptr channel_; std::unique_ptr stub_; std::unique_ptr generic_stub_; @@ -304,6 +377,12 @@ TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) { SendRpcs(10, false); } +TEST_P(ClientCallbackEnd2endTest, SequentialRpcsRawReq) { + MAYBE_SKIP_TEST; + ResetStub(); + SendRpcsRawReq(10); +} + TEST_P(ClientCallbackEnd2endTest, SendClientInitialMetadata) { MAYBE_SKIP_TEST; ResetStub(); @@ -419,136 +498,569 @@ TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { while (!done) { cv.wait(l); } + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } } -TEST_P(ClientCallbackEnd2endTest, RequestStream) { +TEST_P(ClientCallbackEnd2endTest, RequestEchoServerCancel) { MAYBE_SKIP_TEST; ResetStub(); - class Client : public grpc::experimental::ClientWriteReactor { - public: - explicit Client(grpc::testing::EchoTestService::Stub* stub) { - context_.set_initial_metadata_corked(true); - stub->experimental_async()->RequestStream(&context_, &response_, this); - StartCall(); - request_.set_message("Hello server."); - StartWrite(&request_); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + context.AddMetadata(kServerTryCancelRequest, + grpc::to_string(CANCEL_BEFORE_PROCESSING)); + + std::mutex mu; + std::condition_variable cv; + bool done = false; + stub_->experimental_async()->Echo( + &context, &request, &response, [&done, &mu, &cv](Status s) { + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + std::lock_guard l(mu); + done = true; + cv.notify_one(); + }); + std::unique_lock l(mu); + while (!done) { + cv.wait(l); + } +} + +struct ClientCancelInfo { + bool cancel{false}; + int ops_before_cancel; + ClientCancelInfo() : cancel{false} {} + // Allow the single-op version to be non-explicit for ease of use + ClientCancelInfo(int ops) : cancel{true}, ops_before_cancel{ops} {} +}; + +class WriteClient : public grpc::experimental::ClientWriteReactor { + 
public: + WriteClient(grpc::testing::EchoTestService::Stub* stub, + ServerTryCancelRequestPhase server_try_cancel, + int num_msgs_to_send, ClientCancelInfo client_cancel = {}) + : server_try_cancel_(server_try_cancel), + num_msgs_to_send_(num_msgs_to_send), + client_cancel_{client_cancel} { + grpc::string msg{"Hello server."}; + for (int i = 0; i < num_msgs_to_send; i++) { + desired_ += msg; } - void OnWriteDone(bool ok) override { - writes_left_--; - if (writes_left_ > 1) { - StartWrite(&request_); - } else if (writes_left_ == 1) { - StartWriteLast(&request_, WriteOptions()); - } + if (server_try_cancel != DO_NOT_CANCEL) { + // Send server_try_cancel value in the client metadata + context_.AddMetadata(kServerTryCancelRequest, + grpc::to_string(server_try_cancel)); } - void OnDone(const Status& s) override { + context_.set_initial_metadata_corked(true); + stub->experimental_async()->RequestStream(&context_, &response_, this); + StartCall(); + request_.set_message(msg); + MaybeWrite(); + } + void OnWriteDone(bool ok) override { + if (ok) { + num_msgs_sent_++; + MaybeWrite(); + } + } + void OnDone(const Status& s) override { + gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent_); + int num_to_send = + (client_cancel_.cancel) + ? std::min(num_msgs_to_send_, client_cancel_.ops_before_cancel) + : num_msgs_to_send_; + switch (server_try_cancel_) { + case CANCEL_BEFORE_PROCESSING: + case CANCEL_DURING_PROCESSING: + // If the RPC is canceled by server before / during messages from the + // client, it means that the client most likely did not get a chance to + // send all the messages it wanted to send. 
i.e num_msgs_sent <= + // num_msgs_to_send + EXPECT_LE(num_msgs_sent_, num_to_send); + break; + case DO_NOT_CANCEL: + case CANCEL_AFTER_PROCESSING: + // If the RPC was not canceled or canceled after all messages were read + // by the server, the client did get a chance to send all its messages + EXPECT_EQ(num_msgs_sent_, num_to_send); + break; + default: + assert(false); + break; + } + if ((server_try_cancel_ == DO_NOT_CANCEL) && !client_cancel_.cancel) { EXPECT_TRUE(s.ok()); - EXPECT_EQ(response_.message(), "Hello server.Hello server.Hello server."); - std::unique_lock l(mu_); - done_ = true; - cv_.notify_one(); + EXPECT_EQ(response_.message(), desired_); + } else { + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); } - void Await() { - std::unique_lock l(mu_); - while (!done_) { - cv_.wait(l); - } + std::unique_lock l(mu_); + done_ = true; + cv_.notify_one(); + } + void Await() { + std::unique_lock l(mu_); + while (!done_) { + cv_.wait(l); } + } - private: - EchoRequest request_; - EchoResponse response_; - ClientContext context_; - int writes_left_{3}; - std::mutex mu_; - std::condition_variable cv_; - bool done_ = false; - } test{stub_.get()}; + private: + void MaybeWrite() { + if (client_cancel_.cancel && + num_msgs_sent_ == client_cancel_.ops_before_cancel) { + context_.TryCancel(); + } else if (num_msgs_to_send_ > num_msgs_sent_ + 1) { + StartWrite(&request_); + } else if (num_msgs_to_send_ == num_msgs_sent_ + 1) { + StartWriteLast(&request_, WriteOptions()); + } + } + EchoRequest request_; + EchoResponse response_; + ClientContext context_; + const ServerTryCancelRequestPhase server_try_cancel_; + int num_msgs_sent_{0}; + const int num_msgs_to_send_; + grpc::string desired_; + const ClientCancelInfo client_cancel_; + std::mutex mu_; + std::condition_variable cv_; + bool done_ = false; +}; +TEST_P(ClientCallbackEnd2endTest, RequestStream) { + MAYBE_SKIP_TEST; + ResetStub(); + WriteClient test{stub_.get(), DO_NOT_CANCEL, 3}; 
test.Await(); + // Make sure that the server interceptors were not notified to cancel + if (GetParam().use_interceptors) { + EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); + } } -TEST_P(ClientCallbackEnd2endTest, ResponseStream) { +TEST_P(ClientCallbackEnd2endTest, ClientCancelsRequestStream) { MAYBE_SKIP_TEST; ResetStub(); - class Client : public grpc::experimental::ClientReadReactor { - public: - explicit Client(grpc::testing::EchoTestService::Stub* stub) { - request_.set_message("Hello client "); - stub->experimental_async()->ResponseStream(&context_, &request_, this); - StartCall(); - StartRead(&response_); + WriteClient test{stub_.get(), DO_NOT_CANCEL, 3, {2}}; + test.Await(); + // Make sure that the server interceptors got the cancel + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + +// Server to cancel before doing reading the request +TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelBeforeReads) { + MAYBE_SKIP_TEST; + ResetStub(); + WriteClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 1}; + test.Await(); + // Make sure that the server interceptors were notified + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + +// Server to cancel while reading a request from the stream in parallel +TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelDuringRead) { + MAYBE_SKIP_TEST; + ResetStub(); + WriteClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10}; + test.Await(); + // Make sure that the server interceptors were notified + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + +// Server to cancel after reading all the requests but before returning to the +// client +TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelAfterReads) { + MAYBE_SKIP_TEST; + ResetStub(); + WriteClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 4}; + test.Await(); + // Make sure that the server interceptors 
were notified + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + +class ReadClient : public grpc::experimental::ClientReadReactor { + public: + ReadClient(grpc::testing::EchoTestService::Stub* stub, + ServerTryCancelRequestPhase server_try_cancel, + ClientCancelInfo client_cancel = {}) + : server_try_cancel_(server_try_cancel), client_cancel_{client_cancel} { + if (server_try_cancel_ != DO_NOT_CANCEL) { + // Send server_try_cancel value in the client metadata + context_.AddMetadata(kServerTryCancelRequest, + grpc::to_string(server_try_cancel)); } - void OnReadDone(bool ok) override { - if (!ok) { + request_.set_message("Hello client "); + stub->experimental_async()->ResponseStream(&context_, &request_, this); + if (client_cancel_.cancel && + reads_complete_ == client_cancel_.ops_before_cancel) { + context_.TryCancel(); + } + // Even if we cancel, read until failure because there might be responses + // pending + StartRead(&response_); + StartCall(); + } + void OnReadDone(bool ok) override { + if (!ok) { + if (server_try_cancel_ == DO_NOT_CANCEL && !client_cancel_.cancel) { EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend); - } else { - EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend); - EXPECT_EQ(response_.message(), - request_.message() + grpc::to_string(reads_complete_)); - reads_complete_++; - StartRead(&response_); } + } else { + EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend); + EXPECT_EQ(response_.message(), + request_.message() + grpc::to_string(reads_complete_)); + reads_complete_++; + if (client_cancel_.cancel && + reads_complete_ == client_cancel_.ops_before_cancel) { + context_.TryCancel(); + } + // Even if we cancel, read until failure because there might be responses + // pending + StartRead(&response_); } - void OnDone(const Status& s) override { - EXPECT_TRUE(s.ok()); - std::unique_lock l(mu_); - done_ = true; - cv_.notify_one(); + } + void OnDone(const 
Status& s) override { + gpr_log(GPR_INFO, "Read %d messages", reads_complete_); + switch (server_try_cancel_) { + case DO_NOT_CANCEL: + if (!client_cancel_.cancel || client_cancel_.ops_before_cancel > + kServerDefaultResponseStreamsToSend) { + EXPECT_TRUE(s.ok()); + EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend); + } else { + EXPECT_GE(reads_complete_, client_cancel_.ops_before_cancel); + EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend); + // Status might be ok or cancelled depending on whether server + // sent status before client cancel went through + if (!s.ok()) { + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + } + } + break; + case CANCEL_BEFORE_PROCESSING: + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + EXPECT_EQ(reads_complete_, 0); + break; + case CANCEL_DURING_PROCESSING: + case CANCEL_AFTER_PROCESSING: + // If server canceled while writing messages, client must have read + // less than or equal to the expected number of messages. Even if the + // server canceled after writing all messages, the RPC may be canceled + // before the Client got a chance to read all the messages. 
+ EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend); + break; + default: + assert(false); } - void Await() { - std::unique_lock l(mu_); - while (!done_) { - cv_.wait(l); - } + std::unique_lock l(mu_); + done_ = true; + cv_.notify_one(); + } + void Await() { + std::unique_lock l(mu_); + while (!done_) { + cv_.wait(l); } + } - private: - EchoRequest request_; - EchoResponse response_; - ClientContext context_; - int reads_complete_{0}; - std::mutex mu_; - std::condition_variable cv_; - bool done_ = false; - } test{stub_.get()}; + private: + EchoRequest request_; + EchoResponse response_; + ClientContext context_; + const ServerTryCancelRequestPhase server_try_cancel_; + int reads_complete_{0}; + const ClientCancelInfo client_cancel_; + std::mutex mu_; + std::condition_variable cv_; + bool done_ = false; +}; +TEST_P(ClientCallbackEnd2endTest, ResponseStream) { + MAYBE_SKIP_TEST; + ResetStub(); + ReadClient test{stub_.get(), DO_NOT_CANCEL}; test.Await(); + // Make sure that the server interceptors were not notified of a cancel + if (GetParam().use_interceptors) { + EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); + } } +TEST_P(ClientCallbackEnd2endTest, ClientCancelsResponseStream) { + MAYBE_SKIP_TEST; + ResetStub(); + ReadClient test{stub_.get(), DO_NOT_CANCEL, 2}; + test.Await(); + // Because cancel in this case races with server finish, we can't be sure that + // server interceptors even see cancellation +} + +// Server to cancel before sending any response messages +TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelBefore) { + MAYBE_SKIP_TEST; + ResetStub(); + ReadClient test{stub_.get(), CANCEL_BEFORE_PROCESSING}; + test.Await(); + // Make sure that the server interceptors were notified + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + +// Server to cancel while writing a response to the stream in 
parallel +TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelDuring) { + MAYBE_SKIP_TEST; + ResetStub(); + ReadClient test{stub_.get(), CANCEL_DURING_PROCESSING}; + test.Await(); + // Make sure that the server interceptors were notified + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + +// Server to cancel after writing all the respones to the stream but before +// returning to the client +TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelAfter) { + MAYBE_SKIP_TEST; + ResetStub(); + ReadClient test{stub_.get(), CANCEL_AFTER_PROCESSING}; + test.Await(); + // Make sure that the server interceptors were notified + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + +class BidiClient + : public grpc::experimental::ClientBidiReactor { + public: + BidiClient(grpc::testing::EchoTestService::Stub* stub, + ServerTryCancelRequestPhase server_try_cancel, + int num_msgs_to_send, ClientCancelInfo client_cancel = {}) + : server_try_cancel_(server_try_cancel), + msgs_to_send_{num_msgs_to_send}, + client_cancel_{client_cancel} { + if (server_try_cancel_ != DO_NOT_CANCEL) { + // Send server_try_cancel value in the client metadata + context_.AddMetadata(kServerTryCancelRequest, + grpc::to_string(server_try_cancel)); + } + request_.set_message("Hello fren "); + stub->experimental_async()->BidiStream(&context_, this); + MaybeWrite(); + StartRead(&response_); + StartCall(); + } + void OnReadDone(bool ok) override { + if (!ok) { + if (server_try_cancel_ == DO_NOT_CANCEL) { + if (!client_cancel_.cancel) { + EXPECT_EQ(reads_complete_, msgs_to_send_); + } else { + EXPECT_LE(reads_complete_, writes_complete_); + } + } + } else { + EXPECT_LE(reads_complete_, msgs_to_send_); + EXPECT_EQ(response_.message(), request_.message()); + reads_complete_++; + StartRead(&response_); + } + } + void OnWriteDone(bool ok) override { + if (server_try_cancel_ == DO_NOT_CANCEL) { + 
EXPECT_TRUE(ok); + } else if (!ok) { + return; + } + writes_complete_++; + MaybeWrite(); + } + void OnDone(const Status& s) override { + gpr_log(GPR_INFO, "Sent %d messages", writes_complete_); + gpr_log(GPR_INFO, "Read %d messages", reads_complete_); + switch (server_try_cancel_) { + case DO_NOT_CANCEL: + if (!client_cancel_.cancel || + client_cancel_.ops_before_cancel > msgs_to_send_) { + EXPECT_TRUE(s.ok()); + EXPECT_EQ(writes_complete_, msgs_to_send_); + EXPECT_EQ(reads_complete_, writes_complete_); + } else { + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + EXPECT_EQ(writes_complete_, client_cancel_.ops_before_cancel); + EXPECT_LE(reads_complete_, writes_complete_); + } + break; + case CANCEL_BEFORE_PROCESSING: + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + // The RPC is canceled before the server did any work or returned any + // reads, but it's possible that some writes took place first from the + // client + EXPECT_LE(writes_complete_, msgs_to_send_); + EXPECT_EQ(reads_complete_, 0); + break; + case CANCEL_DURING_PROCESSING: + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + EXPECT_LE(writes_complete_, msgs_to_send_); + EXPECT_LE(reads_complete_, writes_complete_); + break; + case CANCEL_AFTER_PROCESSING: + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + EXPECT_EQ(writes_complete_, msgs_to_send_); + // The Server canceled after reading the last message and after writing + // the message to the client. However, the RPC cancellation might have + // taken effect before the client actually read the response. 
+ EXPECT_LE(reads_complete_, writes_complete_); + break; + default: + assert(false); + } + std::unique_lock l(mu_); + done_ = true; + cv_.notify_one(); + } + void Await() { + std::unique_lock l(mu_); + while (!done_) { + cv_.wait(l); + } + } + + private: + void MaybeWrite() { + if (client_cancel_.cancel && + writes_complete_ == client_cancel_.ops_before_cancel) { + context_.TryCancel(); + } else if (writes_complete_ == msgs_to_send_) { + StartWritesDone(); + } else { + StartWrite(&request_); + } + } + EchoRequest request_; + EchoResponse response_; + ClientContext context_; + const ServerTryCancelRequestPhase server_try_cancel_; + int reads_complete_{0}; + int writes_complete_{0}; + const int msgs_to_send_; + const ClientCancelInfo client_cancel_; + std::mutex mu_; + std::condition_variable cv_; + bool done_ = false; +}; + TEST_P(ClientCallbackEnd2endTest, BidiStream) { + MAYBE_SKIP_TEST; + ResetStub(); + BidiClient test{stub_.get(), DO_NOT_CANCEL, + kServerDefaultResponseStreamsToSend}; + test.Await(); + // Make sure that the server interceptors were not notified of a cancel + if (GetParam().use_interceptors) { + EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); + } +} + +TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) { + MAYBE_SKIP_TEST; + ResetStub(); + BidiClient test{stub_.get(), DO_NOT_CANCEL, + kServerDefaultResponseStreamsToSend, 2}; + test.Await(); + // Make sure that the server interceptors were notified of a cancel + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + +// Server to cancel before reading/writing any requests/responses on the stream +TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) { + MAYBE_SKIP_TEST; + ResetStub(); + BidiClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 2}; + test.Await(); + // Make sure that the server interceptors were notified + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + +// Server to 
cancel while reading/writing requests/responses on the stream in +// parallel +TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) { + MAYBE_SKIP_TEST; + ResetStub(); + BidiClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10}; + test.Await(); + // Make sure that the server interceptors were notified + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + +// Server to cancel after reading/writing all requests/responses on the stream +// but before returning to the client +TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) { + MAYBE_SKIP_TEST; + ResetStub(); + BidiClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 5}; + test.Await(); + // Make sure that the server interceptors were notified + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + +TEST_P(ClientCallbackEnd2endTest, SimultaneousReadAndWritesDone) { MAYBE_SKIP_TEST; ResetStub(); class Client : public grpc::experimental::ClientBidiReactor { public: - explicit Client(grpc::testing::EchoTestService::Stub* stub) { - request_.set_message("Hello fren "); + Client(grpc::testing::EchoTestService::Stub* stub) { + request_.set_message("Hello bidi "); stub->experimental_async()->BidiStream(&context_, this); - StartCall(); - StartRead(&response_); StartWrite(&request_); + StartCall(); } void OnReadDone(bool ok) override { - if (!ok) { - EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend); - } else { - EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend); - EXPECT_EQ(response_.message(), request_.message()); - reads_complete_++; - StartRead(&response_); - } + EXPECT_TRUE(ok); + EXPECT_EQ(response_.message(), request_.message()); } void OnWriteDone(bool ok) override { EXPECT_TRUE(ok); - if (++writes_complete_ == kServerDefaultResponseStreamsToSend) { - StartWritesDone(); - } else { - StartWrite(&request_); - } + // Now send out the simultaneous Read and WritesDone + 
StartWritesDone(); + StartRead(&response_); } void OnDone(const Status& s) override { EXPECT_TRUE(s.ok()); + EXPECT_EQ(response_.message(), request_.message()); std::unique_lock l(mu_); done_ = true; cv_.notify_one(); @@ -564,8 +1076,6 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) { EchoRequest request_; EchoResponse response_; ClientContext context_; - int reads_complete_{0}; - int writes_complete_{0}; std::mutex mu_; std::condition_variable cv_; bool done_ = false; @@ -574,13 +1084,42 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) { test.Await(); } -TestScenario scenarios[]{{false, Protocol::INPROC}, - {false, Protocol::TCP}, - {true, Protocol::INPROC}, - {true, Protocol::TCP}}; +std::vector CreateTestScenarios(bool test_insecure) { + std::vector scenarios; + std::vector credentials_types{ + GetCredentialsProvider()->GetSecureCredentialsTypeList()}; + auto insec_ok = [] { + // Only allow insecure credentials type when it is registered with the + // provider. User may create providers that do not have insecure. 
+ return GetCredentialsProvider()->GetChannelCredentials( + kInsecureCredentialsType, nullptr) != nullptr; + }; + if (test_insecure && insec_ok()) { + credentials_types.push_back(kInsecureCredentialsType); + } + GPR_ASSERT(!credentials_types.empty()); + + bool barr[]{false, true}; + Protocol parr[]{Protocol::INPROC, Protocol::TCP}; + for (Protocol p : parr) { + for (const auto& cred : credentials_types) { + // TODO(vjpai): Test inproc with secure credentials when feasible + if (p == Protocol::INPROC && + (cred != kInsecureCredentialsType || !insec_ok())) { + continue; + } + for (bool callback_server : barr) { + for (bool use_interceptors : barr) { + scenarios.emplace_back(callback_server, p, use_interceptors, cred); + } + } + } + } + return scenarios; +} INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest, - ::testing::ValuesIn(scenarios)); + ::testing::ValuesIn(CreateTestScenarios(true))); } // namespace } // namespace testing diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 8c2df1acc33..baebdbc8091 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -583,7 +583,10 @@ CallbackTestServiceImpl::RequestStream() { StartRead(&request_); } void OnDone() override { delete this; } - void OnCancel() override { FinishOnce(Status::CANCELLED); } + void OnCancel() override { + EXPECT_TRUE(ctx_->IsCancelled()); + FinishOnce(Status::CANCELLED); + } void OnReadDone(bool ok) override { if (ok) { response_->mutable_message()->append(request_.message()); @@ -666,7 +669,10 @@ CallbackTestServiceImpl::ResponseStream() { } } void OnDone() override { delete this; } - void OnCancel() override { FinishOnce(Status::CANCELLED); } + void OnCancel() override { + EXPECT_TRUE(ctx_->IsCancelled()); + FinishOnce(Status::CANCELLED); + } void OnWriteDone(bool ok) override { if (num_msgs_sent_ < server_responses_to_send_) { NextWrite(); @@ -749,7 +755,10 @@ 
CallbackTestServiceImpl::BidiStream() { StartRead(&request_); } void OnDone() override { delete this; } - void OnCancel() override { FinishOnce(Status::CANCELLED); } + void OnCancel() override { + EXPECT_TRUE(ctx_->IsCancelled()); + FinishOnce(Status::CANCELLED); + } void OnReadDone(bool ok) override { if (ok) { num_msgs_read_++; diff --git a/test/cpp/interop/metrics_client.cc b/test/cpp/interop/metrics_client.cc index 02cd5643355..dca8e07b96c 100644 --- a/test/cpp/interop/metrics_client.cc +++ b/test/cpp/interop/metrics_client.cc @@ -88,7 +88,7 @@ bool PrintMetrics(std::unique_ptr stub, bool total_only, int main(int argc, char** argv) { grpc::testing::InitTest(&argc, &argv, true); - // The output of metrics client is in some cases programatically parsed (for + // The output of metrics client is in some cases programmatically parsed (for // example by the stress test framework). So, we do not want any of the log // from the grpc library appearing on stdout. gpr_set_log_function(BlackholeLogger); diff --git a/test/cpp/microbenchmarks/helpers.cc b/test/cpp/microbenchmarks/helpers.cc index bce72985dc2..d4070de7481 100644 --- a/test/cpp/microbenchmarks/helpers.cc +++ b/test/cpp/microbenchmarks/helpers.cc @@ -20,6 +20,17 @@ #include "test/cpp/microbenchmarks/helpers.h" +static grpc::internal::GrpcLibraryInitializer g_gli_initializer; + +Library::Library() { + g_gli_initializer.summon(); +#ifdef GPR_LOW_LEVEL_COUNTERS + grpc_memory_counters_init(); +#endif + init_lib_.init(); + rq_ = grpc_resource_quota_create("bm"); +} + void TrackCounters::Finish(benchmark::State& state) { std::ostringstream out; for (const auto& l : labels_) { diff --git a/test/cpp/microbenchmarks/helpers.h b/test/cpp/microbenchmarks/helpers.h index 25d34b5f871..770966aa189 100644 --- a/test/cpp/microbenchmarks/helpers.h +++ b/test/cpp/microbenchmarks/helpers.h @@ -39,13 +39,7 @@ class Library { grpc_resource_quota* rq() { return rq_; } private: - Library() { -#ifdef GPR_LOW_LEVEL_COUNTERS - 
grpc_memory_counters_init(); -#endif - init_lib_.init(); - rq_ = grpc_resource_quota_create("bm"); - } + Library(); ~Library() { init_lib_.shutdown(); } diff --git a/test/cpp/naming/address_sorting_test.cc b/test/cpp/naming/address_sorting_test.cc index 09e705df789..db784a6476a 100644 --- a/test/cpp/naming/address_sorting_test.cc +++ b/test/cpp/naming/address_sorting_test.cc @@ -790,7 +790,7 @@ TEST_F(AddressSortingTest, TestPrefersIpv6LoopbackInputsFlipped) { /* Try to rule out false positives in the above two tests in which * the sorter might think that neither ipv6 or ipv4 loopback is * available, but ipv6 loopback is still preferred only due - * to precedance table lookups. */ + * to precedence table lookups. */ TEST_F(AddressSortingTest, TestSorterKnowsIpv6LoopbackIsAvailable) { sockaddr_in6 ipv6_loopback; memset(&ipv6_loopback, 0, sizeof(ipv6_loopback)); diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 081c8facda2..95b6ae65008 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -3300,7 +3300,8 @@ "language": "c++", "name": "client_callback_end2end_test", "src": [ - "test/cpp/end2end/client_callback_end2end_test.cc" + "test/cpp/end2end/client_callback_end2end_test.cc", + "test/cpp/end2end/interceptors_util.cc" ], "third_party": false, "type": "target"