From 42dab364a337666003a17d72b4dcad0c4568587a Mon Sep 17 00:00:00 2001 From: Greg Haines Date: Wed, 2 Mar 2016 15:04:41 -0800 Subject: [PATCH 01/33] Pass a non-infinite deadline to grpc_completion_queue_next() to prevent queues from blocking indefinitely in poll(). --- .../GRPCClient/private/GRPCCompletionQueue.h | 7 ++++++ .../GRPCClient/private/GRPCCompletionQueue.m | 25 +++++++++++++------ 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h index fe3b8f39d12..03fd2e0d955 100644 --- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h +++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h @@ -36,6 +36,8 @@ typedef void(^GRPCQueueCompletionHandler)(bool success); +extern const int64_t kGRPCCompletionQueueDefaultTimeoutSecs; + /** * This class lets one more easily use |grpc_completion_queue|. To use it, pass the value of the * |unmanagedQueue| property of an instance of this class to |grpc_channel_create_call|. Then for @@ -49,6 +51,11 @@ typedef void(^GRPCQueueCompletionHandler)(bool success); */ @interface GRPCCompletionQueue : NSObject @property(nonatomic, readonly) grpc_completion_queue *unmanagedQueue; +@property(nonatomic, readonly) int64_t timeoutSecs; + (instancetype)completionQueue; + +- (instancetype)init; +- (instancetype)initWithTimeout:(int64_t)timeoutSecs NS_DESIGNATED_INITIALIZER; + @end diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m index ea2b01ee1d7..8163236cc4a 100644 --- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m +++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m @@ -35,6 +35,9 @@ #import + +const int64_t kGRPCCompletionQueueDefaultTimeoutSecs = 60; + @implementation GRPCCompletionQueue + (instancetype)completionQueue { @@ -42,8 +45,13 @@ } - (instancetype)init { + return [self initWithTimeout:kGRPCCompletionQueueDefaultTimeoutSecs]; +} + +- (instancetype)initWithTimeout:(int64_t)timeoutSecs { if ((self = [super init])) { _unmanagedQueue = grpc_completion_queue_create(NULL); + _timeoutSecs = timeoutSecs; // This is for the following block to capture the pointer by value (instead // of retaining self and doing self->_unmanagedQueue). This is essential @@ -52,6 +60,7 @@ // anymore (i.e. on self dealloc). So the block would never end if it // retained self. grpc_completion_queue *unmanagedQueue = _unmanagedQueue; + int64_t lTimeoutSecs = _timeoutSecs; // Start a loop on a concurrent queue to read events from the completion // queue and dispatch each. @@ -61,22 +70,24 @@ gDefaultConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); }); dispatch_async(gDefaultConcurrentQueue, ^{ + gpr_timespec deadline = gpr_time_from_seconds(lTimeoutSecs, GPR_CLOCK_REALTIME); while (YES) { - // The following call blocks until an event is available. - grpc_event event = grpc_completion_queue_next(unmanagedQueue, - gpr_inf_future(GPR_CLOCK_REALTIME), - NULL); + // The following call blocks until an event is available or the deadline elapses. + grpc_event event = grpc_completion_queue_next(unmanagedQueue, deadline, NULL); GRPCQueueCompletionHandler handler; switch (event.type) { - case GRPC_OP_COMPLETE: + case GRPC_OP_COMPLETE: // Falling through deliberately + case GRPC_QUEUE_TIMEOUT: handler = (__bridge_transfer GRPCQueueCompletionHandler)event.tag; - handler(event.success); + if (handler) { + handler(event.success); + } break; case GRPC_QUEUE_SHUTDOWN: grpc_completion_queue_destroy(unmanagedQueue); return; default: - [NSException raise:@"Unrecognized completion type" format:@""]; + [NSException raise:@"Unrecognized completion type" format:@"type=%d", event.type]; } }; }); From 072ebaa1537673e100b0a027d3935252da14fccb Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 1 Mar 2016 18:33:12 -0800 Subject: [PATCH 02/33] make python test suites run in parallel --- src/python/grpcio/tests/_runner.py | 11 +++- src/python/grpcio/tests/tests.json | 62 +++++++++++++++++++ .../grpcio/tests/unit/_sanity/__init__.py | 30 +++++++++ .../grpcio/tests/unit/_sanity/_sanity_test.py | 53 ++++++++++++++++ tools/run_tests/build_python.sh | 2 + tools/run_tests/run_python.sh | 7 ++- tools/run_tests/run_tests.py | 26 +++++--- 7 files changed, 181 insertions(+), 10 deletions(-) create mode 100644 src/python/grpcio/tests/tests.json create mode 100644 src/python/grpcio/tests/unit/_sanity/__init__.py create mode 100644 src/python/grpcio/tests/unit/_sanity/_sanity_test.py diff --git a/src/python/grpcio/tests/_runner.py b/src/python/grpcio/tests/_runner.py index 4f1ddb57fcf..38a5432e791 100644 --- a/src/python/grpcio/tests/_runner.py +++ b/src/python/grpcio/tests/_runner.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -143,10 +143,17 @@ class Runner(object): def run(self, suite): """See setuptools' test_runner setup argument for information.""" + # only run test cases with id starting with given prefix + testcase_filter = os.getenv('GPRC_PYTHON_TESTRUNNER_FILTER') + filtered_cases = [] + for case in _loader.iterate_suite_cases(suite): + if not testcase_filter or case.id().startswith(testcase_filter): + filtered_cases.append(case) + # Ensure that every test case has no collision with any other test case in # the augmented results. augmented_cases = [AugmentedCase(case, uuid.uuid4()) - for case in _loader.iterate_suite_cases(suite)] + for case in filtered_cases] case_id_by_case = dict((augmented_case.case, augmented_case.id) for augmented_case in augmented_cases) result_out = StringIO.StringIO() diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json new file mode 100644 index 00000000000..388d040d5ca --- /dev/null +++ b/src/python/grpcio/tests/tests.json @@ -0,0 +1,62 @@ +[ + "_base_interface_test.AsyncEasyTest", + "_base_interface_test.AsyncPeasyTest", + "_base_interface_test.SyncEasyTest", + "_base_interface_test.SyncPeasyTest", + "_beta_features_test.BetaFeaturesTest", + "_beta_features_test.ContextManagementAndLifecycleTest", + "_channel_test.ChannelTest", + "_connectivity_channel_test.ChannelConnectivityTest", + "_core_over_links_base_interface_test.AsyncEasyTest", + "_core_over_links_base_interface_test.AsyncPeasyTest", + "_core_over_links_base_interface_test.SyncEasyTest", + "_core_over_links_base_interface_test.SyncPeasyTest", + "_crust_over_core_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", + "_crust_over_core_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest", + "_crust_over_core_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", + "_crust_over_core_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest", + "_crust_over_core_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest", + "_crust_over_core_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest", + "_crust_over_core_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest", + "_crust_over_core_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest", + "_crust_over_core_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest", + "_crust_over_core_over_links_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", + "_crust_over_core_over_links_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest", + "_crust_over_core_over_links_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", + "_crust_over_core_over_links_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest", + "_crust_over_core_over_links_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest", + "_crust_over_core_over_links_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest", + "_crust_over_core_over_links_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest", + "_crust_over_core_over_links_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest", + "_crust_over_core_over_links_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest", + "_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", + "_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest", + "_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", + "_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest", + "_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest", + "_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest", + "_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest", + "_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest", + "_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest", + "_implementations_test.ChannelCredentialsTest", + "_insecure_interop_test.InsecureInteropTest", + "_intermediary_low_test.CancellationTest", + "_intermediary_low_test.EchoTest", + "_intermediary_low_test.ExpirationTest", + "_intermediary_low_test.LonelyClientTest", + "_later_test.LaterTest", + "_logging_pool_test.LoggingPoolTest", + "_lonely_invocation_link_test.LonelyInvocationLinkTest", + "_low_test.HangingServerShutdown", + "_low_test.InsecureServerInsecureClient", + "_not_found_test.NotFoundTest", + "_sanity_test.Sanity", + "_secure_interop_test.SecureInteropTest", + "_transmission_test.RoundTripTest", + "_transmission_test.TransmissionTest", + "_utilities_test.ChannelConnectivityTest", + "beta_python_plugin_test.PythonPluginTest", + "cygrpc_test.InsecureServerInsecureClient", + "cygrpc_test.SecureServerSecureClient", + "cygrpc_test.TypeSmokeTest" +] \ No newline at end of file diff --git a/src/python/grpcio/tests/unit/_sanity/__init__.py b/src/python/grpcio/tests/unit/_sanity/__init__.py new file mode 100644 index 00000000000..2f88fa04122 --- /dev/null +++ b/src/python/grpcio/tests/unit/_sanity/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio/tests/unit/_sanity/_sanity_test.py b/src/python/grpcio/tests/unit/_sanity/_sanity_test.py new file mode 100644 index 00000000000..0a5a715c0e1 --- /dev/null +++ b/src/python/grpcio/tests/unit/_sanity/_sanity_test.py @@ -0,0 +1,53 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import json +import unittest + +import tests + + +class Sanity(unittest.TestCase): + + def testTestsJsonUpToDate(self): + """Autodiscovers all test suites and checks that tests.json is up to date""" + loader = tests.Loader() + loader.loadTestsFromNames(['tests']) + test_suite_names = [ + test_case_class.id().rsplit('.', 1)[0] + for test_case_class in tests._loader.iterate_suite_cases(loader.suite)] + test_suite_names = sorted(set(test_suite_names)) + + with open('src/python/grpcio/tests/tests.json') as tests_json_file: + tests_json = json.load(tests_json_file) + self.assertListEqual(test_suite_names, tests_json) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tools/run_tests/build_python.sh b/tools/run_tests/build_python.sh index e0fcbb602d4..365abb4d863 100755 --- a/tools/run_tests/build_python.sh +++ b/tools/run_tests/build_python.sh @@ -45,3 +45,5 @@ export GRPC_PYTHON_ENABLE_CYTHON_TRACING=1 tox --notest $ROOT/.tox/py27/bin/python $ROOT/setup.py build +$ROOT/.tox/py27/bin/python $ROOT/setup.py build_py +$ROOT/.tox/py27/bin/python $ROOT/setup.py gather --test diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh index ffe9c12af1a..beb747a6169 100755 --- a/tools/run_tests/run_python.sh +++ b/tools/run_tests/run_python.sh @@ -42,7 +42,12 @@ export LDFLAGS="-L$ROOT/libs/$CONFIG" export GRPC_PYTHON_BUILD_WITH_CYTHON=1 export GRPC_PYTHON_ENABLE_CYTHON_TRACING=1 -tox +if [ "$CONFIG" = "gcov" ] +then + tox +else + $ROOT/.tox/py27/bin/python $ROOT/setup.py test +fi mkdir -p $ROOT/reports rm -rf $ROOT/reports/python-coverage diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 08a5ff0e8fa..c55d1fbe633 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -353,15 +353,27 @@ class PythonLanguage(object): _check_compiler(self.args.compiler, ['default']) def test_specs(self): + # load list of known test suites + with open('src/python/grpcio/tests/tests.json') as tests_json_file: + tests_json = json.load(tests_json_file) environment = dict(_FORCE_ENVIRON_FOR_WRAPPERS) environment['PYVER'] = '2.7' - return [self.config.job_spec( - ['tools/run_tests/run_python.sh'], - None, - environ=environment, - shortname='py.test', - timeout_seconds=15*60 - )] + if self.config.build_config != 'gcov': + return [self.config.job_spec( + ['tools/run_tests/run_python.sh'], + None, + environ=dict(environment.items() + + [('GPRC_PYTHON_TESTRUNNER_FILTER', suite_name)]), + shortname='py.test.%s' % suite_name, + timeout_seconds=5*60) + for suite_name in tests_json] + else: + return [self.config.job_spec(['tools/run_tests/run_python.sh'], + None, + environ=environment, + shortname='py.test.coverage', + timeout_seconds=15*60)] + def pre_build_steps(self): return [] From 7d757ca29dc413777a9648cc2db9246235438d43 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 2 Mar 2016 19:44:16 -0800 Subject: [PATCH 03/33] Ensure that no #includes are inside of a namespace. --- test/cpp/qps/limit_cores.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/cpp/qps/limit_cores.cc b/test/cpp/qps/limit_cores.cc index fad9a323afd..28264d031a8 100644 --- a/test/cpp/qps/limit_cores.cc +++ b/test/cpp/qps/limit_cores.cc @@ -37,14 +37,16 @@ #include #include -namespace grpc { -namespace testing { #ifdef GPR_CPU_LINUX #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif #include + +namespace grpc { +namespace testing { + int LimitCores(const int* cores, int cores_size) { const int num_cores = gpr_cpu_num_cores(); int cores_set = 0; From 2a8c28037061b94c02eb05fb532e268709af88e5 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 2 Mar 2016 20:46:54 -0800 Subject: [PATCH 04/33] sanity --- test/cpp/interop/reconnect_interop_client.cc | 12 ++++++------ test/cpp/interop/reconnect_interop_server.cc | 12 ++++++------ test/cpp/qps/limit_cores.cc | 1 - 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/test/cpp/interop/reconnect_interop_client.cc b/test/cpp/interop/reconnect_interop_client.cc index 1f6b352db17..c668edaceb0 100644 --- a/test/cpp/interop/reconnect_interop_client.cc +++ b/test/cpp/interop/reconnect_interop_client.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,16 +34,16 @@ #include #include -#include -#include #include #include #include -#include "test/cpp/util/create_test_channel.h" -#include "test/cpp/util/test_config.h" -#include "src/proto/grpc/testing/test.grpc.pb.h" +#include +#include #include "src/proto/grpc/testing/empty.grpc.pb.h" #include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/test.grpc.pb.h" +#include "test/cpp/util/create_test_channel.h" +#include "test/cpp/util/test_config.h" DEFINE_int32(server_control_port, 0, "Server port for control rpcs."); DEFINE_int32(server_retry_port, 0, "Server port for testing reconnection."); diff --git a/test/cpp/interop/reconnect_interop_server.cc b/test/cpp/interop/reconnect_interop_server.cc index 785f9c7ad5f..1f9147d0efa 100644 --- a/test/cpp/interop/reconnect_interop_server.cc +++ b/test/cpp/interop/reconnect_interop_server.cc @@ -30,7 +30,7 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ - + // Test description at doc/connection-backoff-interop-test-description.md #include @@ -42,17 +42,17 @@ #include #include -#include -#include #include #include #include +#include +#include -#include "test/core/util/reconnect_server.h" -#include "test/cpp/util/test_config.h" -#include "src/proto/grpc/testing/test.grpc.pb.h" #include "src/proto/grpc/testing/empty.grpc.pb.h" #include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/test.grpc.pb.h" +#include "test/core/util/reconnect_server.h" +#include "test/cpp/util/test_config.h" DEFINE_int32(control_port, 0, "Server port for controlling the server."); DEFINE_int32(retry_port, 0, diff --git a/test/cpp/qps/limit_cores.cc b/test/cpp/qps/limit_cores.cc index 28264d031a8..1fb2d628f6d 100644 --- a/test/cpp/qps/limit_cores.cc +++ b/test/cpp/qps/limit_cores.cc @@ -37,7 +37,6 @@ #include #include - #ifdef GPR_CPU_LINUX #ifndef _GNU_SOURCE #define _GNU_SOURCE From 91bd627de4fb1ab1b8c31a915d4a91ccdcf72bbe Mon Sep 17 00:00:00 2001 From: Greg Haines Date: Wed, 2 Mar 2016 23:05:46 -0800 Subject: [PATCH 05/33] Feedback from @jcanizales and @vjpai --- .../GRPCClient/private/GRPCCompletionQueue.m | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m index 8163236cc4a..ffbb14374d1 100644 --- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m +++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m @@ -60,7 +60,6 @@ const int64_t kGRPCCompletionQueueDefaultTimeoutSecs = 60; // anymore (i.e. on self dealloc). So the block would never end if it // retained self. grpc_completion_queue *unmanagedQueue = _unmanagedQueue; - int64_t lTimeoutSecs = _timeoutSecs; // Start a loop on a concurrent queue to read events from the completion // queue and dispatch each. @@ -70,18 +69,18 @@ const int64_t kGRPCCompletionQueueDefaultTimeoutSecs = 60; gDefaultConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); }); dispatch_async(gDefaultConcurrentQueue, ^{ - gpr_timespec deadline = gpr_time_from_seconds(lTimeoutSecs, GPR_CLOCK_REALTIME); + gpr_timespec deadline = gpr_time_from_seconds(timeoutSecs, GPR_CLOCK_REALTIME); while (YES) { // The following call blocks until an event is available or the deadline elapses. grpc_event event = grpc_completion_queue_next(unmanagedQueue, deadline, NULL); GRPCQueueCompletionHandler handler; switch (event.type) { - case GRPC_OP_COMPLETE: // Falling through deliberately - case GRPC_QUEUE_TIMEOUT: + case GRPC_OP_COMPLETE: handler = (__bridge_transfer GRPCQueueCompletionHandler)event.tag; - if (handler) { - handler(event.success); - } + handler(event.success); + break; + case GRPC_QUEUE_TIMEOUT: + // Nothing to do here break; case GRPC_QUEUE_SHUTDOWN: grpc_completion_queue_destroy(unmanagedQueue); From d7f12e30a0050b3fecb72fe7ad558b4d837e140f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 3 Mar 2016 10:08:31 -0800 Subject: [PATCH 06/33] Fix accept_stream being called post-channel deletion - Have the server clear the accept_stream callback prior to destroying the channel (required a small transport op protocol change) - Have the transport not enact transport ops until parsing is completed (prevents accept_stream from disappearing mid-parse) --- src/core/channel/client_channel.c | 2 +- src/core/channel/client_uchannel.c | 2 +- src/core/surface/server.c | 14 +++++++-- src/core/transport/chttp2/internal.h | 3 ++ src/core/transport/chttp2_transport.c | 43 +++++++++++++++++++-------- src/core/transport/transport.h | 6 ++-- 6 files changed, 51 insertions(+), 19 deletions(-) diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index eeac3c146c3..d4ba9508185 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -251,7 +251,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); - GPR_ASSERT(op->set_accept_stream == NULL); + GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != NULL) { grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, op->bind_pollset); diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index b1e7155773f..83fcc3a87f5 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -107,7 +107,7 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); - GPR_ASSERT(op->set_accept_stream == NULL); + GPR_ASSERT(op->set_accept_stream == false); GPR_ASSERT(op->bind_pollset == NULL); if (op->on_connectivity_state_change != NULL) { diff --git a/src/core/surface/server.c b/src/core/surface/server.c index fb5e0d4b9e7..5b13d4ba526 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -407,8 +407,15 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) { maybe_finish_shutdown(exec_ctx, chand->server); chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; - grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, true, - NULL); + + grpc_transport_op op; + memset(&op, 0, sizeof(op)); + op.set_accept_stream = true; + op.on_consumed = &chand->finish_destroy_channel_closure; + grpc_channel_next_op(exec_ctx, + grpc_channel_stack_element( + grpc_channel_get_channel_stack(chand->channel), 0), + &op); } static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server, @@ -971,7 +978,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity"); memset(&op, 0, sizeof(op)); - op.set_accept_stream = accept_stream; + op.set_accept_stream = true; + op.set_accept_stream_fn = accept_stream; op.set_accept_stream_user_data = chand; op.on_connectivity_state_change = &chand->channel_connectivity_changed; op.connectivity_state = &chand->connectivity_state; diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 891aad6ef28..b720d1ab3e5 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -358,6 +358,9 @@ struct grpc_chttp2_transport { /** connectivity tracking */ grpc_connectivity_state_tracker state_tracker; } channel_callback; + + /** Transport op to be applied post-parsing */ + grpc_transport_op *post_parsing_op; }; typedef struct { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index a1ca78d4c4f..369ff0ad7f6 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -951,12 +951,10 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, unlock(exec_ctx, t); } -static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_transport_op *op) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - int close_transport = 0; - - lock(t); +static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_transport_op *op) { + bool close_transport = false; grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); @@ -975,8 +973,8 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, close_transport = !grpc_chttp2_has_streams(t); } - if (op->set_accept_stream != NULL) { - t->channel_callback.accept_stream = op->set_accept_stream; + if (op->set_accept_stream) { + t->channel_callback.accept_stream = op->set_accept_stream_fn; t->channel_callback.accept_stream_user_data = op->set_accept_stream_user_data; } @@ -997,15 +995,29 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, close_transport_locked(exec_ctx, t); } - unlock(exec_ctx, t); - if (close_transport) { - lock(t); close_transport_locked(exec_ctx, t); - unlock(exec_ctx, t); } } +static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_transport_op *op) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + + lock(t); + + /* Let's be overly cautious: don't change any state while we're parsing */ + if (t->parsing_active) { + GPR_ASSERT(t->post_parsing_op == NULL); + t->post_parsing_op = gpr_malloc(sizeof(*op)); + memcpy(t->post_parsing_op, op, sizeof(*op)); + } else { + perform_transport_op_locked(exec_ctx, t, op); + } + + unlock(exec_ctx, t); +} + /******************************************************************************* * INPUT PROCESSING */ @@ -1401,6 +1413,13 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) { /* handle higher level things */ grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing); t->parsing_active = 0; + /* handle delayed transport ops (if there is one) */ + if (t->post_parsing_op) { + grpc_transport_op *op = t->post_parsing_op; + t->post_parsing_op = NULL; + perform_transport_op_locked(exec_ctx, t, op); + gpr_free(op); + } /* if a stream is in the stream map, and gets cancelled, we need to ensure * we are not parsing before continuing the cancellation to keep things in * a sane state */ diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 8902c5d2f65..7a007ef777b 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -139,8 +139,10 @@ typedef struct grpc_transport_op { gpr_slice *goaway_message; /** set the callback for accepting new streams; this is a permanent callback, unlike the other one-shot closures */ - void (*set_accept_stream)(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_transport *transport, const void *server_data); + bool set_accept_stream; + void (*set_accept_stream_fn)(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_transport *transport, + const void *server_data); void *set_accept_stream_user_data; /** add this transport to a pollset */ grpc_pollset *bind_pollset; From 66a6d014a4b2afec45906ff803e31dd89e82e22a Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 3 Mar 2016 12:35:08 -0800 Subject: [PATCH 07/33] run build_ext in build_python step --- tools/run_tests/build_python.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/run_tests/build_python.sh b/tools/run_tests/build_python.sh index 365abb4d863..f120fc7ed69 100755 --- a/tools/run_tests/build_python.sh +++ b/tools/run_tests/build_python.sh @@ -46,4 +46,5 @@ tox --notest $ROOT/.tox/py27/bin/python $ROOT/setup.py build $ROOT/.tox/py27/bin/python $ROOT/setup.py build_py +$ROOT/.tox/py27/bin/python $ROOT/setup.py build_ext --inplace $ROOT/.tox/py27/bin/python $ROOT/setup.py gather --test From 13ee2f2df3ded81d096a1440a0f273eeece6a2cc Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 3 Mar 2016 13:57:32 -0800 Subject: [PATCH 08/33] Properly integrate async API with server-side cancellations. There is a comment above IsCancelled that says when it is ok to use this. --- .../grpc++/impl/codegen/completion_queue.h | 1 + include/grpc++/impl/codegen/server_context.h | 3 + src/cpp/server/server_context.cc | 26 ++- test/cpp/end2end/async_end2end_test.cc | 218 +++++++++++++----- 4 files changed, 187 insertions(+), 61 deletions(-) diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 102831e1c9b..928ab2db317 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -184,6 +184,7 @@ class CompletionQueue : private GrpcLibrary { bool Pluck(CompletionQueueTag* tag); /// Performs a single polling pluck on \a tag. + /// \warning Must not be mixed with calls to \a Next. void TryPluck(CompletionQueueTag* tag); grpc_completion_queue* cq_; // owned diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index ad08b8210d6..91ebe574b14 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -103,6 +103,9 @@ class ServerContext { void AddInitialMetadata(const grpc::string& key, const grpc::string& value); void AddTrailingMetadata(const grpc::string& key, const grpc::string& value); + // IsCancelled is always safe to call when using sync API + // When using async API, it is only safe to call IsCancelled after + // the AsyncNotifyWhenDone tag has been delivered bool IsCancelled() const; // Cancel the Call from the server. This is a best-effort API and depending on diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index e205a1969b3..eb49b210379 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -62,7 +62,11 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE; bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; - bool CheckCancelled(CompletionQueue* cq); + bool CheckCancelled(CompletionQueue* cq) { + cq->TryPluck(this); + return CheckCancelledNoPluck(); + } + bool CheckCancelledAsync() { return CheckCancelledNoPluck(); } void set_tag(void* tag) { has_tag_ = true; @@ -72,6 +76,11 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { void Unref(); private: + bool CheckCancelledNoPluck() { + grpc::lock_guard g(mu_); + return finalized_ ? (cancelled_ != 0) : false; + } + bool has_tag_; void* tag_; grpc::mutex mu_; @@ -88,12 +97,6 @@ void ServerContext::CompletionOp::Unref() { } } -bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) { - cq->TryPluck(this); - grpc::lock_guard g(mu_); - return finalized_ ? cancelled_ != 0 : false; -} - void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; ops->data.recv_close_on_server.cancelled = &cancelled_; @@ -182,7 +185,14 @@ void ServerContext::TryCancel() const { } bool ServerContext::IsCancelled() const { - return completion_op_ && completion_op_->CheckCancelled(cq_); + if (has_notify_when_done_tag_) { + // when using async API, but the result is only valid + // if the tag has already been delivered at the completion queue + return completion_op_ && completion_op_->CheckCancelledAsync(); + } else { + // when using sync API + return completion_op_ && completion_op_->CheckCancelled(cq_); + } } void ServerContext::set_compression_level(grpc_compression_level level) { diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 9ca3bf98f85..09e973f28d5 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -68,6 +68,7 @@ namespace testing { namespace { void* tag(int i) { return (void*)(intptr_t)i; } +int detag(void* p) { return static_cast(reinterpret_cast(p)); } #ifdef GPR_POSIX_SOCKET static int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds, @@ -111,30 +112,35 @@ class Verifier { return *this; } + int Next(CompletionQueue* cq, bool ignore_ok) { + bool ok; + void* got_tag; + if (spin_) { + for (;;) { + auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)); + if (r == CompletionQueue::TIMEOUT) continue; + if (r == CompletionQueue::GOT_EVENT) break; + gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); + abort(); + } + } else { + EXPECT_TRUE(cq->Next(&got_tag, &ok)); + } + auto it = expectations_.find(got_tag); + EXPECT_TRUE(it != expectations_.end()); + if (!ignore_ok) { + EXPECT_EQ(it->second, ok); + } + expectations_.erase(it); + return detag(got_tag); + } + void Verify(CompletionQueue* cq) { Verify(cq, false); } void Verify(CompletionQueue* cq, bool ignore_ok) { GPR_ASSERT(!expectations_.empty()); while (!expectations_.empty()) { - bool ok; - void* got_tag; - if (spin_) { - for (;;) { - auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)); - if (r == CompletionQueue::TIMEOUT) continue; - if (r == CompletionQueue::GOT_EVENT) break; - gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); - abort(); - } - } else { - EXPECT_TRUE(cq->Next(&got_tag, &ok)); - } - auto it = expectations_.find(got_tag); - EXPECT_TRUE(it != expectations_.end()); - if (!ignore_ok) { - EXPECT_EQ(it->second, ok); - } - expectations_.erase(it); + Next(cq, ignore_ok); } } void Verify(CompletionQueue* cq, @@ -793,7 +799,8 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) { } // This class is for testing scenarios where RPCs are cancelled on the server -// by calling ServerContext::TryCancel() +// by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone +// API to check for cancellation class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { protected: typedef enum { @@ -803,13 +810,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { CANCEL_AFTER_PROCESSING } ServerTryCancelRequestPhase; - void ServerTryCancel(ServerContext* context) { - EXPECT_FALSE(context->IsCancelled()); - context->TryCancel(); - gpr_log(GPR_INFO, "Server called TryCancel()"); - EXPECT_TRUE(context->IsCancelled()); - } - // Helper for testing client-streaming RPCs which are cancelled on the server. // Depending on the value of server_try_cancel parameter, this will test one // of the following three scenarios: @@ -843,6 +843,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of 'RequestStream' calls // and receive the 'RequestStream' call just made by the client + srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); @@ -858,9 +859,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { bool expected_server_cq_result = true; bool ignore_cq_result = false; + bool want_done_tag = false; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); // Since cancellation is done before server reads any results, we know // for sure that all cq results will return false from this point forward @@ -868,22 +872,39 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } std::thread* server_try_cancel_thd = NULL; + + auto verif = Verifier(GetParam()); + if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = new std::thread( - &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); + &ServerContext::TryCancel, &srv_ctx); // Server will cancel the RPC in a parallel thread while reading the // requests from the client. Since the cancellation can happen at anytime, // some of the cq results (i.e those until cancellation) might be true but // its non deterministic. So better to ignore the cq results ignore_cq_result = true; + // Expect that we might possibly see the done tag that + // indicates cancellation completion in this case + want_done_tag = true; + verif.Expect(11, true); } // Server reads 3 messages (tags 6, 7 and 8) + // But if want_done_tag is true, we might also see tag 11 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { srv_stream.Read(&recv_request, tag(tag_idx)); - Verifier(GetParam()) - .Expect(tag_idx, expected_server_cq_result) - .Verify(cq_.get(), ignore_cq_result); + // Note that we'll add something to the verifier and verify that + // something was seen, but it might be tag 11 and not what we + // just added + int got_tag = verif.Expect(tag_idx, expected_server_cq_result) + .Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx); + } } if (server_try_cancel_thd != NULL) { @@ -892,7 +913,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + want_done_tag = true; + verif.Expect(11, true); + } + + if (want_done_tag) { + verif.Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; } // The RPC has been cancelled at this point for sure (i.e irrespective of @@ -945,6 +974,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of 'ResponseStream' calls and // receive the call just made by the client + srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq_.get(), cq_.get(), tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); @@ -952,9 +982,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { bool expected_cq_result = true; bool ignore_cq_result = false; + bool want_done_tag = false; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); // We know for sure that all cq results will be false from this point // since the server cancelled the RPC @@ -962,24 +995,41 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } std::thread* server_try_cancel_thd = NULL; + + auto verif = Verifier(GetParam()); + if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = new std::thread( - &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); + &ServerContext::TryCancel, &srv_ctx); // Server will cancel the RPC in a parallel thread while writing responses // to the client. Since the cancellation can happen at anytime, some of // the cq results (i.e those until cancellation) might be true but it is // non deterministic. So better to ignore the cq results ignore_cq_result = true; + // Expect that we might possibly see the done tag that + // indicates cancellation completion in this case + want_done_tag = true; + verif.Expect(11, true); } // Server sends three messages (tags 3, 4 and 5) + // But if want_done tag is true, we might also see tag 11 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { send_response.set_message("Pong " + std::to_string(tag_idx)); srv_stream.Write(send_response, tag(tag_idx)); - Verifier(GetParam()) - .Expect(tag_idx, expected_cq_result) - .Verify(cq_.get(), ignore_cq_result); + // Note that we'll add something to the verifier and verify that + // something was seen, but it might be tag 11 and not what we + // just added + int got_tag = verif.Expect(tag_idx, expected_cq_result) + .Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx); + } } if (server_try_cancel_thd != NULL) { @@ -988,13 +1038,21 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + want_done_tag = true; + verif.Expect(11, true); // Client reads may fail bacause it is notified that the stream is // cancelled. ignore_cq_result = true; } + if (want_done_tag) { + verif.Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + } + // Client attemts to read the three messages from the server for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { cli_stream->Read(&recv_response, tag(tag_idx)); @@ -1052,6 +1110,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of the 'BidiStream' call and // receive the call just made by the client + srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); @@ -1063,9 +1122,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { bool expected_cq_result = true; bool ignore_cq_result = false; + bool want_done_tag = false; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); // We know for sure that all cq results will be false from this point // since the server cancelled the RPC @@ -1073,42 +1135,84 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } std::thread* server_try_cancel_thd = NULL; + + auto verif = Verifier(GetParam()); + if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = new std::thread( - &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); + &ServerContext::TryCancel, &srv_ctx); // Since server is going to cancel the RPC in a parallel thread, some of // the cq results (i.e those until the cancellation) might be true. Since // that number is non-deterministic, it is better to ignore the cq results ignore_cq_result = true; + // Expect that we might possibly see the done tag that + // indicates cancellation completion in this case + want_done_tag = true; + verif.Expect(11, true); } + int got_tag; srv_stream.Read(&recv_request, tag(4)); - Verifier(GetParam()) - .Expect(4, expected_cq_result) - .Verify(cq_.get(), ignore_cq_result); + verif.Expect(4, expected_cq_result); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4); + } send_response.set_message("Pong"); srv_stream.Write(send_response, tag(5)); - Verifier(GetParam()) - .Expect(5, expected_cq_result) - .Verify(cq_.get(), ignore_cq_result); + verif.Expect(5, expected_cq_result); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5); + } cli_stream->Read(&recv_response, tag(6)); - Verifier(GetParam()) - .Expect(6, expected_cq_result) - .Verify(cq_.get(), ignore_cq_result); + verif.Expect(6, expected_cq_result); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6); + } // This is expected to succeed in all cases cli_stream->WritesDone(tag(7)); - Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); + verif.Expect(7, true); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 7); + } // This is expected to fail in all cases i.e for all values of // server_try_cancel. This is because at this point, either there are no // more msgs from the client (because client called WritesDone) or the RPC // is cancelled on the server srv_stream.Read(&recv_request, tag(8)); - Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); + verif.Expect(8, false); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8); + } if (server_try_cancel_thd != NULL) { server_try_cancel_thd->join(); @@ -1116,7 +1220,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + want_done_tag = true; + verif.Expect(11, true); + } + + if (want_done_tag) { + verif.Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; } // The RPC has been cancelled at this point for sure (i.e irrespective of From 2e729387f7943971de2591d0db09a0a87026cfd7 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 3 Mar 2016 14:22:12 -0800 Subject: [PATCH 09/33] clang-format --- test/cpp/end2end/async_end2end_test.cc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 09e973f28d5..30028dd4aa6 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -876,8 +876,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { auto verif = Verifier(GetParam()); if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = new std::thread( - &ServerContext::TryCancel, &srv_ctx); + server_try_cancel_thd = + new std::thread(&ServerContext::TryCancel, &srv_ctx); // Server will cancel the RPC in a parallel thread while reading the // requests from the client. Since the cancellation can happen at anytime, // some of the cq results (i.e those until cancellation) might be true but @@ -897,7 +897,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // something was seen, but it might be tag 11 and not what we // just added int got_tag = verif.Expect(tag_idx, expected_server_cq_result) - .Next(cq_.get(), ignore_cq_result); + .Next(cq_.get(), ignore_cq_result); GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag)); if (got_tag == 11) { EXPECT_TRUE(srv_ctx.IsCancelled()); @@ -999,8 +999,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { auto verif = Verifier(GetParam()); if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = new std::thread( - &ServerContext::TryCancel, &srv_ctx); + server_try_cancel_thd = + new std::thread(&ServerContext::TryCancel, &srv_ctx); // Server will cancel the RPC in a parallel thread while writing responses // to the client. Since the cancellation can happen at anytime, some of @@ -1022,7 +1022,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // something was seen, but it might be tag 11 and not what we // just added int got_tag = verif.Expect(tag_idx, expected_cq_result) - .Next(cq_.get(), ignore_cq_result); + .Next(cq_.get(), ignore_cq_result); GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag)); if (got_tag == 11) { EXPECT_TRUE(srv_ctx.IsCancelled()); @@ -1139,8 +1139,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { auto verif = Verifier(GetParam()); if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = new std::thread( - &ServerContext::TryCancel, &srv_ctx); + server_try_cancel_thd = + new std::thread(&ServerContext::TryCancel, &srv_ctx); // Since server is going to cancel the RPC in a parallel thread, some of // the cq results (i.e those until the cancellation) might be true. Since From 6dd3c707c2dd921288a82ec9813d41b567ff07b6 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Thu, 3 Mar 2016 13:53:49 -0800 Subject: [PATCH 10/33] Update base INSTALL to markdown and remove outdated content --- CONTRIBUTING.md | 2 +- INSTALL | 217 ------------------------------ INSTALL.md | 45 +++++++ README.md | 2 +- examples/cpp/README.md | 2 +- examples/cpp/cpptutorial.md | 2 +- examples/cpp/helloworld/README.md | 2 +- 7 files changed, 50 insertions(+), 222 deletions(-) delete mode 100644 INSTALL create mode 100644 INSTALL.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 9423c46547a..03d2e276a82 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -13,7 +13,7 @@ In order to protect both you and ourselves, you will need to sign the ### Technical requirements You will need several tools to work with this repository. In addition to all of -the packages described in the [INSTALL](INSTALL) file, you will also need +the packages described in the [INSTALL](INSTALL.md) file, you will also need python, and the mako template renderer. To install the latter, using pip, one should simply be able to do `pip install mako`. diff --git a/INSTALL b/INSTALL deleted file mode 100644 index e33f8970a95..00000000000 --- a/INSTALL +++ /dev/null @@ -1,217 +0,0 @@ -These instructions only cover building grpc C and C++ libraries under -typical unix systems. If you need more information, please try grpc's -wiki pages: - - https://github.com/google/grpc/wiki - - -************************* -* If you are in a hurry * -************************* - -On Linux (Debian): - - Note: you will need to add the Debian 'jessie-backports' distribution to your sources - file first. - - Add the following line to your `/etc/apt/sources.list` file: - - deb http://http.debian.net/debian jessie-backports main - - Install the gRPC library: - - $ [sudo] apt-get install libgrpc-dev - -OR - - $ git clone https://github.com/grpc/grpc.git - $ cd grpc - $ git submodule update --init - $ make - $ [sudo] make install - -You don't need anything else than GNU Make, gcc and autotools. Under a Debian -or Ubuntu system, this should boil down to the following packages: - - $ [sudo] apt-get install build-essential autoconf libtool - -Building the python wrapper requires the following: - - $ [sudo] apt-get install python-all-dev python-virtualenv - -If you want to install in a different directory than the default /usr/lib, you can -override it on the command line: - - $ [sudo] make install prefix=/opt - - -******************************* -* More detailled instructions * -******************************* - -Setting up dependencies -======================= - -Dependencies to compile the libraries -------------------------------------- - -grpc libraries have few external dependencies. If you need to compile and -install them, they are present in the third_party directory if you have -cloned the github repository recursively. If you didn't clone recursively, -you can still get them later by running the following command: - - $ git submodule update --init - -Note that the Makefile makes it much easier for you to compile from sources -if you were to clone recursively our git repository: it will automatically -compile zlib and OpenSSL, which are core requirements for grpc. Note this -creates grpc libraries that will have zlib and OpenSSL built-in inside of them, -which significantly increases the libraries' size. - -In order to decrease that size, you can manually install zlib and OpenSSL on -your system, so that the Makefile can use them instead. - -Under a Debian or Ubuntu system, one can acquire the development package -for zlib this way: - - # apt-get install zlib1g-dev - -To the best of our knowledge, no distribution has an OpenSSL package that -supports ALPN yet, so you would still have to depend on installing from source -for that particular dependency if you want to reduce the libraries' size. - -The recommended version of OpenSSL that provides ALPN support is available -at this URL: - - https://www.openssl.org/source/openssl-1.0.2.tar.gz - - -Dependencies to compile and run the tests ------------------------------------------ - -Compiling and running grpc plain-C tests dont't require any more dependency. - - -Compiling and running grpc C++ tests depend on protobuf 3.0.0, gtest and -gflags. Although gflags is provided in third_party, you will need to manually -install that dependency on your system to run these tests. - -Under a Debian or Ubuntu system, you can install the gtests and gflags packages -using apt-get: - - # apt-get install libgflags-dev libgtest-dev - -However, protobuf 3.0.0 isn't in a debian package yet, but the Makefile will -automatically try and compile the one present in third_party if you cloned the -repository recursively, and that it detects your system is lacking it. - -Compiling and installing protobuf 3.0.0 requires a few more dependencies in -itself, notably the autoconf suite. If you have apt-get, you can install -these dependencies this way: - - # apt-get install autoconf libtool - -If you want to run the tests using one of the sanitized configurations, you -will need clang and its instrumented libc++: - - # apt-get install clang libc++-dev - -Mac-specific notes: -------------------- - -For a Mac system, git is not available by default. You will first need to -install Xcode from the Mac AppStore and then run the following command from a -terminal: - - $ sudo xcode-select --install - -You should also install "port" following the instructions at -https://www.macports.org . This will reside in /opt/local/bin/port for -most Mac installations. Do the "git submodule" command listed above. - -Then execute the following for all the needed build dependencies - - $ sudo /opt/local/bin/port install autoconf automake libtool gflags cmake - $ mkdir ~/gtest-svn - $ svn checkout http://googletest.googlecode.com/svn/trunk/ gtest-svn - $ mkdir mybuild - $ cd mybuild - $ cmake ../gtest-svn - $ make - $ make gtest.a gtest_main.a - $ sudo cp libgtest.a libgtest_main.a /opt/local/lib - $ sudo mkdir /opt/local/include/gtest - $ sudo cp -pr ../gtest-svn/include/gtest /opt/local/include/gtest - -If you are going to make changes and need to regenerate the projects file, -you will need to install certain modules for python. - - $ sudo easy_install simplejson mako - -Mingw-specific notes: ---------------------- - -While gRPC compiles properly under mingw, some more preparation work is needed. -The recommendation is to use msys2. The installation instructions are available -at that address: http://msys2.github.io/ - -Once this is installed, make sure you are using the following: MinGW-w64 Win64. -You'll be required to install a few more packages: - - $ pacman -S make mingw-w64-x86_64-gcc mingw-w64-x86_64-zlib autoconf automake libtool - -Please also install OpenSSL from that website: - - http://slproweb.com/products/Win32OpenSSL.html - -The package Win64 OpenSSL v1.0.2a should do. At that point you should be able -to compile gRPC with the following: - - $ export LDFLAGS="-L/mingw64/lib -L/c/OpenSSL-Win64" - $ export CPPFLAGS="-I/mingw64/include -I/c/OpenSSL-Win64/include" - $ make - -A word on OpenSSL ------------------ - -Secure HTTP2 requires the TLS extension ALPN (see rfc 7301 and -http://http2.github.io/http2-spec/ section 3.3). Our HTTP2 implementation -relies on OpenSSL's implementation. OpenSSL 1.0.2 is the first released version -of OpenSSL that has ALPN support, and this explains our dependency on it. - -Note that the Makefile supports compiling only the unsecure elements of grpc, -and if you do not have OpenSSL and do not want it, you can still proceed -with installing only the elements you require. However, we strongly recommend -the use of encryption for all network traffic, and discourage the use of grpc -without TLS. - - -Compiling -========= - -If you have all the dependencies mentioned above, you should simply be able -to go ahead and run "make" to compile grpc's C and C++ libraries: - - $ make - - -Testing -======= - -To build and run the tests, you can run the command: - - $ make test - -If you want to be able to run them in parallel, and get better output, you can -also use the python tool we have written: - - $ ./tools/run_tests/run_tests.py - - -Installing -========== - -Once everything is compiled, you should be able to install grpc C and C++ -libraries and headers: - - # make install diff --git a/INSTALL.md b/INSTALL.md new file mode 100644 index 00000000000..aa809f23c8f --- /dev/null +++ b/INSTALL.md @@ -0,0 +1,45 @@ +#If you are in a hurry + +For language-specific installation instructions, please refer to these +documents + + * [C++](examples/cpp) + * [C#](src/csharp): NuGet package `Grpc` + * [Go](https://github.com/grpc/grpc-go): `go get google.golang.org/grpc` + * [Java](https://github.com/grpc/grpc-java) + * [Node](src/node): `npm install grpc` + * [Objective-C](src/objective-c) + * [PHP](src/php): `pecl install grpc-beta` + * [Python](src/python/grpcio): `pip install grpcio` + * [Ruby](src/ruby): `gem install grpc` + + +#Pre-requisites + +##Linux + +```sh + $ [sudo] apt-get install build-essential autoconf libtool +``` + +##Mac OSX + +For a Mac system, git is not available by default. You will first need to +install Xcode from the Mac AppStore and then run the following command from a +terminal: + +```sh + $ [sudo] xcode-select --install +``` + +#Build from Source + +This is for compiling just the gRPC C Core library. + +```sh + $ git clone https://github.com/grpc/grpc.git + $ cd grpc + $ git submodule update --init + $ make + $ [sudo] make install +``` diff --git a/README.md b/README.md index 033e09b91b7..abb4905392c 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ You can find more detailed documentation and examples in the [doc](doc) and [exa #Installation -See [grpc/INSTALL](INSTALL) for installation instructions for various platforms. +See [INSTALL](INSTALL.md) for installation instructions for various platforms. #Repository Structure & Status diff --git a/examples/cpp/README.md b/examples/cpp/README.md index 85c495099b7..979ba55dbb5 100644 --- a/examples/cpp/README.md +++ b/examples/cpp/README.md @@ -2,7 +2,7 @@ ## Installation -To install gRPC on your system, follow the instructions [here](../../INSTALL). +To install gRPC on your system, follow the instructions [here](../../INSTALL.md). ## Hello C++ gRPC! diff --git a/examples/cpp/cpptutorial.md b/examples/cpp/cpptutorial.md index cd1cddb1115..ef9ca99c0fe 100644 --- a/examples/cpp/cpptutorial.md +++ b/examples/cpp/cpptutorial.md @@ -91,7 +91,7 @@ message Point { Next we need to generate the gRPC client and server interfaces from our .proto service definition. We do this using the protocol buffer compiler `protoc` with a special gRPC C++ plugin. -For simplicity, we've provided a [makefile](route_guide/Makefile) that runs `protoc` for you with the appropriate plugin, input, and output (if you want to run this yourself, make sure you've installed protoc and followed the gRPC code [installation instructions](../../INSTALL) first): +For simplicity, we've provided a [makefile](route_guide/Makefile) that runs `protoc` for you with the appropriate plugin, input, and output (if you want to run this yourself, make sure you've installed protoc and followed the gRPC code [installation instructions](../../INSTALL.md) first): ```shell $ make route_guide.grpc.pb.cc route_guide.pb.cc diff --git a/examples/cpp/helloworld/README.md b/examples/cpp/helloworld/README.md index 90f3d6b1475..8e11f7cdf3e 100644 --- a/examples/cpp/helloworld/README.md +++ b/examples/cpp/helloworld/README.md @@ -2,7 +2,7 @@ ### Install gRPC Make sure you have installed gRPC on your system. Follow the instructions here: -[https://github.com/grpc/grpc/blob/master/INSTALL](../../../INSTALL). +[https://github.com/grpc/grpc/blob/master/INSTALL](../../../INSTALL.md). ### Get the tutorial source code From a22a50d61795c38522fe8b032603c63fcef7f93f Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Thu, 3 Mar 2016 14:49:03 -0800 Subject: [PATCH 11/33] bit of text change --- INSTALL.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/INSTALL.md b/INSTALL.md index aa809f23c8f..d9411db021d 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -1,7 +1,7 @@ #If you are in a hurry -For language-specific installation instructions, please refer to these -documents +For language-specific installation instructions for gRPC runtime, please +refer to these documents * [C++](examples/cpp) * [C#](src/csharp): NuGet package `Grpc` @@ -34,7 +34,8 @@ terminal: #Build from Source -This is for compiling just the gRPC C Core library. +For developers who are interested to contribute, here is how to compile the +gRPC C Core library. ```sh $ git clone https://github.com/grpc/grpc.git From 19f703dbb7805b95bdc64d166a4e5fb36e2dd192 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 3 Mar 2016 15:33:29 -0800 Subject: [PATCH 12/33] increase timeout for interop tests --- tools/run_tests/run_interop_tests.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tools/run_tests/run_interop_tests.py b/tools/run_tests/run_interop_tests.py index df3ab90a839..1dc772a8565 100755 --- a/tools/run_tests/run_interop_tests.py +++ b/tools/run_tests/run_interop_tests.py @@ -60,6 +60,8 @@ _SKIP_COMPRESSION = ['large_compressed_unary', _SKIP_ADVANCED = ['custom_metadata', 'status_code_and_message', 'unimplemented_method'] +_TEST_TIMEOUT = 3*60 + class CXXLanguage: def __init__(self): @@ -459,7 +461,7 @@ def cloud_to_prod_jobspec(language, test_case, server_host_name, environ=environ, shortname='%s:%s:%s:%s' % (suite_name, server_host_name, language, test_case), - timeout_seconds=90, + timeout_seconds=_TEST_TIMEOUT, flake_retries=5 if args.allow_flakes else 0, timeout_retries=2 if args.allow_flakes else 0, kill_handler=_job_kill_handler) @@ -495,7 +497,7 @@ def cloud_to_cloud_jobspec(language, test_case, server_name, server_host, environ=environ, shortname='cloud_to_cloud:%s:%s_server:%s' % (language, server_name, test_case), - timeout_seconds=90, + timeout_seconds=_TEST_TIMEOUT, flake_retries=5 if args.allow_flakes else 0, timeout_retries=2 if args.allow_flakes else 0, kill_handler=_job_kill_handler) From dbf47fabd406b51e8d3d03c5eb06c5fe6f7d0d5d Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 3 Mar 2016 16:25:06 -0800 Subject: [PATCH 13/33] Better comments. --- test/cpp/end2end/async_end2end_test.cc | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 30028dd4aa6..dc8c2bb6e5b 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -107,11 +107,14 @@ class PollingOverrider { class Verifier { public: explicit Verifier(bool spin) : spin_(spin) {} + // Expect sets the expected ok value for a specific tag Verifier& Expect(int i, bool expect_ok) { expectations_[tag(i)] = expect_ok; return *this; } + // Next waits for 1 async tag to complete, checks its + // expectations, and returns the tag int Next(CompletionQueue* cq, bool ignore_ok) { bool ok; void* got_tag; @@ -135,14 +138,19 @@ class Verifier { return detag(got_tag); } + // Verify keeps calling Next until all currently set + // expected tags are complete void Verify(CompletionQueue* cq) { Verify(cq, false); } + // This version of Verify allows optionally ignoring the + // outcome of the expectation void Verify(CompletionQueue* cq, bool ignore_ok) { GPR_ASSERT(!expectations_.empty()); while (!expectations_.empty()) { Next(cq, ignore_ok); } } + // This version of Verify stops after a certain deadline void Verify(CompletionQueue* cq, std::chrono::system_clock::time_point deadline) { if (expectations_.empty()) { From 79a0f0611c6235fbf636c3e754860e6de28b4e3c Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 3 Mar 2016 17:18:48 -0800 Subject: [PATCH 14/33] Properly delete Node OpenSSL headers after downloading them --- tools/run_tests/build_node.bat | 10 +++++++++- tools/run_tests/pre_build_node.bat | 9 +-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/tools/run_tests/build_node.bat b/tools/run_tests/build_node.bat index 6896bc1d1bb..3ddd0e73cf7 100644 --- a/tools/run_tests/build_node.bat +++ b/tools/run_tests/build_node.bat @@ -27,4 +27,12 @@ @rem (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE @rem OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -npm install --build-from-source \ No newline at end of file +call npm install --build-from-source + +@rem delete the redundant openssl headers +for /f "delims=v" %%v in ('node --version') do ( + rmdir "%HOMEDRIVE%%HOMEPATH%\.node-gyp\%%v\include\node\openssl" /S /Q +) + +@rem rebuild, because it probably failed the first time +call npm install --build-from-source \ No newline at end of file diff --git a/tools/run_tests/pre_build_node.bat b/tools/run_tests/pre_build_node.bat index 6e7cbe5d420..ffb4a09f15a 100644 --- a/tools/run_tests/pre_build_node.bat +++ b/tools/run_tests/pre_build_node.bat @@ -28,12 +28,5 @@ @rem OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. @rem Expire cache after 1 week -npm update --cache-min 604800 +call npm update --cache-min 604800 -npm install node-gyp-install -.\node_modules\.bin\node-gyp-install.cmd - -@rem delete the redundant openssl headers -for /f "delims=v" %%v in ('node --version') do ( - rmdir "%HOMEDRIVE%%HOMEPATH%\.node-gyp\%%v\include\node\openssl" /S /Q -) \ No newline at end of file From 4913ea06eb4093a2229b3009dedb02828af1edc2 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 3 Mar 2016 17:22:53 -0800 Subject: [PATCH 15/33] Ensure node and npm are in the path when running tests --- tools/run_tests/build_node.bat | 4 ++++ tools/run_tests/pre_build_node.bat | 2 ++ tools/run_tests/run_node.bat | 1 + 3 files changed, 7 insertions(+) diff --git a/tools/run_tests/build_node.bat b/tools/run_tests/build_node.bat index 3ddd0e73cf7..886af0610ff 100644 --- a/tools/run_tests/build_node.bat +++ b/tools/run_tests/build_node.bat @@ -27,6 +27,10 @@ @rem (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE @rem OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +set PATH=%PATH%;C:\Program Files\nodejs\;%APPDATA%\npm + +del /f /q BUILD || rmdir build /s /q + call npm install --build-from-source @rem delete the redundant openssl headers diff --git a/tools/run_tests/pre_build_node.bat b/tools/run_tests/pre_build_node.bat index ffb4a09f15a..a29456f9ed9 100644 --- a/tools/run_tests/pre_build_node.bat +++ b/tools/run_tests/pre_build_node.bat @@ -27,6 +27,8 @@ @rem (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE @rem OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +set PATH=%PATH%;C:\Program Files\nodejs\;%APPDATA%\npm + @rem Expire cache after 1 week call npm update --cache-min 604800 diff --git a/tools/run_tests/run_node.bat b/tools/run_tests/run_node.bat index 41777363568..0987fbee559 100644 --- a/tools/run_tests/run_node.bat +++ b/tools/run_tests/run_node.bat @@ -27,6 +27,7 @@ @rem (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE @rem OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +set PATH=%PATH%;C:\Program Files\nodejs\;%APPDATA%\npm set JUNIT_REPORT_PATH=src\node\report.xml set JUNIT_REPORT_STACK=1 .\node_modules\.bin\mocha.cmd --reporter mocha-jenkins-reporter --timeout 8000 src\node\test \ No newline at end of file From 77fae2784ed7f894dc557ee2004364f2bb1b6b74 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Thu, 3 Mar 2016 19:39:56 -0800 Subject: [PATCH 16/33] fix broken links --- doc/interop-test-descriptions.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/doc/interop-test-descriptions.md b/doc/interop-test-descriptions.md index e618e967ee0..3beb1d11f4a 100644 --- a/doc/interop-test-descriptions.md +++ b/doc/interop-test-descriptions.md @@ -2,9 +2,8 @@ Interoperability Test Case Descriptions ======================================= Client and server use -[test.proto](https://github.com/grpc/grpc/blob/master/test/proto/test.proto) -and the [gRPC over HTTP/2 v2 -protocol](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md). +[test.proto](../src/proto/grpc/testing/test.proto) +and the [gRPC over HTTP/2 v2 protocol](./PROTOCOL-HTTP2.md). Client ------ From 3f5c5c7e18f6b09d85e39f310f214e90316cc0ef Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Thu, 3 Mar 2016 19:46:13 -0800 Subject: [PATCH 17/33] fix broken link --- examples/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/README.md b/examples/README.md index 84ec80057e1..cd85417ca20 100644 --- a/examples/README.md +++ b/examples/README.md @@ -447,4 +447,4 @@ $ greeter_client ## Read more! - You can find links to language-specific tutorials, examples, and other docs in each language's [quick start](#quickstart). -- [gRPC Authentication Support](doc/grpc-auth-support.md) introduces authentication support in gRPC with supported mechanisms and examples. +- [gRPC Authentication Support](http://www.grpc.io/docs/guides/auth.html) introduces authentication support in gRPC with supported mechanisms and examples. From 5821a444c703757d4bacdf2d58a3045c6cfbb838 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Thu, 3 Mar 2016 19:46:58 -0800 Subject: [PATCH 18/33] remove stray grpc_common reference --- examples/node/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/node/README.md b/examples/node/README.md index 7a2bc9794f3..c1ef6b05ab0 100644 --- a/examples/node/README.md +++ b/examples/node/README.md @@ -20,7 +20,7 @@ TRY IT! - Run the server ```sh - $ # from this directory (grpc_common/node). + $ # from this directory $ node ./greeter_server.js & ``` From f9946d579bc5b258261177b5ff87df22e7d49a07 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Thu, 3 Mar 2016 20:06:20 -0800 Subject: [PATCH 19/33] remove old stuff from tools/README.md --- tools/README.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tools/README.md b/tools/README.md index a0c41eb79ff..df4c6ef7d25 100644 --- a/tools/README.md +++ b/tools/README.md @@ -1,7 +1,5 @@ buildgen: template renderer for our build system. -distpackages: script to generate debian packages. - distrib: scripts to distribute language-specific packages. dockerfile: Docker files to test gRPC. @@ -12,6 +10,4 @@ gce: scripts to help setup testing infrastructure on GCE. jenkins: support for running tests on Jenkins. -profile_analyzer: pretty printer for gRPC profiling data. - run_tests: scripts to run gRPC tests in parallel. From 389297dedfe384246d4831a4fc33e7af7b0200e5 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 3 Mar 2016 21:02:54 -0800 Subject: [PATCH 20/33] Document some things --- src/core/transport/transport.h | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 7a007ef777b..5a90bd6d387 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -123,7 +123,8 @@ typedef struct grpc_transport_stream_op { /** Transport op: a set of operations to perform on a transport as a whole */ typedef struct grpc_transport_op { - /** called when processing of this op is done */ + /** Called when processing of this op is done. + Only one transport_op is allowed to be outstanding at any time. */ grpc_closure *on_consumed; /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */ grpc_closure *on_connectivity_state_change; @@ -138,7 +139,9 @@ typedef struct grpc_transport_op { grpc_status_code goaway_status; gpr_slice *goaway_message; /** set the callback for accepting new streams; - this is a permanent callback, unlike the other one-shot closures */ + this is a permanent callback, unlike the other one-shot closures. + If true, the callback is set to set_accept_stream_fn, with its + user_data argument set to set_accept_stream_user_data */ bool set_accept_stream; void (*set_accept_stream_fn)(grpc_exec_ctx *exec_ctx, void *user_data, grpc_transport *transport, From 1bd9ea407fd673f6c795a524c6454bc5db339a85 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 3 Mar 2016 21:09:31 -0800 Subject: [PATCH 21/33] Refine condition --- src/core/transport/chttp2_transport.c | 5 +++-- src/core/transport/transport.h | 3 +-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 369ff0ad7f6..cf1dbfa0e59 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1006,8 +1006,9 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, lock(t); - /* Let's be overly cautious: don't change any state while we're parsing */ - if (t->parsing_active) { + /* If there's a set_accept_stream ensure that we're not parsing + to avoid changing things out from underneath */ + if (t->parsing_active && t->set_accept_stream) { GPR_ASSERT(t->post_parsing_op == NULL); t->post_parsing_op = gpr_malloc(sizeof(*op)); memcpy(t->post_parsing_op, op, sizeof(*op)); diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 5a90bd6d387..ed6e121c9cb 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -123,8 +123,7 @@ typedef struct grpc_transport_stream_op { /** Transport op: a set of operations to perform on a transport as a whole */ typedef struct grpc_transport_op { - /** Called when processing of this op is done. - Only one transport_op is allowed to be outstanding at any time. */ + /** Called when processing of this op is done. */ grpc_closure *on_consumed; /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */ grpc_closure *on_connectivity_state_change; From 687bf6295f5e3a02f249aced69868e497c1bcc30 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 3 Mar 2016 21:18:48 -0800 Subject: [PATCH 22/33] Fix typo --- src/core/transport/chttp2_transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index cf1dbfa0e59..643e3feeb97 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1008,7 +1008,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, /* If there's a set_accept_stream ensure that we're not parsing to avoid changing things out from underneath */ - if (t->parsing_active && t->set_accept_stream) { + if (t->parsing_active && op->set_accept_stream) { GPR_ASSERT(t->post_parsing_op == NULL); t->post_parsing_op = gpr_malloc(sizeof(*op)); memcpy(t->post_parsing_op, op, sizeof(*op)); From af22087a43ffe7e69cb269419f046635adfc46e6 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 4 Mar 2016 10:00:31 -0800 Subject: [PATCH 23/33] Fix leak: dont become writable if writes are closed --- src/core/transport/chttp2_transport.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 643e3feeb97..ce54f7e6e82 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -617,6 +617,7 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed && + !stream_global->write_closed && grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) { GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); } From ad23ae196702cc3889a8c04a21273aa6a43aad75 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 4 Mar 2016 10:50:56 -0800 Subject: [PATCH 24/33] Unref writable streams on transport close to avoid leaks --- src/core/transport/chttp2_transport.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index ce54f7e6e82..19265252ca3 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -432,6 +432,14 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, if (t->ep) { allow_endpoint_shutdown_locked(exec_ctx, t); } + + /* flush writable stream list to avoid dangling references */ + grpc_chttp2_stream_global *stream_global; + grpc_chttp2_stream_writing *stream_writing; + while (grpc_chttp2_list_pop_writable_stream( + &t->global, &t->writing, &stream_global, &stream_writing)) { + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing"); + } } } @@ -617,7 +625,6 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed && - !stream_global->write_closed && grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) { GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); } From 34fcf43d374d312290d05408fbd554c3138a1d7d Mon Sep 17 00:00:00 2001 From: vjpai Date: Fri, 4 Mar 2016 11:02:05 -0800 Subject: [PATCH 25/33] Restart workers in each scenario --- tools/jenkins/run_performance.sh | 33 ++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/tools/jenkins/run_performance.sh b/tools/jenkins/run_performance.sh index fbc078330f7..abad97358cc 100755 --- a/tools/jenkins/run_performance.sh +++ b/tools/jenkins/run_performance.sh @@ -42,15 +42,10 @@ config=opt make CONFIG=$config qps_worker qps_driver -j8 -bins/$config/qps_worker -driver_port 10000 & -PID1=$! -bins/$config/qps_worker -driver_port 10010 & -PID2=$! - # # Put a timeout on these tests # -((sleep 900; kill $$ && killall qps_worker && rm -f /tmp/qps-test.$$ )&) +((sleep 900; killall qps_worker && rm -f /tmp/qps-test.$$ && kill $$)&) export QPS_WORKERS="localhost:10000,localhost:10010" @@ -71,38 +66,52 @@ halfcores=`expr $cores / 2` for secure in true false; do # Scenario 1: generic async streaming ping-pong (contentionless latency) + bins/$config/qps_worker -driver_port 10000 & + bins/$config/qps_worker -driver_port 10010 & bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=1 \ --client_channels=1 --bbuf_req_size=0 --bbuf_resp_size=0 \ --async_client_threads=1 --async_server_threads=1 --secure_test=$secure \ --num_servers=1 --num_clients=1 \ --server_core_limit=$halfcores --client_core_limit=0 - + bins/$config/qps_driver --quit=true + # Scenario 2: generic async streaming "unconstrained" (QPS) + bins/$config/qps_worker -driver_port 10000 & + bins/$config/qps_worker -driver_port 10010 & bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=$wide --bbuf_req_size=0 --bbuf_resp_size=0 \ --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ --num_servers=1 --num_clients=0 \ --server_core_limit=$halfcores --client_core_limit=0 |& tee /tmp/qps-test.$$ + bins/$config/qps_driver --quit=true # Scenario 2b: QPS with a single server core + bins/$config/qps_worker -driver_port 10000 & + bins/$config/qps_worker -driver_port 10010 & bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=$wide --bbuf_req_size=0 --bbuf_resp_size=0 \ --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ --num_servers=1 --num_clients=0 --server_core_limit=1 --client_core_limit=0 + bins/$config/qps_driver --quit=true # Scenario 2c: protobuf-based QPS + bins/$config/qps_worker -driver_port 10000 & + bins/$config/qps_worker -driver_port 10010 & bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=$wide --simple_req_size=0 --simple_resp_size=0 \ --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ --num_servers=1 --num_clients=0 \ --server_core_limit=$halfcores --client_core_limit=0 + bins/$config/qps_driver --quit=true # Scenario 3: Latency at sub-peak load (all clients equally loaded) for loadfactor in 0.7; do + bins/$config/qps_worker -driver_port 10000 & + bins/$config/qps_worker -driver_port 10010 & bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=$wide --bbuf_req_size=0 --bbuf_resp_size=0 \ @@ -110,27 +119,31 @@ for secure in true false; do --num_servers=1 --num_clients=0 --poisson_load=`awk -v lf=$loadfactor \ '$5 == "QPS:" {print int(lf * $6); exit}' /tmp/qps-test.$$` \ --server_core_limit=$halfcores --client_core_limit=0 + bins/$config/qps_driver --quit=true done rm /tmp/qps-test.$$ # Scenario 4: Single-channel bidirectional throughput test (like TCP_STREAM). + bins/$config/qps_worker -driver_port 10000 & + bins/$config/qps_worker -driver_port 10010 & bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=1 --bbuf_req_size=$big --bbuf_resp_size=$big \ --async_client_threads=1 --async_server_threads=1 --secure_test=$secure \ --num_servers=1 --num_clients=1 \ --server_core_limit=$halfcores --client_core_limit=0 + bins/$config/qps_driver --quit=true # Scenario 5: Sync unary ping-pong with protobufs + bins/$config/qps_worker -driver_port 10000 & + bins/$config/qps_worker -driver_port 10010 & bins/$config/qps_driver --rpc_type=UNARY --client_type=SYNC_CLIENT \ --server_type=SYNC_SERVER --outstanding_rpcs_per_channel=1 \ --client_channels=1 --simple_req_size=0 --simple_resp_size=0 \ --secure_test=$secure --num_servers=1 --num_clients=1 \ --server_core_limit=$halfcores --client_core_limit=0 - + bins/$config/qps_driver --quit=true done -bins/$config/qps_driver --quit=true - wait From 2d9476898b119ead1045eb31894bcd10176344fd Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 4 Mar 2016 11:05:42 -0800 Subject: [PATCH 26/33] Revert "Ensure that no #includes are inside of a namespace." --- test/cpp/interop/reconnect_interop_client.cc | 10 +++++----- test/cpp/interop/reconnect_interop_server.cc | 12 +++++------- test/cpp/qps/limit_cores.cc | 7 +++---- 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/test/cpp/interop/reconnect_interop_client.cc b/test/cpp/interop/reconnect_interop_client.cc index c668edaceb0..79a60cc8602 100644 --- a/test/cpp/interop/reconnect_interop_client.cc +++ b/test/cpp/interop/reconnect_interop_client.cc @@ -34,16 +34,16 @@ #include #include +#include +#include #include #include #include -#include -#include -#include "src/proto/grpc/testing/empty.grpc.pb.h" -#include "src/proto/grpc/testing/messages.grpc.pb.h" -#include "src/proto/grpc/testing/test.grpc.pb.h" #include "test/cpp/util/create_test_channel.h" #include "test/cpp/util/test_config.h" +#include "src/proto/grpc/testing/test.grpc.pb.h" +#include "src/proto/grpc/testing/empty.grpc.pb.h" +#include "src/proto/grpc/testing/messages.grpc.pb.h" DEFINE_int32(server_control_port, 0, "Server port for control rpcs."); DEFINE_int32(server_retry_port, 0, "Server port for testing reconnection."); diff --git a/test/cpp/interop/reconnect_interop_server.cc b/test/cpp/interop/reconnect_interop_server.cc index 1f9147d0efa..3602b8c2b05 100644 --- a/test/cpp/interop/reconnect_interop_server.cc +++ b/test/cpp/interop/reconnect_interop_server.cc @@ -31,8 +31,6 @@ * */ -// Test description at doc/connection-backoff-interop-test-description.md - #include #include @@ -42,17 +40,17 @@ #include #include +#include +#include #include #include #include -#include -#include -#include "src/proto/grpc/testing/empty.grpc.pb.h" -#include "src/proto/grpc/testing/messages.grpc.pb.h" -#include "src/proto/grpc/testing/test.grpc.pb.h" #include "test/core/util/reconnect_server.h" #include "test/cpp/util/test_config.h" +#include "src/proto/grpc/testing/test.grpc.pb.h" +#include "src/proto/grpc/testing/empty.grpc.pb.h" +#include "src/proto/grpc/testing/messages.grpc.pb.h" DEFINE_int32(control_port, 0, "Server port for controlling the server."); DEFINE_int32(retry_port, 0, diff --git a/test/cpp/qps/limit_cores.cc b/test/cpp/qps/limit_cores.cc index 1fb2d628f6d..fad9a323afd 100644 --- a/test/cpp/qps/limit_cores.cc +++ b/test/cpp/qps/limit_cores.cc @@ -37,15 +37,14 @@ #include #include +namespace grpc { +namespace testing { + #ifdef GPR_CPU_LINUX #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif #include - -namespace grpc { -namespace testing { - int LimitCores(const int* cores, int cores_size) { const int num_cores = gpr_cpu_num_cores(); int cores_set = 0; From 180f6dfc8d16f16c607f0b5e51f1efe1596f3212 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Fri, 4 Mar 2016 11:23:48 -0800 Subject: [PATCH 27/33] Keep changing the ports each scenario also --- tools/jenkins/run_performance.sh | 58 ++++++++++++++++++++++---------- 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/tools/jenkins/run_performance.sh b/tools/jenkins/run_performance.sh index abad97358cc..b06fce7acb9 100755 --- a/tools/jenkins/run_performance.sh +++ b/tools/jenkins/run_performance.sh @@ -47,8 +47,6 @@ make CONFIG=$config qps_worker qps_driver -j8 # ((sleep 900; killall qps_worker && rm -f /tmp/qps-test.$$ && kill $$)&) -export QPS_WORKERS="localhost:10000,localhost:10010" - # big is the size in bytes of large messages (0 is the size otherwise) big=65536 @@ -64,10 +62,18 @@ deep=100 cores=`grep -c ^processor /proc/cpuinfo` halfcores=`expr $cores / 2` +# +# Keep changing the ports +port1=10000 +port2=10100 + for secure in true false; do # Scenario 1: generic async streaming ping-pong (contentionless latency) - bins/$config/qps_worker -driver_port 10000 & - bins/$config/qps_worker -driver_port 10010 & + bins/$config/qps_worker -driver_port $port1 & + bins/$config/qps_worker -driver_port $port2 & + export QPS_WORKERS="localhost:$port1,localhost:$port2" + port1=`expr $port1 + 1` + port2=`expr $port2 + 1` bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=1 \ --client_channels=1 --bbuf_req_size=0 --bbuf_resp_size=0 \ @@ -75,10 +81,13 @@ for secure in true false; do --num_servers=1 --num_clients=1 \ --server_core_limit=$halfcores --client_core_limit=0 bins/$config/qps_driver --quit=true - + # Scenario 2: generic async streaming "unconstrained" (QPS) - bins/$config/qps_worker -driver_port 10000 & - bins/$config/qps_worker -driver_port 10010 & + bins/$config/qps_worker -driver_port $port1 & + bins/$config/qps_worker -driver_port $port2 & + export QPS_WORKERS="localhost:$port1,localhost:$port2" + port1=`expr $port1 + 1` + port2=`expr $port2 + 1` bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=$wide --bbuf_req_size=0 --bbuf_resp_size=0 \ @@ -88,8 +97,11 @@ for secure in true false; do bins/$config/qps_driver --quit=true # Scenario 2b: QPS with a single server core - bins/$config/qps_worker -driver_port 10000 & - bins/$config/qps_worker -driver_port 10010 & + bins/$config/qps_worker -driver_port $port1 & + bins/$config/qps_worker -driver_port $port2 & + export QPS_WORKERS="localhost:$port1,localhost:$port2" + port1=`expr $port1 + 1` + port2=`expr $port2 + 1` bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=$wide --bbuf_req_size=0 --bbuf_resp_size=0 \ @@ -98,8 +110,11 @@ for secure in true false; do bins/$config/qps_driver --quit=true # Scenario 2c: protobuf-based QPS - bins/$config/qps_worker -driver_port 10000 & - bins/$config/qps_worker -driver_port 10010 & + bins/$config/qps_worker -driver_port $port1 & + bins/$config/qps_worker -driver_port $port2 & + export QPS_WORKERS="localhost:$port1,localhost:$port2" + port1=`expr $port1 + 1` + port2=`expr $port2 + 1` bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=$wide --simple_req_size=0 --simple_resp_size=0 \ @@ -110,8 +125,11 @@ for secure in true false; do # Scenario 3: Latency at sub-peak load (all clients equally loaded) for loadfactor in 0.7; do - bins/$config/qps_worker -driver_port 10000 & - bins/$config/qps_worker -driver_port 10010 & + bins/$config/qps_worker -driver_port $port1 & + bins/$config/qps_worker -driver_port $port2 & + export QPS_WORKERS="localhost:$port1,localhost:$port2" + port1=`expr $port1 + 1` + port2=`expr $port2 + 1` bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=$wide --bbuf_req_size=0 --bbuf_resp_size=0 \ @@ -125,8 +143,11 @@ for secure in true false; do rm /tmp/qps-test.$$ # Scenario 4: Single-channel bidirectional throughput test (like TCP_STREAM). - bins/$config/qps_worker -driver_port 10000 & - bins/$config/qps_worker -driver_port 10010 & + bins/$config/qps_worker -driver_port $port1 & + bins/$config/qps_worker -driver_port $port2 & + export QPS_WORKERS="localhost:$port1,localhost:$port2" + port1=`expr $port1 + 1` + port2=`expr $port2 + 1` bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=1 --bbuf_req_size=$big --bbuf_resp_size=$big \ @@ -136,8 +157,11 @@ for secure in true false; do bins/$config/qps_driver --quit=true # Scenario 5: Sync unary ping-pong with protobufs - bins/$config/qps_worker -driver_port 10000 & - bins/$config/qps_worker -driver_port 10010 & + bins/$config/qps_worker -driver_port $port1 & + bins/$config/qps_worker -driver_port $port2 & + export QPS_WORKERS="localhost:$port1,localhost:$port2" + port1=`expr $port1 + 1` + port2=`expr $port2 + 1` bins/$config/qps_driver --rpc_type=UNARY --client_type=SYNC_CLIENT \ --server_type=SYNC_SERVER --outstanding_rpcs_per_channel=1 \ --client_channels=1 --simple_req_size=0 --simple_resp_size=0 \ From edd96e4926373644bac1c169431c213d361bed21 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Fri, 4 Mar 2016 14:32:50 -0500 Subject: [PATCH 28/33] Revert "Properly integrate async API with server-side cancellations." --- .../grpc++/impl/codegen/completion_queue.h | 1 - include/grpc++/impl/codegen/server_context.h | 3 - src/cpp/server/server_context.cc | 26 +- test/cpp/end2end/async_end2end_test.cc | 232 +++++------------- 4 files changed, 64 insertions(+), 198 deletions(-) diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 928ab2db317..102831e1c9b 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -184,7 +184,6 @@ class CompletionQueue : private GrpcLibrary { bool Pluck(CompletionQueueTag* tag); /// Performs a single polling pluck on \a tag. - /// \warning Must not be mixed with calls to \a Next. void TryPluck(CompletionQueueTag* tag); grpc_completion_queue* cq_; // owned diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index 91ebe574b14..ad08b8210d6 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -103,9 +103,6 @@ class ServerContext { void AddInitialMetadata(const grpc::string& key, const grpc::string& value); void AddTrailingMetadata(const grpc::string& key, const grpc::string& value); - // IsCancelled is always safe to call when using sync API - // When using async API, it is only safe to call IsCancelled after - // the AsyncNotifyWhenDone tag has been delivered bool IsCancelled() const; // Cancel the Call from the server. This is a best-effort API and depending on diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index eb49b210379..e205a1969b3 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -62,11 +62,7 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE; bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; - bool CheckCancelled(CompletionQueue* cq) { - cq->TryPluck(this); - return CheckCancelledNoPluck(); - } - bool CheckCancelledAsync() { return CheckCancelledNoPluck(); } + bool CheckCancelled(CompletionQueue* cq); void set_tag(void* tag) { has_tag_ = true; @@ -76,11 +72,6 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { void Unref(); private: - bool CheckCancelledNoPluck() { - grpc::lock_guard g(mu_); - return finalized_ ? (cancelled_ != 0) : false; - } - bool has_tag_; void* tag_; grpc::mutex mu_; @@ -97,6 +88,12 @@ void ServerContext::CompletionOp::Unref() { } } +bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) { + cq->TryPluck(this); + grpc::lock_guard g(mu_); + return finalized_ ? cancelled_ != 0 : false; +} + void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; ops->data.recv_close_on_server.cancelled = &cancelled_; @@ -185,14 +182,7 @@ void ServerContext::TryCancel() const { } bool ServerContext::IsCancelled() const { - if (has_notify_when_done_tag_) { - // when using async API, but the result is only valid - // if the tag has already been delivered at the completion queue - return completion_op_ && completion_op_->CheckCancelledAsync(); - } else { - // when using sync API - return completion_op_ && completion_op_->CheckCancelled(cq_); - } + return completion_op_ && completion_op_->CheckCancelled(cq_); } void ServerContext::set_compression_level(grpc_compression_level level) { diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index dc8c2bb6e5b..9ca3bf98f85 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -68,7 +68,6 @@ namespace testing { namespace { void* tag(int i) { return (void*)(intptr_t)i; } -int detag(void* p) { return static_cast(reinterpret_cast(p)); } #ifdef GPR_POSIX_SOCKET static int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds, @@ -107,50 +106,37 @@ class PollingOverrider { class Verifier { public: explicit Verifier(bool spin) : spin_(spin) {} - // Expect sets the expected ok value for a specific tag Verifier& Expect(int i, bool expect_ok) { expectations_[tag(i)] = expect_ok; return *this; } - // Next waits for 1 async tag to complete, checks its - // expectations, and returns the tag - int Next(CompletionQueue* cq, bool ignore_ok) { - bool ok; - void* got_tag; - if (spin_) { - for (;;) { - auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)); - if (r == CompletionQueue::TIMEOUT) continue; - if (r == CompletionQueue::GOT_EVENT) break; - gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); - abort(); - } - } else { - EXPECT_TRUE(cq->Next(&got_tag, &ok)); - } - auto it = expectations_.find(got_tag); - EXPECT_TRUE(it != expectations_.end()); - if (!ignore_ok) { - EXPECT_EQ(it->second, ok); - } - expectations_.erase(it); - return detag(got_tag); - } - - // Verify keeps calling Next until all currently set - // expected tags are complete void Verify(CompletionQueue* cq) { Verify(cq, false); } - // This version of Verify allows optionally ignoring the - // outcome of the expectation void Verify(CompletionQueue* cq, bool ignore_ok) { GPR_ASSERT(!expectations_.empty()); while (!expectations_.empty()) { - Next(cq, ignore_ok); + bool ok; + void* got_tag; + if (spin_) { + for (;;) { + auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)); + if (r == CompletionQueue::TIMEOUT) continue; + if (r == CompletionQueue::GOT_EVENT) break; + gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); + abort(); + } + } else { + EXPECT_TRUE(cq->Next(&got_tag, &ok)); + } + auto it = expectations_.find(got_tag); + EXPECT_TRUE(it != expectations_.end()); + if (!ignore_ok) { + EXPECT_EQ(it->second, ok); + } + expectations_.erase(it); } } - // This version of Verify stops after a certain deadline void Verify(CompletionQueue* cq, std::chrono::system_clock::time_point deadline) { if (expectations_.empty()) { @@ -807,8 +793,7 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) { } // This class is for testing scenarios where RPCs are cancelled on the server -// by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone -// API to check for cancellation +// by calling ServerContext::TryCancel() class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { protected: typedef enum { @@ -818,6 +803,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { CANCEL_AFTER_PROCESSING } ServerTryCancelRequestPhase; + void ServerTryCancel(ServerContext* context) { + EXPECT_FALSE(context->IsCancelled()); + context->TryCancel(); + gpr_log(GPR_INFO, "Server called TryCancel()"); + EXPECT_TRUE(context->IsCancelled()); + } + // Helper for testing client-streaming RPCs which are cancelled on the server. // Depending on the value of server_try_cancel parameter, this will test one // of the following three scenarios: @@ -851,7 +843,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of 'RequestStream' calls // and receive the 'RequestStream' call just made by the client - srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); @@ -867,12 +858,9 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { bool expected_server_cq_result = true; bool ignore_cq_result = false; - bool want_done_tag = false; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - srv_ctx.TryCancel(); - Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); - EXPECT_TRUE(srv_ctx.IsCancelled()); + ServerTryCancel(&srv_ctx); // Since cancellation is done before server reads any results, we know // for sure that all cq results will return false from this point forward @@ -880,39 +868,22 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } std::thread* server_try_cancel_thd = NULL; - - auto verif = Verifier(GetParam()); - if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = - new std::thread(&ServerContext::TryCancel, &srv_ctx); + server_try_cancel_thd = new std::thread( + &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); // Server will cancel the RPC in a parallel thread while reading the // requests from the client. Since the cancellation can happen at anytime, // some of the cq results (i.e those until cancellation) might be true but // its non deterministic. So better to ignore the cq results ignore_cq_result = true; - // Expect that we might possibly see the done tag that - // indicates cancellation completion in this case - want_done_tag = true; - verif.Expect(11, true); } // Server reads 3 messages (tags 6, 7 and 8) - // But if want_done_tag is true, we might also see tag 11 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { srv_stream.Read(&recv_request, tag(tag_idx)); - // Note that we'll add something to the verifier and verify that - // something was seen, but it might be tag 11 and not what we - // just added - int got_tag = verif.Expect(tag_idx, expected_server_cq_result) - .Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx); - } + Verifier(GetParam()) + .Expect(tag_idx, expected_server_cq_result) + .Verify(cq_.get(), ignore_cq_result); } if (server_try_cancel_thd != NULL) { @@ -921,15 +892,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - srv_ctx.TryCancel(); - want_done_tag = true; - verif.Expect(11, true); - } - - if (want_done_tag) { - verif.Verify(cq_.get()); - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; + ServerTryCancel(&srv_ctx); } // The RPC has been cancelled at this point for sure (i.e irrespective of @@ -982,7 +945,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of 'ResponseStream' calls and // receive the call just made by the client - srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq_.get(), cq_.get(), tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); @@ -990,12 +952,9 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { bool expected_cq_result = true; bool ignore_cq_result = false; - bool want_done_tag = false; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - srv_ctx.TryCancel(); - Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); - EXPECT_TRUE(srv_ctx.IsCancelled()); + ServerTryCancel(&srv_ctx); // We know for sure that all cq results will be false from this point // since the server cancelled the RPC @@ -1003,41 +962,24 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } std::thread* server_try_cancel_thd = NULL; - - auto verif = Verifier(GetParam()); - if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = - new std::thread(&ServerContext::TryCancel, &srv_ctx); + server_try_cancel_thd = new std::thread( + &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); // Server will cancel the RPC in a parallel thread while writing responses // to the client. Since the cancellation can happen at anytime, some of // the cq results (i.e those until cancellation) might be true but it is // non deterministic. So better to ignore the cq results ignore_cq_result = true; - // Expect that we might possibly see the done tag that - // indicates cancellation completion in this case - want_done_tag = true; - verif.Expect(11, true); } // Server sends three messages (tags 3, 4 and 5) - // But if want_done tag is true, we might also see tag 11 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { send_response.set_message("Pong " + std::to_string(tag_idx)); srv_stream.Write(send_response, tag(tag_idx)); - // Note that we'll add something to the verifier and verify that - // something was seen, but it might be tag 11 and not what we - // just added - int got_tag = verif.Expect(tag_idx, expected_cq_result) - .Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx); - } + Verifier(GetParam()) + .Expect(tag_idx, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); } if (server_try_cancel_thd != NULL) { @@ -1046,21 +988,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - srv_ctx.TryCancel(); - want_done_tag = true; - verif.Expect(11, true); + ServerTryCancel(&srv_ctx); // Client reads may fail bacause it is notified that the stream is // cancelled. ignore_cq_result = true; } - if (want_done_tag) { - verif.Verify(cq_.get()); - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - } - // Client attemts to read the three messages from the server for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { cli_stream->Read(&recv_response, tag(tag_idx)); @@ -1118,7 +1052,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of the 'BidiStream' call and // receive the call just made by the client - srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); @@ -1130,12 +1063,9 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { bool expected_cq_result = true; bool ignore_cq_result = false; - bool want_done_tag = false; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - srv_ctx.TryCancel(); - Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); - EXPECT_TRUE(srv_ctx.IsCancelled()); + ServerTryCancel(&srv_ctx); // We know for sure that all cq results will be false from this point // since the server cancelled the RPC @@ -1143,84 +1073,42 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } std::thread* server_try_cancel_thd = NULL; - - auto verif = Verifier(GetParam()); - if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = - new std::thread(&ServerContext::TryCancel, &srv_ctx); + server_try_cancel_thd = new std::thread( + &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); // Since server is going to cancel the RPC in a parallel thread, some of // the cq results (i.e those until the cancellation) might be true. Since // that number is non-deterministic, it is better to ignore the cq results ignore_cq_result = true; - // Expect that we might possibly see the done tag that - // indicates cancellation completion in this case - want_done_tag = true; - verif.Expect(11, true); } - int got_tag; srv_stream.Read(&recv_request, tag(4)); - verif.Expect(4, expected_cq_result); - got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4); - } + Verifier(GetParam()) + .Expect(4, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); send_response.set_message("Pong"); srv_stream.Write(send_response, tag(5)); - verif.Expect(5, expected_cq_result); - got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5); - } + Verifier(GetParam()) + .Expect(5, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); cli_stream->Read(&recv_response, tag(6)); - verif.Expect(6, expected_cq_result); - got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6); - } + Verifier(GetParam()) + .Expect(6, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); // This is expected to succeed in all cases cli_stream->WritesDone(tag(7)); - verif.Expect(7, true); - got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 7); - } + Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); // This is expected to fail in all cases i.e for all values of // server_try_cancel. This is because at this point, either there are no // more msgs from the client (because client called WritesDone) or the RPC // is cancelled on the server srv_stream.Read(&recv_request, tag(8)); - verif.Expect(8, false); - got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8); - } + Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); if (server_try_cancel_thd != NULL) { server_try_cancel_thd->join(); @@ -1228,15 +1116,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - srv_ctx.TryCancel(); - want_done_tag = true; - verif.Expect(11, true); - } - - if (want_done_tag) { - verif.Verify(cq_.get()); - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; + ServerTryCancel(&srv_ctx); } // The RPC has been cancelled at this point for sure (i.e irrespective of From 60d38cbc1d4c3d5a460ab3e718e7d9acc34c3fba Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 4 Mar 2016 13:05:04 -0800 Subject: [PATCH 29/33] Simplify Node Windows tests slightly Simplify command that removes OpenSSL headers and remove now-extraneous post-test script. --- tools/run_tests/build_node.bat | 2 +- tools/run_tests/post_test_node.bat | 30 ------------------------------ tools/run_tests/run_tests.py | 5 +---- 3 files changed, 2 insertions(+), 35 deletions(-) delete mode 100644 tools/run_tests/post_test_node.bat diff --git a/tools/run_tests/build_node.bat b/tools/run_tests/build_node.bat index 886af0610ff..82e82083486 100644 --- a/tools/run_tests/build_node.bat +++ b/tools/run_tests/build_node.bat @@ -35,7 +35,7 @@ call npm install --build-from-source @rem delete the redundant openssl headers for /f "delims=v" %%v in ('node --version') do ( - rmdir "%HOMEDRIVE%%HOMEPATH%\.node-gyp\%%v\include\node\openssl" /S /Q + rmdir "%USERPROFILE%\.node-gyp\%%v\include\node\openssl" /S /Q ) @rem rebuild, because it probably failed the first time diff --git a/tools/run_tests/post_test_node.bat b/tools/run_tests/post_test_node.bat deleted file mode 100644 index 1a2a5491fa7..00000000000 --- a/tools/run_tests/post_test_node.bat +++ /dev/null @@ -1,30 +0,0 @@ -@rem Copyright 2016, Google Inc. -@rem All rights reserved. -@rem -@rem Redistribution and use in source and binary forms, with or without -@rem modification, are permitted provided that the following conditions are -@rem met: -@rem -@rem * Redistributions of source code must retain the above copyright -@rem notice, this list of conditions and the following disclaimer. -@rem * Redistributions in binary form must reproduce the above -@rem copyright notice, this list of conditions and the following disclaimer -@rem in the documentation and/or other materials provided with the -@rem distribution. -@rem * Neither the name of Google Inc. nor the names of its -@rem contributors may be used to endorse or promote products derived from -@rem this software without specific prior written permission. -@rem -@rem THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -@rem "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -@rem LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -@rem A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -@rem OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -@rem SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -@rem LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -@rem DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -@rem THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -@rem (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -@rem OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -rmdir node_modules /S /Q \ No newline at end of file diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index c55d1fbe633..cc004f38d7f 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -290,10 +290,7 @@ class NodeLanguage(object): return [['tools/run_tests/build_node.sh', self.node_version]] def post_tests_steps(self): - if self.platform == 'windows': - return [['tools\\run_tests\\post_test_node.bat']] - else: - return [] + return [] def makefile_name(self): return 'Makefile' From aaf66a9981a658b62be005ac5e04d704545ab2da Mon Sep 17 00:00:00 2001 From: Greg Haines Date: Fri, 4 Mar 2016 13:10:26 -0800 Subject: [PATCH 30/33] Add comments and feature flag. --- src/objective-c/GRPCClient/private/GRPCCompletionQueue.m | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m index ffbb14374d1..b7d5af67d9b 100644 --- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m +++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m @@ -69,7 +69,11 @@ const int64_t kGRPCCompletionQueueDefaultTimeoutSecs = 60; gDefaultConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); }); dispatch_async(gDefaultConcurrentQueue, ^{ - gpr_timespec deadline = gpr_time_from_seconds(timeoutSecs, GPR_CLOCK_REALTIME); + // Using a non-infinite deadline to re-enter grpc_completion_queue_next() + // alleviates https://github.com/grpc/grpc/issues/5593 + gpr_timespec deadline = (timeoutSecs < 0) + ? gpr_inf_future(GPR_CLOCK_REALTIME) + : gpr_time_from_seconds(timeoutSecs, GPR_CLOCK_REALTIME); while (YES) { // The following call blocks until an event is available or the deadline elapses. grpc_event event = grpc_completion_queue_next(unmanagedQueue, deadline, NULL); From 459edd97123b522de40e03302eb8741841aad8ec Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Fri, 4 Mar 2016 14:32:11 -0800 Subject: [PATCH 31/33] Revert "Pass a non-infinite deadline to grpc_completion_queue_next() to prevent queues from blocking indefinitely in poll()" --- .../GRPCClient/private/GRPCCompletionQueue.h | 7 ------ .../GRPCClient/private/GRPCCompletionQueue.m | 24 ++++--------------- 2 files changed, 5 insertions(+), 26 deletions(-) diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h index 03fd2e0d955..fe3b8f39d12 100644 --- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h +++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h @@ -36,8 +36,6 @@ typedef void(^GRPCQueueCompletionHandler)(bool success); -extern const int64_t kGRPCCompletionQueueDefaultTimeoutSecs; - /** * This class lets one more easily use |grpc_completion_queue|. To use it, pass the value of the * |unmanagedQueue| property of an instance of this class to |grpc_channel_create_call|. Then for @@ -51,11 +49,6 @@ extern const int64_t kGRPCCompletionQueueDefaultTimeoutSecs; */ @interface GRPCCompletionQueue : NSObject @property(nonatomic, readonly) grpc_completion_queue *unmanagedQueue; -@property(nonatomic, readonly) int64_t timeoutSecs; + (instancetype)completionQueue; - -- (instancetype)init; -- (instancetype)initWithTimeout:(int64_t)timeoutSecs NS_DESIGNATED_INITIALIZER; - @end diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m index b7d5af67d9b..ea2b01ee1d7 100644 --- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m +++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m @@ -35,9 +35,6 @@ #import - -const int64_t kGRPCCompletionQueueDefaultTimeoutSecs = 60; - @implementation GRPCCompletionQueue + (instancetype)completionQueue { @@ -45,13 +42,8 @@ const int64_t kGRPCCompletionQueueDefaultTimeoutSecs = 60; } - (instancetype)init { - return [self initWithTimeout:kGRPCCompletionQueueDefaultTimeoutSecs]; -} - -- (instancetype)initWithTimeout:(int64_t)timeoutSecs { if ((self = [super init])) { _unmanagedQueue = grpc_completion_queue_create(NULL); - _timeoutSecs = timeoutSecs; // This is for the following block to capture the pointer by value (instead // of retaining self and doing self->_unmanagedQueue). This is essential @@ -69,28 +61,22 @@ const int64_t kGRPCCompletionQueueDefaultTimeoutSecs = 60; gDefaultConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); }); dispatch_async(gDefaultConcurrentQueue, ^{ - // Using a non-infinite deadline to re-enter grpc_completion_queue_next() - // alleviates https://github.com/grpc/grpc/issues/5593 - gpr_timespec deadline = (timeoutSecs < 0) - ? gpr_inf_future(GPR_CLOCK_REALTIME) - : gpr_time_from_seconds(timeoutSecs, GPR_CLOCK_REALTIME); while (YES) { - // The following call blocks until an event is available or the deadline elapses. - grpc_event event = grpc_completion_queue_next(unmanagedQueue, deadline, NULL); + // The following call blocks until an event is available. + grpc_event event = grpc_completion_queue_next(unmanagedQueue, + gpr_inf_future(GPR_CLOCK_REALTIME), + NULL); GRPCQueueCompletionHandler handler; switch (event.type) { case GRPC_OP_COMPLETE: handler = (__bridge_transfer GRPCQueueCompletionHandler)event.tag; handler(event.success); break; - case GRPC_QUEUE_TIMEOUT: - // Nothing to do here - break; case GRPC_QUEUE_SHUTDOWN: grpc_completion_queue_destroy(unmanagedQueue); return; default: - [NSException raise:@"Unrecognized completion type" format:@"type=%d", event.type]; + [NSException raise:@"Unrecognized completion type" format:@""]; } }; }); From f5dca1089639ab124e9cbc0db6893f6f0ada0fc6 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Fri, 4 Mar 2016 14:45:39 -0800 Subject: [PATCH 32/33] Revert "Improve perf smoke test stability" --- tools/jenkins/run_performance.sh | 59 ++++++-------------------------- 1 file changed, 11 insertions(+), 48 deletions(-) diff --git a/tools/jenkins/run_performance.sh b/tools/jenkins/run_performance.sh index b06fce7acb9..fbc078330f7 100755 --- a/tools/jenkins/run_performance.sh +++ b/tools/jenkins/run_performance.sh @@ -42,10 +42,17 @@ config=opt make CONFIG=$config qps_worker qps_driver -j8 +bins/$config/qps_worker -driver_port 10000 & +PID1=$! +bins/$config/qps_worker -driver_port 10010 & +PID2=$! + # # Put a timeout on these tests # -((sleep 900; killall qps_worker && rm -f /tmp/qps-test.$$ && kill $$)&) +((sleep 900; kill $$ && killall qps_worker && rm -f /tmp/qps-test.$$ )&) + +export QPS_WORKERS="localhost:10000,localhost:10010" # big is the size in bytes of large messages (0 is the size otherwise) big=65536 @@ -62,74 +69,40 @@ deep=100 cores=`grep -c ^processor /proc/cpuinfo` halfcores=`expr $cores / 2` -# -# Keep changing the ports -port1=10000 -port2=10100 - for secure in true false; do # Scenario 1: generic async streaming ping-pong (contentionless latency) - bins/$config/qps_worker -driver_port $port1 & - bins/$config/qps_worker -driver_port $port2 & - export QPS_WORKERS="localhost:$port1,localhost:$port2" - port1=`expr $port1 + 1` - port2=`expr $port2 + 1` bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=1 \ --client_channels=1 --bbuf_req_size=0 --bbuf_resp_size=0 \ --async_client_threads=1 --async_server_threads=1 --secure_test=$secure \ --num_servers=1 --num_clients=1 \ --server_core_limit=$halfcores --client_core_limit=0 - bins/$config/qps_driver --quit=true # Scenario 2: generic async streaming "unconstrained" (QPS) - bins/$config/qps_worker -driver_port $port1 & - bins/$config/qps_worker -driver_port $port2 & - export QPS_WORKERS="localhost:$port1,localhost:$port2" - port1=`expr $port1 + 1` - port2=`expr $port2 + 1` bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=$wide --bbuf_req_size=0 --bbuf_resp_size=0 \ --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ --num_servers=1 --num_clients=0 \ --server_core_limit=$halfcores --client_core_limit=0 |& tee /tmp/qps-test.$$ - bins/$config/qps_driver --quit=true # Scenario 2b: QPS with a single server core - bins/$config/qps_worker -driver_port $port1 & - bins/$config/qps_worker -driver_port $port2 & - export QPS_WORKERS="localhost:$port1,localhost:$port2" - port1=`expr $port1 + 1` - port2=`expr $port2 + 1` bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=$wide --bbuf_req_size=0 --bbuf_resp_size=0 \ --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ --num_servers=1 --num_clients=0 --server_core_limit=1 --client_core_limit=0 - bins/$config/qps_driver --quit=true # Scenario 2c: protobuf-based QPS - bins/$config/qps_worker -driver_port $port1 & - bins/$config/qps_worker -driver_port $port2 & - export QPS_WORKERS="localhost:$port1,localhost:$port2" - port1=`expr $port1 + 1` - port2=`expr $port2 + 1` bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=$wide --simple_req_size=0 --simple_resp_size=0 \ --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ --num_servers=1 --num_clients=0 \ --server_core_limit=$halfcores --client_core_limit=0 - bins/$config/qps_driver --quit=true # Scenario 3: Latency at sub-peak load (all clients equally loaded) for loadfactor in 0.7; do - bins/$config/qps_worker -driver_port $port1 & - bins/$config/qps_worker -driver_port $port2 & - export QPS_WORKERS="localhost:$port1,localhost:$port2" - port1=`expr $port1 + 1` - port2=`expr $port2 + 1` bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=$wide --bbuf_req_size=0 --bbuf_resp_size=0 \ @@ -137,37 +110,27 @@ for secure in true false; do --num_servers=1 --num_clients=0 --poisson_load=`awk -v lf=$loadfactor \ '$5 == "QPS:" {print int(lf * $6); exit}' /tmp/qps-test.$$` \ --server_core_limit=$halfcores --client_core_limit=0 - bins/$config/qps_driver --quit=true done rm /tmp/qps-test.$$ # Scenario 4: Single-channel bidirectional throughput test (like TCP_STREAM). - bins/$config/qps_worker -driver_port $port1 & - bins/$config/qps_worker -driver_port $port2 & - export QPS_WORKERS="localhost:$port1,localhost:$port2" - port1=`expr $port1 + 1` - port2=`expr $port2 + 1` bins/$config/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=1 --bbuf_req_size=$big --bbuf_resp_size=$big \ --async_client_threads=1 --async_server_threads=1 --secure_test=$secure \ --num_servers=1 --num_clients=1 \ --server_core_limit=$halfcores --client_core_limit=0 - bins/$config/qps_driver --quit=true # Scenario 5: Sync unary ping-pong with protobufs - bins/$config/qps_worker -driver_port $port1 & - bins/$config/qps_worker -driver_port $port2 & - export QPS_WORKERS="localhost:$port1,localhost:$port2" - port1=`expr $port1 + 1` - port2=`expr $port2 + 1` bins/$config/qps_driver --rpc_type=UNARY --client_type=SYNC_CLIENT \ --server_type=SYNC_SERVER --outstanding_rpcs_per_channel=1 \ --client_channels=1 --simple_req_size=0 --simple_resp_size=0 \ --secure_test=$secure --num_servers=1 --num_clients=1 \ --server_core_limit=$halfcores --client_core_limit=0 - bins/$config/qps_driver --quit=true + done +bins/$config/qps_driver --quit=true + wait From c1edf224797367b62cd88ecff37dae23d333d308 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 4 Mar 2016 15:01:03 -0800 Subject: [PATCH 33/33] Fix sanity --- src/objective-c/GRPCClient/private/GRPCCompletionQueue.h | 2 +- src/objective-c/GRPCClient/private/GRPCCompletionQueue.m | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h index fe3b8f39d12..7b66cd4c329 100644 --- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h +++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m index ea2b01ee1d7..ff3031678c5 100644 --- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m +++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without