From 6bf7782ee12925b7baeb722082568999fc98fa7a Mon Sep 17 00:00:00 2001 From: Matt Kwong Date: Wed, 17 May 2017 16:51:50 -0700 Subject: [PATCH 01/14] Add Python GCP API library installation to Alpine Dockerfiles --- tools/dockerfile/test/cxx_alpine_x64/Dockerfile | 3 +++ tools/dockerfile/test/python_alpine_x64/Dockerfile | 3 +++ 2 files changed, 6 insertions(+) diff --git a/tools/dockerfile/test/cxx_alpine_x64/Dockerfile b/tools/dockerfile/test/cxx_alpine_x64/Dockerfile index b13157f2807..fc62d230fb1 100644 --- a/tools/dockerfile/test/cxx_alpine_x64/Dockerfile +++ b/tools/dockerfile/test/cxx_alpine_x64/Dockerfile @@ -55,6 +55,9 @@ RUN pip install pip --upgrade RUN pip install virtualenv RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.2.0 six==1.10.0 +# Google Cloud platform API libraries +RUN pip install --upgrade google-api-python-client + # Prepare ccache RUN ln -s /usr/bin/ccache /usr/local/bin/gcc RUN ln -s /usr/bin/ccache /usr/local/bin/g++ diff --git a/tools/dockerfile/test/python_alpine_x64/Dockerfile b/tools/dockerfile/test/python_alpine_x64/Dockerfile index bdffbd35982..5d25ab0ebed 100644 --- a/tools/dockerfile/test/python_alpine_x64/Dockerfile +++ b/tools/dockerfile/test/python_alpine_x64/Dockerfile @@ -55,6 +55,9 @@ RUN pip install pip --upgrade RUN pip install virtualenv RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.2.0 six==1.10.0 +# Google Cloud platform API libraries +RUN pip install --upgrade google-api-python-client + # Prepare ccache RUN ln -s /usr/bin/ccache /usr/local/bin/gcc RUN ln -s /usr/bin/ccache /usr/local/bin/g++ From 28b082824353eab3912b9c4f949f0c426ea9f702 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Mon, 22 May 2017 10:04:01 -0700 Subject: [PATCH 02/14] bm trickly bugfix --- test/cpp/microbenchmarks/bm_fullstack_trickle.cc | 16 ++++++++-------- tools/profiling/microbenchmarks/bm_json.py | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index d7e3a9cf47d..a087bfe8996 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -320,8 +320,8 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { } static void StreamingTrickleArgs(benchmark::internal::Benchmark* b) { - for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) { - for (int j = 64; j <= 128 * 1024 * 1024; j *= 8) { + for (int i = 1; i <= 128 * 1024 * 1024; i *= 16) { + for (int j = 64; j <= 128 * 1024 * 1024; j *= 16) { double expected_time = static_cast<double>(14 + i) / (125.0 * static_cast<double>(j)); if (expected_time > 2.0) continue; @@ -425,12 +425,12 @@ static void UnaryTrickleArgs(benchmark::internal::Benchmark* b) { const int svr_4M = 4 * 1024 * 1024; const int svr_64M = 64 * 1024 * 1024; for (int bw = 64; bw <= 128 * 1024 * 1024; bw *= 16) { - b->Args({bw, cli_1024k, svr_256k}); - b->Args({bw, cli_1024k, svr_4M}); - b->Args({bw, cli_1024k, svr_64M}); - b->Args({bw, cli_32M, svr_256k}); - b->Args({bw, cli_32M, svr_4M}); - b->Args({bw, cli_32M, svr_64M}); + b->Args({cli_1024k, svr_256k, bw}); + b->Args({cli_1024k, svr_4M, bw}); + b->Args({cli_1024k, svr_64M, bw}); + b->Args({cli_32M, svr_256k, bw}); + b->Args({cli_32M, svr_4M, bw}); + b->Args({cli_32M, svr_64M, bw}); } } BENCHMARK(BM_PumpUnbalancedUnary_Trickle)->Apply(UnaryTrickleArgs); diff --git a/tools/profiling/microbenchmarks/bm_json.py b/tools/profiling/microbenchmarks/bm_json.py index f4d628e11f0..49a37072208 100644 ---
a/tools/profiling/microbenchmarks/bm_json.py +++ b/tools/profiling/microbenchmarks/bm_json.py @@ -56,7 +56,7 @@ _BM_SPECS = { }, 'BM_PumpUnbalancedUnary_Trickle': { 'tpl': [], - 'dyn': ['request_size', 'bandwidth_kilobits'], + 'dyn': ['cli_req_size', 'svr_req_size', 'bandwidth_kilobits'], }, 'BM_ErrorStringOnNewError': { 'tpl': ['fixture'], From a4cd06fc212d218255bd083b47d733eebd0b1c77 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Fri, 26 May 2017 11:06:10 -0400 Subject: [PATCH 03/14] Fixes --- test/cpp/microbenchmarks/bm_fullstack_trickle.cc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index a087bfe8996..702a14d14ea 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -320,8 +320,8 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { } static void StreamingTrickleArgs(benchmark::internal::Benchmark* b) { - for (int i = 1; i <= 128 * 1024 * 1024; i *= 16) { - for (int j = 64; j <= 128 * 1024 * 1024; j *= 16) { + for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) { + for (int j = 64; j <= 128 * 1024 * 1024; j *= 8) { double expected_time = static_cast<double>(14 + i) / (125.0 * static_cast<double>(j)); if (expected_time > 2.0) continue; @@ -419,18 +419,18 @@ static void BM_PumpUnbalancedUnary_Trickle(benchmark::State& state) { } static void UnaryTrickleArgs(benchmark::internal::Benchmark* b) { + // A selection of interesting numbers const int cli_1024k = 1024 * 1024; const int cli_32M = 32 * 1024 * 1024; const int svr_256k = 256 * 1024; const int svr_4M = 4 * 1024 * 1024; const int svr_64M = 64 * 1024 * 1024; for (int bw = 64; bw <= 128 * 1024 * 1024; bw *= 16) { - b->Args({cli_1024k, svr_256k, bw}); - b->Args({cli_1024k, svr_4M, bw}); - b->Args({cli_1024k, svr_64M, bw}); - b->Args({cli_32M, svr_256k, bw}); - b->Args({cli_32M, svr_4M, bw}); - b->Args({cli_32M, svr_64M, bw}); + for (auto svr : {svr_256k, svr_4M, svr_64M}) { + for (auto cli: {cli_1024k, cli_32M}) { + b->Args({cli, svr, bw}); + } + } } } BENCHMARK(BM_PumpUnbalancedUnary_Trickle)->Apply(UnaryTrickleArgs); From 3e87909a8a3d17140598103a9dde99c55120fa4f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 26 May 2017 16:45:25 -0700 Subject: [PATCH 04/14] Fix code --- src/core/lib/surface/completion_queue.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 42e2e727102..0aefed1f9ea 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -587,7 +587,8 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, /* Only kick if this is the first item queued */ if (is_first) { gpr_mu_lock(cqd->mu); - grpc_error *kick_error = cq->poller_vtable->kick(POLLSET_FROM_CQ(cq); + grpc_error *kick_error = + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); gpr_mu_unlock(cqd->mu); if (kick_error != GRPC_ERROR_NONE) { From c019572f1add135d16789015d8e6cb4e247d8247 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 31 May 2017 09:51:33 -0700 Subject: [PATCH 05/14] Add missing lock --- src/core/lib/surface/completion_queue.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 9fd7d5fbc7e..bfce9d274ce 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@
-598,6 +598,7 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, } } } else { + gpr_mu_lock(cqd->mu); cq_finish_shutdown(exec_ctx, cq); gpr_mu_unlock(cqd->mu); } From d8543c4e0f4ff827337654e198d41ff5539a2b9e Mon Sep 17 00:00:00 2001 From: Matt Kwong Date: Wed, 17 May 2017 18:04:58 -0700 Subject: [PATCH 06/14] Fix --measure_cpu_costs flag in run_tests.py on Windows --- tools/run_tests/python_utils/jobset.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tools/run_tests/python_utils/jobset.py b/tools/run_tests/python_utils/jobset.py index 37540353088..b4f7557a35f 100755 --- a/tools/run_tests/python_utils/jobset.py +++ b/tools/run_tests/python_utils/jobset.py @@ -270,8 +270,13 @@ class Job(object): env = sanitized_environment(env) self._start = time.time() cmdline = self._spec.cmdline - if measure_cpu_costs: + # The Unix time command is finicky when used with MSBuild, so we don't use it + # with jobs that run MSBuild. + global measure_cpu_costs + if measure_cpu_costs and not 'vsprojects\\build' in cmdline[0]: cmdline = ['time', '-p'] + cmdline + else: + measure_cpu_costs = False try_start = lambda: subprocess.Popen(args=cmdline, stderr=subprocess.STDOUT, stdout=self._tempfile, From 0315de23cc14072b2c202129892f8b13a12355e0 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 31 May 2017 17:58:49 -0700 Subject: [PATCH 07/14] add sanity to internal_ci --- tools/internal_ci/linux/grpc_sanity.cfg | 39 +++++++++++++++++++++++++ tools/internal_ci/linux/grpc_sanity.sh | 2 +- 2 files changed, 40 insertions(+), 1 deletion(-) create mode 100644 tools/internal_ci/linux/grpc_sanity.cfg diff --git a/tools/internal_ci/linux/grpc_sanity.cfg b/tools/internal_ci/linux/grpc_sanity.cfg new file mode 100644 index 00000000000..e2f320494b4 --- /dev/null +++ b/tools/internal_ci/linux/grpc_sanity.cfg @@ -0,0 +1,39 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# Config file for the internal CI (in protobuf text format) + +# Location of the continuous shell script in repository. 
+build_file: "grpc/tools/internal_ci/linux/grpc_sanity.sh" +timeout_mins: 20 +action { + define_artifacts { + regex: "**/*sponge_log.xml" + } +} diff --git a/tools/internal_ci/linux/grpc_sanity.sh b/tools/internal_ci/linux/grpc_sanity.sh index 7166ce7d248..432a42c4493 100755 --- a/tools/internal_ci/linux/grpc_sanity.sh +++ b/tools/internal_ci/linux/grpc_sanity.sh @@ -35,4 +35,4 @@ cd $(dirname $0)/../../.. source tools/internal_ci/helper_scripts/prepare_build_linux_rc -tools/run_tests/run_tests.py -l sanity -c opt -t -x sponge_log.xml --use_docker --report_suite_name sanity_linux_opt +tools/run_tests/run_tests_matrix.py -f basictests linux sanity opt --inner_jobs 16 -j 1 --internal_ci From 3972fad38cbe06aef040f6d2c010397c0e950813 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Wed, 31 May 2017 18:47:49 -0700 Subject: [PATCH 08/14] clang format --- test/cpp/microbenchmarks/bm_fullstack_trickle.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index 702a14d14ea..9f616fe152f 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -427,7 +427,7 @@ static void UnaryTrickleArgs(benchmark::internal::Benchmark* b) { const int svr_64M = 64 * 1024 * 1024; for (int bw = 64; bw <= 128 * 1024 * 1024; bw *= 16) { for (auto svr : {svr_256k, svr_4M, svr_64M}) { - for (auto cli: {cli_1024k, cli_32M}) { + for (auto cli : {cli_1024k, cli_32M}) { b->Args({cli, svr, bw}); } } From df8b5eacf091e77d6396b6ee93bbbd443db8ea05 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 31 May 2017 16:11:20 -0700 Subject: [PATCH 09/14] Update comment formatting --- include/grpc++/grpc++.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/include/grpc++/grpc++.h b/include/grpc++/grpc++.h index d6c3e2e7683..a4f512dbee9 100644 --- a/include/grpc++/grpc++.h +++ b/include/grpc++/grpc++.h @@ -34,18 +34,18 @@ /// \mainpage gRPC C++ API /// /// The gRPC C++ API mainly consists of the following classes: -// +///
/// - grpc::Channel, which represents the connection to an endpoint. See [the /// gRPC Concepts page](http://www.grpc.io/docs/guides/concepts.html) for more /// details. Channels are created by the factory function grpc::CreateChannel. -// +/// /// - grpc::CompletionQueue, the producer-consumer queue used for all /// asynchronous communication with the gRPC runtime. -// +/// /// - grpc::ClientContext and grpc::ServerContext, where optional configuration /// for an RPC can be set, such as setting custom metadata to be conveyed to the /// peer, compression settings, authentication, etc. -// +/// /// - grpc::Server, representing a gRPC server, created by grpc::ServerBuilder. /// /// Streaming calls are handled with the streaming classes in From dd1143dab060c7a4775429569075876dcf6f4503 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 1 Jun 2017 20:58:36 +0200 Subject: [PATCH 10/14] install ruby on internal_ci linux --- tools/internal_ci/linux/grpc_build_artifacts.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tools/internal_ci/linux/grpc_build_artifacts.sh b/tools/internal_ci/linux/grpc_build_artifacts.sh index 1b12be143e9..16c2a748a60 100755 --- a/tools/internal_ci/linux/grpc_build_artifacts.sh +++ b/tools/internal_ci/linux/grpc_build_artifacts.sh @@ -35,4 +35,8 @@ cd $(dirname $0)/../../.. source tools/internal_ci/helper_scripts/prepare_build_linux_rc +# TODO(jtattermusch): install ruby on the internal_ci worker +gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 +curl -sSL https://get.rvm.io | bash -s stable --ruby + tools/run_tests/task_runner.py -f artifact linux From be290851c0e498832545f2e366fbc529147b21bc Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 1 Jun 2017 12:42:09 -0700 Subject: [PATCH 11/14] Split cq_data into cq_next_data, cq_pluck_data --- src/core/lib/surface/completion_queue.c | 382 ++++++++++++++---------- src/core/lib/surface/completion_queue.h | 3 - src/core/lib/surface/server.c | 5 +- 3 files changed, 217 insertions(+), 173 deletions(-) diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index bfce9d274ce..769a700b666 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -203,7 +203,10 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { typedef struct cq_vtable { grpc_cq_completion_type cq_completion_type; - size_t (*size)(); + size_t data_size; + void (*init)(void *data); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); + void (*destroy)(void *data); void (*begin_op)(grpc_completion_queue *cq, void *tag); void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, grpc_error *error, @@ -232,25 +235,30 @@ typedef struct grpc_cq_event_queue { gpr_atm num_queue_items; } grpc_cq_event_queue; -/* TODO: sreek Refactor this based on the completion_type. 
Put completion-type - * specific data in a different structure (and co-allocate memory for it along - * with completion queue + pollset )*/ -typedef struct cq_data { - gpr_mu *mu; +typedef struct cq_next_data { + /** Completed events for completion-queues of type GRPC_CQ_NEXT */ + grpc_cq_event_queue queue; + + /** Number of pending events (+1 if we're not shutdown) */ + gpr_refcount pending_events; + + /** Counter of how many things have ever been queued on this completion queue + useful for avoiding locks to check the queue */ + gpr_atm things_queued_ever; + + /** 0 initially, 1 once we've begun shutting down */ + gpr_atm shutdown; + int shutdown_called; +} cq_next_data; +typedef struct cq_pluck_data { /** Completed events for completion-queues of type GRPC_CQ_PLUCK */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; - /** Completed events for completion-queues of type GRPC_CQ_NEXT */ - grpc_cq_event_queue queue; - /** Number of pending events (+1 if we're not shutdown) */ gpr_refcount pending_events; - /** Once owning_refs drops to zero, we will destroy the cq */ - gpr_refcount owning_refs; - /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ gpr_atm things_queued_ever; @@ -259,34 +267,42 @@ typedef struct cq_data { gpr_atm shutdown; int shutdown_called; - int is_server_cq; - int num_pluckers; - int num_polls; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; - grpc_closure pollset_shutdown_done; +} cq_pluck_data; + +/* Completion queue structure */ +struct grpc_completion_queue { + /** Once owning_refs drops to zero, we will destroy the cq */ + gpr_refcount owning_refs; + + gpr_mu *mu; + + const cq_vtable *vtable; + const cq_poller_vtable *poller_vtable; #ifndef NDEBUG void **outstanding_tags; size_t outstanding_tag_count; size_t outstanding_tag_capacity; #endif -} cq_data; -/* Completion queue structure */ -struct grpc_completion_queue { - cq_data data; - const cq_vtable *vtable; - const cq_poller_vtable *poller_vtable; + grpc_closure pollset_shutdown_done; + int num_polls; }; /* Forward declarations */ -static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq); - -static size_t cq_size(grpc_completion_queue *cq); - -static void cq_begin_op(grpc_completion_queue *cq, void *tag); +static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); +static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); +static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); +static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); + +static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); +static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, @@ -310,26 +326,36 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved); +static void cq_init_next(void *data); +static void cq_init_pluck(void *data); +static void cq_destroy_next(void *data); +static void cq_destroy_pluck(void *data); + /* Completion queue vtables based on the completion-type */ static const cq_vtable g_cq_vtable[] = { /* GRPC_CQ_NEXT */ {.cq_completion_type = GRPC_CQ_NEXT, - .size = cq_size, - .begin_op = cq_begin_op, + .init = cq_init_next, + 
.shutdown = cq_shutdown_next, + .destroy = cq_destroy_next, + .begin_op = cq_begin_op_for_next, .end_op = cq_end_op_for_next, .next = cq_next, .pluck = NULL}, /* GRPC_CQ_PLUCK */ {.cq_completion_type = GRPC_CQ_PLUCK, - .size = cq_size, - .begin_op = cq_begin_op, + .init = cq_init_pluck, + .shutdown = cq_shutdown_pluck, + .destroy = cq_destroy_pluck, + .begin_op = cq_begin_op_for_pluck, .end_op = cq_end_op_for_pluck, .next = NULL, .pluck = cq_pluck}, }; -#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) -#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1) +#define DATA_FROM_CQ(cq) ((void *)(cq + 1)) +#define POLLSET_FROM_CQ(cq) \ + ((grpc_pollset *)(cq->vtable->data_size + (char *)DATA_FROM_CQ(cq))) grpc_tracer_flag grpc_cq_pluck_trace = GRPC_TRACER_INITIALIZER(true); grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true); @@ -381,12 +407,6 @@ static long cq_event_queue_num_items(grpc_cq_event_queue *q) { return (long)gpr_atm_no_barrier_load(&q->num_queue_items); } -static size_t cq_size(grpc_completion_queue *cq) { - /* Size of the completion queue and the size of the pollset whose memory is - allocated right after that of completion queue */ - return sizeof(grpc_completion_queue) + cq->poller_vtable->size(); -} - grpc_completion_queue *grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type) { @@ -403,41 +423,57 @@ grpc_completion_queue *grpc_completion_queue_create_internal( const cq_poller_vtable *poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; - cq = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size()); - cq_data *cqd = &cq->data; + cq = gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size + + poller_vtable->size()); cq->vtable = vtable; cq->poller_vtable = poller_vtable; - poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->data.mu); + /* One for destroy(), one for pollset_shutdown */ + gpr_ref_init(&cq->owning_refs, 2); -#ifndef NDEBUG - cqd->outstanding_tags = NULL; - cqd->outstanding_tag_capacity = 0; -#endif + poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu); + vtable->init(DATA_FROM_CQ(cq)); + + grpc_closure_init(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq, + grpc_schedule_on_exec_ctx); + GPR_TIMER_END("grpc_completion_queue_create_internal", 0); + + return cq; +} + +static void cq_init_next(void *ptr) { + cq_next_data *cqd = ptr; + /* Initial ref is dropped by grpc_completion_queue_shutdown */ + gpr_ref_init(&cqd->pending_events, 1); + gpr_atm_no_barrier_store(&cqd->shutdown, 0); + cqd->shutdown_called = 0; + gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); + cq_event_queue_init(&cqd->queue); +} + +static void cq_destroy_next(void *ptr) { + cq_next_data *cqd = ptr; + GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0); + cq_event_queue_destroy(&cqd->queue); +} + +static void cq_init_pluck(void *ptr) { + cq_pluck_data *cqd = ptr; /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cqd->pending_events, 1); - /* One for destroy(), one for pollset_shutdown */ - gpr_ref_init(&cqd->owning_refs, 2); cqd->completed_tail = &cqd->completed_head; cqd->completed_head.next = (uintptr_t)cqd->completed_tail; gpr_atm_no_barrier_store(&cqd->shutdown, 0); cqd->shutdown_called = 0; - cqd->is_server_cq = 0; cqd->num_pluckers = 0; - cqd->num_polls = 0; gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); -#ifndef NDEBUG - cqd->outstanding_tag_count = 0; -#endif - cq_event_queue_init(&cqd->queue); - 
grpc_closure_init(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cq, - grpc_schedule_on_exec_ctx); - - GPR_TIMER_END("grpc_completion_queue_create_internal", 0); +} - return cq; +static void cq_destroy_pluck(void *ptr) { + cq_pluck_data *cqd = ptr; + GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); } grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cq) { @@ -446,23 +482,21 @@ grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cq) { int grpc_get_cq_poll_num(grpc_completion_queue *cq) { int cur_num_polls; - gpr_mu_lock(cq->data.mu); - cur_num_polls = cq->data.num_polls; - gpr_mu_unlock(cq->data.mu); + gpr_mu_lock(cq->mu); + cur_num_polls = cq->num_polls; + gpr_mu_unlock(cq->mu); return cur_num_polls; } #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cq, const char *reason, const char *file, int line) { - cq_data *cqd = &cq->data; gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cq, - (int)cqd->owning_refs.count, (int)cqd->owning_refs.count + 1, reason); + (int)cq->owning_refs.count, (int)cq->owning_refs.count + 1, reason); #else void grpc_cq_internal_ref(grpc_completion_queue *cq) { - cq_data *cqd = &cq->data; #endif - gpr_ref(&cqd->owning_refs); + gpr_ref(&cq->owning_refs); } static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, @@ -480,61 +514,63 @@ void grpc_cq_internal_unref(grpc_completion_queue *cq, const char *reason, #else void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { - cq_data *cqd = &cq->data; #endif - if (gpr_unref(&cqd->owning_refs)) { - GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); + if (gpr_unref(&cq->owning_refs)) { + cq->vtable->destroy(DATA_FROM_CQ(cq)); cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq)); - cq_event_queue_destroy(&cqd->queue); #ifndef NDEBUG - gpr_free(cqd->outstanding_tags); + gpr_free(cq->outstanding_tags); #endif gpr_free(cq); } } -static void cq_begin_op(grpc_completion_queue *cq, void *tag) { - cq_data *cqd = &cq->data; -#ifndef NDEBUG - gpr_mu_lock(cqd->mu); +static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + GPR_ASSERT(!cqd->shutdown_called); + gpr_ref(&cqd->pending_events); +} + +static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(!cqd->shutdown_called); - if (cqd->outstanding_tag_count == cqd->outstanding_tag_capacity) { - cqd->outstanding_tag_capacity = - GPR_MAX(4, 2 * cqd->outstanding_tag_capacity); - cqd->outstanding_tags = - gpr_realloc(cqd->outstanding_tags, sizeof(*cqd->outstanding_tags) * - cqd->outstanding_tag_capacity); - } - cqd->outstanding_tags[cqd->outstanding_tag_count++] = tag; - gpr_mu_unlock(cqd->mu); -#endif gpr_ref(&cqd->pending_events); } void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { +#ifndef NDEBUG + gpr_mu_lock(cq->mu); + if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { + cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); + cq->outstanding_tags = + gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) * + cq->outstanding_tag_capacity); + } + cq->outstanding_tags[cq->outstanding_tag_count++] = tag; + gpr_mu_unlock(cq->mu); +#endif cq->vtable->begin_op(cq, tag); } #ifndef NDEBUG static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { - cq_data *cqd = &cq->data; int found = 0; if 
(lock_cq) { - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); } - for (int i = 0; i < (int)cqd->outstanding_tag_count; i++) { - if (cqd->outstanding_tags[i] == tag) { - cqd->outstanding_tag_count--; - GPR_SWAP(void *, cqd->outstanding_tags[i], - cqd->outstanding_tags[cqd->outstanding_tag_count]); + for (int i = 0; i < (int)cq->outstanding_tag_count; i++) { + if (cq->outstanding_tags[i] == tag) { + cq->outstanding_tag_count--; + GPR_SWAP(void *, cq->outstanding_tags[i], + cq->outstanding_tags[cq->outstanding_tag_count]); found = 1; break; } } if (lock_cq) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); } GPR_ASSERT(found); @@ -568,7 +604,7 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, } } - cq_data *cqd = &cq->data; + cq_next_data *cqd = DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); storage->tag = tag; @@ -586,10 +622,10 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, if (!shutdown) { /* Only kick if this is the first item queued */ if (is_first) { - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); grpc_error *kick_error = cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); if (kick_error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(kick_error); @@ -598,9 +634,9 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, } } } else { - gpr_mu_lock(cqd->mu); - cq_finish_shutdown(exec_ctx, cq); - gpr_mu_unlock(cqd->mu); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(exec_ctx, cq); + gpr_mu_unlock(cq->mu); } GPR_TIMER_END("cq_end_op_for_next", 0); @@ -617,7 +653,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { - cq_data *cqd = &cq->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0); @@ -641,7 +677,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, storage->done_arg = done_arg; storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success)); - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); cq_check_tag(cq, tag, false); /* Used in debug builds only */ /* Add to the list of completions */ @@ -663,7 +699,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, grpc_error *kick_error = cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); if (kick_error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(kick_error); @@ -672,8 +708,8 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(kick_error); } } else { - cq_finish_shutdown(exec_ctx, cq); - gpr_mu_unlock(cqd->mu); + cq_finish_shutdown_pluck(exec_ctx, cq); + gpr_mu_unlock(cq->mu); } GPR_TIMER_END("cq_end_op_for_pluck", 0); @@ -701,7 +737,7 @@ typedef struct { static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; - cq_data *cqd = &cq->data; + cq_next_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = @@ -728,18 +764,16 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { static void dump_pending_tags(grpc_completion_queue *cq) { if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return; - cq_data *cqd = &cq->data; - gpr_strvec v; gpr_strvec_init(&v); gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:")); - gpr_mu_lock(cqd->mu); - for (size_t i = 0; i < cqd->outstanding_tag_count; i++) { + 
gpr_mu_lock(cq->mu); + for (size_t i = 0; i < cq->outstanding_tag_count; i++) { char *s; - gpr_asprintf(&s, " %p", cqd->outstanding_tags[i]); + gpr_asprintf(&s, " %p", cq->outstanding_tags[i]); gpr_strvec_add(&v, s); } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); char *out = gpr_strvec_flatten(&v, NULL); gpr_strvec_destroy(&v); gpr_log(GPR_DEBUG, "%s", out); @@ -753,7 +787,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) { grpc_event ret; gpr_timespec now; - cq_data *cqd = &cq->data; + cq_next_data *cqd = DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); @@ -842,11 +876,11 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, } /* The main polling work happens in grpc_pollset_work */ - gpr_mu_lock(cqd->mu); - cqd->num_polls++; + gpr_mu_lock(cq->mu); + cq->num_polls++; grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), NULL, now, iteration_deadline); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); if (err != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(err); @@ -868,9 +902,9 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, if (cq_event_queue_num_items(&cqd->queue) > 0 && gpr_atm_no_barrier_load(&cqd->shutdown) == 0) { - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); } GPR_TIMER_END("grpc_completion_queue_next", 0); @@ -885,7 +919,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, static int add_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker) { - cq_data *cqd = &cq->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { return 0; } @@ -897,7 +931,7 @@ static int add_plucker(grpc_completion_queue *cq, void *tag, static void del_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker) { - cq_data *cqd = &cq->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); for (int i = 0; i < cqd->num_pluckers; i++) { if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) { cqd->num_pluckers--; @@ -911,13 +945,13 @@ static void del_plucker(grpc_completion_queue *cq, void *tag, static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; - cq_data *cqd = &cq->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever); if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); a->last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever); grpc_cq_completion *c; @@ -929,13 +963,13 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { if (c == cqd->completed_tail) { cqd->completed_tail = prev; } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); a->stolen_completion = c; return true; } prev = c; } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); } return !a->first_loop && gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; @@ -948,7 +982,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, grpc_cq_completion *prev; grpc_pollset_worker *worker = NULL; gpr_timespec now; - cq_data *cqd = &cq->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); @@ 
-969,7 +1003,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cq, "pluck"); - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever), @@ -982,7 +1016,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg); for (;;) { if (is_finished_arg.stolen_completion != NULL) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); c = is_finished_arg.stolen_completion; is_finished_arg.stolen_completion = NULL; ret.type = GRPC_OP_COMPLETE; @@ -999,7 +1033,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, if (c == cqd->completed_tail) { cqd->completed_tail = prev; } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -1009,7 +1043,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, prev = c; } if (gpr_atm_no_barrier_load(&cqd->shutdown)) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; @@ -1019,7 +1053,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, "Too many outstanding grpc_completion_queue_pluck calls: maximum " "is %d", GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); /* TODO(ctiller): should we use a different result here */ ret.type = GRPC_QUEUE_TIMEOUT; @@ -1029,19 +1063,19 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, now = gpr_now(GPR_CLOCK_MONOTONIC); if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { del_plucker(cq, tag, &worker); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cq); break; } - cqd->num_polls++; + cq->num_polls++; grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), &worker, now, deadline); if (err != GRPC_ERROR_NONE) { del_plucker(cq, tag, &worker); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); const char *msg = grpc_error_string(err); gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg); @@ -1076,37 +1110,71 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, - Must be called only once in completion queue's lifetime - grpc_completion_queue_shutdown() MUST have been called before calling this function */ -static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq) { - cq_data *cqd = &cq->data; +static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_next_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); gpr_atm_no_barrier_store(&cqd->shutdown, 1); cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), - &cqd->pollset_shutdown_done); + &cq->pollset_shutdown_done); } -/* Shutdown simply drops a ref that we reserved at creation time; if we drop - to zero here, then enter shutdown mode and wake up any waiters */ -void grpc_completion_queue_shutdown(grpc_completion_queue *cq) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); - GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq)); - cq_data *cqd = &cq->data; +static void 
cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); + + GPR_ASSERT(cqd->shutdown_called); + GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); + gpr_atm_no_barrier_store(&cqd->shutdown, 1); + + cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), + &cq->pollset_shutdown_done); +} + +static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_next_data *cqd = DATA_FROM_CQ(cq); - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); if (cqd->shutdown_called) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } cqd->shutdown_called = 1; if (gpr_unref(&cqd->pending_events)) { - cq_finish_shutdown(&exec_ctx, cq); + cq_finish_shutdown_next(exec_ctx, cq); } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); +} + +static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); + + gpr_mu_lock(cq->mu); + if (cqd->shutdown_called) { + gpr_mu_unlock(cq->mu); + GPR_TIMER_END("grpc_completion_queue_shutdown", 0); + return; + } + cqd->shutdown_called = 1; + if (gpr_unref(&cqd->pending_events)) { + cq_finish_shutdown_pluck(exec_ctx, cq); + } + gpr_mu_unlock(cq->mu); +} + +/* Shutdown simply drops a ref that we reserved at creation time; if we drop + to zero here, then enter shutdown mode and wake up any waiters */ +void grpc_completion_queue_shutdown(grpc_completion_queue *cq) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); + GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq)); + cq->vtable->shutdown(&exec_ctx, cq); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } @@ -1116,12 +1184,6 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cq) { GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); grpc_completion_queue_shutdown(cq); - /* TODO (sreek): This should not ideally be here. Refactor it into the - * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */ - if (cq->vtable->cq_completion_type == GRPC_CQ_NEXT) { - GPR_ASSERT(cq_event_queue_num_items(&cq->data.queue) == 0); - } - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy"); grpc_exec_ctx_finish(&exec_ctx); @@ -1132,18 +1194,6 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cq) { return cq->poller_vtable->can_get_pollset ? 
POLLSET_FROM_CQ(cq) : NULL; } -grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { - return CQ_FROM_POLLSET(ps); -} - -void grpc_cq_mark_server_cq(grpc_completion_queue *cq) { - cq->data.is_server_cq = 1; -} - -bool grpc_cq_is_server_cq(grpc_completion_queue *cq) { - return cq->data.is_server_cq; -} - bool grpc_cq_can_listen(grpc_completion_queue *cq) { return cq->poller_vtable->can_listen; } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 7963ea75e77..93c629c4a4a 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -99,10 +99,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *done_arg, grpc_cq_completion *storage); grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); -grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps); -void grpc_cq_mark_server_cq(grpc_completion_queue *cc); -bool grpc_cq_is_server_cq(grpc_completion_queue *cc); bool grpc_cq_can_listen(grpc_completion_queue *cc); grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 7e4ae421a04..a64d043367f 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -964,8 +964,6 @@ static void register_completion_queue(grpc_server *server, if (server->cqs[i] == cq) return; } - grpc_cq_mark_server_cq(cq); - GRPC_CQ_INTERNAL_REF(cq, "server"); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, @@ -1129,9 +1127,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, chand->channel = channel; size_t cq_idx; - grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset); for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) { - if (s->cqs[cq_idx] == accepting_cq) break; + if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break; } if (cq_idx == s->cq_count) { /* completion queue not found: pick a random one to publish new calls to */ From 38338e9922ce1a819a0a977914fa402780b2213f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 1 Jun 2017 12:50:12 -0700 Subject: [PATCH 12/14] Publish struct size --- src/core/lib/surface/completion_queue.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 769a700b666..3d3da15333f 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -334,7 +334,8 @@ static void cq_destroy_pluck(void *data); /* Completion queue vtables based on the completion-type */ static const cq_vtable g_cq_vtable[] = { /* GRPC_CQ_NEXT */ - {.cq_completion_type = GRPC_CQ_NEXT, + {.data_size = sizeof(cq_next_data), + .cq_completion_type = GRPC_CQ_NEXT, .init = cq_init_next, .shutdown = cq_shutdown_next, .destroy = cq_destroy_next, @@ -343,7 +344,8 @@ static const cq_vtable g_cq_vtable[] = { .next = cq_next, .pluck = NULL}, /* GRPC_CQ_PLUCK */ - {.cq_completion_type = GRPC_CQ_PLUCK, + {.data_size = sizeof(cq_pluck_data), + .cq_completion_type = GRPC_CQ_PLUCK, .init = cq_init_pluck, .shutdown = cq_shutdown_pluck, .destroy = cq_destroy_pluck, From 5bc11d55d44a58e0139b62a3110bdf43ef7e4f95 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Thu, 1 Jun 2017 11:23:21 -0700 Subject: [PATCH 13/14] Expand pylint to grpc_health and grpc_reflection --- .../grpc_reflection/v1alpha/reflection.py | 5 ++--- tools/distrib/pylint_code.sh | 14 +++++++++----- 2 
files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py b/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py index 0f399f8f8d9..1a7d3259df0 100644 --- a/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py +++ b/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py @@ -28,8 +28,6 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """Reference implementation for reflection in gRPC Python.""" -import threading - import grpc from google.protobuf import descriptor_pb2 from google.protobuf import descriptor_pool @@ -120,6 +118,7 @@ class ReflectionServicer(reflection_pb2_grpc.ServerReflectionServicer): ])) def ServerReflectionInfo(self, request_iterator, context): + # pylint: disable=unused-argument for request in request_iterator: if request.HasField('file_by_filename'): yield self._file_by_filename(request.file_by_filename) @@ -152,4 +151,4 @@ def enable_server_reflection(service_names, server, pool=None): pool: DescriptorPool object to use (descriptor_pool.Default() if None). """ reflection_pb2_grpc.add_ServerReflectionServicer_to_server( - ReflectionServicer(service_names, pool), server) + ReflectionServicer(service_names, pool=pool), server) diff --git a/tools/distrib/pylint_code.sh b/tools/distrib/pylint_code.sh index 6369e605d53..b1e305c56d0 100755 --- a/tools/distrib/pylint_code.sh +++ b/tools/distrib/pylint_code.sh @@ -31,18 +31,22 @@ set -ex # change to root directory -cd $(dirname $0)/../.. +cd "$(dirname "$0")/../.." -DIRS=src/python/grpcio/grpc +DIRS=( + 'src/python/grpcio/grpc' + 'src/python/grpcio_reflection/grpc_reflection' + 'src/python/grpcio_health_checking/grpc_health' +) VIRTUALENV=python_pylint_venv virtualenv $VIRTUALENV -PYTHON=`realpath $VIRTUALENV/bin/python` +PYTHON=$(realpath $VIRTUALENV/bin/python) $PYTHON -m pip install pylint==1.6.5 -for dir in $DIRS; do - $PYTHON -m pylint --rcfile=.pylintrc -rn $dir || exit $? +for dir in "${DIRS[@]}"; do + $PYTHON -m pylint --rcfile=.pylintrc -rn "$dir" || exit $? 
done exit 0 From caf8ea984bc6dc9ccca9b6895d6233e0539aae02 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 1 Jun 2017 16:08:58 -0700 Subject: [PATCH 14/14] Fix race condition --- src/core/lib/surface/completion_queue.c | 105 +++++++++++++----------- 1 file changed, 57 insertions(+), 48 deletions(-) diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 3d3da15333f..672ee935e4e 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -239,15 +239,13 @@ typedef struct cq_next_data { /** Completed events for completion-queues of type GRPC_CQ_NEXT */ grpc_cq_event_queue queue; - /** Number of pending events (+1 if we're not shutdown) */ - gpr_refcount pending_events; - /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ gpr_atm things_queued_ever; - /** 0 initially, 1 once we've begun shutting down */ - gpr_atm shutdown; + /* Number of outstanding events (+1 if not shut down) */ + gpr_atm pending_events; + int shutdown_called; } cq_next_data; @@ -448,9 +446,8 @@ grpc_completion_queue *grpc_completion_queue_create_internal( static void cq_init_next(void *ptr) { cq_next_data *cqd = ptr; /* Initial ref is dropped by grpc_completion_queue_shutdown */ - gpr_ref_init(&cqd->pending_events, 1); - gpr_atm_no_barrier_store(&cqd->shutdown, 0); - cqd->shutdown_called = 0; + gpr_atm_no_barrier_store(&cqd->pending_events, 1); + cqd->shutdown_called = false; gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); cq_event_queue_init(&cqd->queue); } @@ -530,7 +527,7 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { cq_next_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(!cqd->shutdown_called); - gpr_ref(&cqd->pending_events); + gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1); } static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { @@ -619,9 +616,10 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, /* Add the completion to the queue */ bool is_first = cq_event_queue_push(&cqd->queue, storage); gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - bool shutdown = gpr_unref(&cqd->pending_events); + bool will_definitely_shutdown = + gpr_atm_no_barrier_load(&cqd->pending_events) == 1; - if (!shutdown) { + if (!will_definitely_shutdown) { /* Only kick if this is the first item queued */ if (is_first) { gpr_mu_lock(cq->mu); @@ -635,10 +633,20 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(kick_error); } } + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + } } else { + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_atm_rel_store(&cqd->pending_events, 0); gpr_mu_lock(cq->mu); cq_finish_shutdown_next(exec_ctx, cq); gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } GPR_TIMER_END("cq_end_op_for_next", 0); @@ -852,7 +860,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, } } - if (gpr_atm_no_barrier_load(&cqd->shutdown)) { + if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) { /* Before returning, check if the queue has any items left over (since gpr_mpscq_pop() can sometimes return NULL even if the queue is not empty. 
If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ @@ -903,7 +911,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, GPR_ASSERT(is_finished_arg.stolen_completion == NULL); if (cq_event_queue_num_items(&cqd->queue) > 0 && - gpr_atm_no_barrier_load(&cqd->shutdown) == 0) { + gpr_atm_no_barrier_load(&cqd->pending_events) > 0) { gpr_mu_lock(cq->mu); cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); gpr_mu_unlock(cq->mu); @@ -914,6 +922,42 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, return ret; } +/* Finishes the completion queue shutdown. This means that there are no more + completion events / tags expected from the completion queue + - Must be called under completion queue lock + - Must be called only once in completion queue's lifetime + - grpc_completion_queue_shutdown() MUST have been called before calling + this function */ +static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + + GPR_ASSERT(cqd->shutdown_called); + GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); + + cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), + &cq->pollset_shutdown_done); +} + +static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + if (cqd->shutdown_called) { + gpr_mu_unlock(cq->mu); + GPR_TIMER_END("grpc_completion_queue_shutdown", 0); + return; + } + cqd->shutdown_called = 1; + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + cq_finish_shutdown_next(exec_ctx, cq); + } + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); +} + grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) { return cq->vtable->next(cq, deadline, reserved); @@ -1106,24 +1150,6 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, return cq->vtable->pluck(cq, tag, deadline, reserved); } -/* Finishes the completion queue shutdown. 
This means that there are no more - completion events / tags expected from the completion queue - - Must be called under completion queue lock - - Must be called only once in completion queue's lifetime - - grpc_completion_queue_shutdown() MUST have been called before calling - this function */ -static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq) { - cq_next_data *cqd = DATA_FROM_CQ(cq); - - GPR_ASSERT(cqd->shutdown_called); - GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); - gpr_atm_no_barrier_store(&cqd->shutdown, 1); - - cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), - &cq->pollset_shutdown_done); -} - static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { cq_pluck_data *cqd = DATA_FROM_CQ(cq); @@ -1136,23 +1162,6 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, &cq->pollset_shutdown_done); } -static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq) { - cq_next_data *cqd = DATA_FROM_CQ(cq); - - gpr_mu_lock(cq->mu); - if (cqd->shutdown_called) { - gpr_mu_unlock(cq->mu); - GPR_TIMER_END("grpc_completion_queue_shutdown", 0); - return; - } - cqd->shutdown_called = 1; - if (gpr_unref(&cqd->pending_events)) { - cq_finish_shutdown_next(exec_ctx, cq); - } - gpr_mu_unlock(cq->mu); -} - static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { cq_pluck_data *cqd = DATA_FROM_CQ(cq);