From 7adf2f5c650b157958e7c165c481e85370154d9c Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Fri, 21 Feb 2020 23:28:19 +0000 Subject: [PATCH 1/9] Implement first pass at Python XDS interop client. I discovered a couple of existing shortcomings while implementing this client that mean this is *not yet ready for inclusion in CI*. I nonetheless want to get an early review and integrate this into the master branch, in the interest of small easily reviewable code changes. The first problem was that the bazel build has apparently never pulled roots.pem in as a data dependency. This appears not to have been a problem until XDS enters into the mix. This has been added into the Bazel build with a small change to the pyx_library rule. The larger problem is that there is currently no public Python API to get a peer's identity from the client side. This is crucial to determining the correctness of an interaction of a client-server pair under XDS. I intend to add such a method to the RpcContext interface in an upcoming PR and use it in the interop client. For the moment, I simply fake the peer details. Finally, I add a knob to run_xds_tests.py allowing multiple instances of this test to exist at once. Multiple instances forwarding to the same port cannot exist on GCE, so this enables multiple people to run tests at the same time. 
--- bazel/cython_library.bzl | 5 +- src/python/grpcio/grpc/_cython/BUILD.bazel | 8 + .../grpcio_tests/tests/interop/BUILD.bazel | 13 ++ .../tests/interop/xds_interop_client.py | 201 ++++++++++++++++++ tools/run_tests/run_xds_tests.py | 6 +- 5 files changed, 231 insertions(+), 2 deletions(-) create mode 100644 src/python/grpcio_tests/tests/interop/xds_interop_client.py diff --git a/bazel/cython_library.bzl b/bazel/cython_library.bzl index 3f34e185380..c9f864231c1 100644 --- a/bazel/cython_library.bzl +++ b/bazel/cython_library.bzl @@ -63,12 +63,15 @@ def pyx_library(name, deps = [], py_deps = [], srcs = [], **kwargs): ) shared_objects.append(shared_object_name) + data = shared_objects[:] + data += kwargs.pop("data", []) + # Now create a py_library with these shared objects as data. native.py_library( name = name, srcs = py_srcs, deps = py_deps, srcs_version = "PY2AND3", - data = shared_objects, + data = data, **kwargs ) diff --git a/src/python/grpcio/grpc/_cython/BUILD.bazel b/src/python/grpcio/grpc/_cython/BUILD.bazel index c7ae040a804..646d31068ca 100644 --- a/src/python/grpcio/grpc/_cython/BUILD.bazel +++ b/src/python/grpcio/grpc/_cython/BUILD.bazel @@ -2,6 +2,13 @@ package(default_visibility = ["//visibility:public"]) load("//bazel:cython_library.bzl", "pyx_library") +genrule( + name = "copy_roots_pem", + srcs = ["//:etc/roots.pem"], + outs = ["_credentials/roots.pem"], + cmd = "cp $(SRCS) $(@)", +) + pyx_library( name = "cygrpc", srcs = glob([ @@ -9,6 +16,7 @@ pyx_library( "cygrpc.pxd", "cygrpc.pyx", ]), + data = [":copy_roots_pem"], deps = [ "//:grpc", ], diff --git a/src/python/grpcio_tests/tests/interop/BUILD.bazel b/src/python/grpcio_tests/tests/interop/BUILD.bazel index 4685852162b..cd47bee2172 100644 --- a/src/python/grpcio_tests/tests/interop/BUILD.bazel +++ b/src/python/grpcio_tests/tests/interop/BUILD.bazel @@ -115,3 +115,16 @@ py2and3_test( "//src/python/grpcio_tests/tests/unit:test_common", ], ) + +py_binary( + name = "xds_interop_client", + srcs = 
["xds_interop_client.py"], + python_version = "PY3", + deps = [ + "//src/proto/grpc/testing:empty_py_pb2", + "//src/proto/grpc/testing:py_messages_proto", + "//src/proto/grpc/testing:py_test_proto", + "//src/proto/grpc/testing:test_py_pb2_grpc", + "//src/python/grpcio/grpc:grpcio", + ], +) diff --git a/src/python/grpcio_tests/tests/interop/xds_interop_client.py b/src/python/grpcio_tests/tests/interop/xds_interop_client.py new file mode 100644 index 00000000000..3a79ef0f32a --- /dev/null +++ b/src/python/grpcio_tests/tests/interop/xds_interop_client.py @@ -0,0 +1,201 @@ +# Copyright 2020 The gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import signal +import threading +import time +import sys + +from typing import DefaultDict, List, Set +import collections + +from concurrent import futures + +import grpc + +from src.proto.grpc.testing import test_pb2 +from src.proto.grpc.testing import test_pb2_grpc +from src.proto.grpc.testing import messages_pb2 +from src.proto.grpc.testing import empty_pb2 + + +# TODO: Back with a LoadBalancerStatsResponse proto? 
+class _StatsWatcher: + _start: int + _end: int + _rpcs_needed: int + _rpcs_by_peer: DefaultDict[str, int] + _no_remote_peer: int + _lock: threading.Lock + _condition: threading.Condition + + def __init__(self, start: int, end: int): + self._start = start + self._end = end + self._rpcs_needed = end - start + self._rpcs_by_peer = collections.defaultdict(int) + self._lock = threading.Lock() + self._condition = threading.Condition(self._lock) + self._no_remote_peer = 0 + + def on_rpc_complete(self, request_id: int, peer: str) -> None: + """Records statistics for a single RPC.""" + if self._start <= request_id < self._end: + with self._lock: + if not peer: + self._no_remote_peer += 1 + else: + self._rpcs_by_peer[peer] += 1 + self._rpcs_needed -= 1 + self._condition.notify() + + def await_rpc_stats_response(self, timeout_sec: int + ) -> messages_pb2.LoadBalancerStatsResponse: + """Blocks until a full response has been collected.""" + with self._lock: + self._condition.wait_for(lambda: not self._rpcs_needed, + timeout=float(timeout_sec)) + response = messages_pb2.LoadBalancerStatsResponse() + for peer, count in self._rpcs_by_peer.items(): + response.rpcs_by_peer[peer] = count + response.num_failures = self._no_remote_peer + self._rpcs_needed + return response + + +_global_lock = threading.Lock() +_stop_event = threading.Event() +_global_rpc_id: int = 0 +_watchers: Set[_StatsWatcher] = set() +_global_server = None + + +def _handle_sigint(sig, frame): + _stop_event.set() + _global_server.stop(None) + + +class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer + ): + + def __init__(self): + super(_LoadBalancerStatsServicer).__init__() + + def GetClientStats(self, request: messages_pb2.LoadBalancerStatsRequest, + context: grpc.ServicerContext + ) -> messages_pb2.LoadBalancerStatsResponse: + print("Received stats request.") + sys.stdout.flush() + start = None + end = None + watcher = None + with _global_lock: + start = _global_rpc_id + 1 + end = start + 
request.num_rpcs + watcher = _StatsWatcher(start, end) + _watchers.add(watcher) + response = watcher.await_rpc_stats_response(request.timeout_sec) + with _global_lock: + _watchers.remove(watcher) + return response + + +# TODO: Accept finer-grained arguments. +def _run_single_channel(args: argparse.Namespace): + global _global_rpc_id # pylint: disable=global-statement + duration_per_query = 1.0 / float(args.qps) + with grpc.insecure_channel(args.server) as channel: + stub = test_pb2_grpc.TestServiceStub(channel) + while not _stop_event.is_set(): + request_id = None + with _global_lock: + request_id = _global_rpc_id + _global_rpc_id += 1 + print(f"Sending request to backend: {request_id}") + sys.stdout.flush() + start = time.time() + end = start + duration_per_query + call, _ = stub.UnaryCall.with_call(messages_pb2.SimpleRequest(), + timeout=float( + args.rpc_timeout_sec)) + print(f"Got result {request_id}") + sys.stdout.flush() + with _global_lock: + for watcher in _watchers: + # TODO: Implement a peer details getter. + peer = f"192.168.1.{request_id % 255}" + watcher.on_rpc_complete(request_id, peer) + if args.print_response: + if call.code() == grpc.StatusCode.OK: + print("Successful response.") + sys.stdout.flush() + else: + print(f"RPC failed: {call}") + sys.stdout.flush() + now = time.time() + while now < end: + time.sleep(end - now) + now = time.time() + + +# TODO: Accept finer-grained arguments. 
+def _run(args: argparse.Namespace) -> None: + global _global_server # pylint: disable=global-statement + channel_threads: List[threading.Thread] = [] + for i in range(args.num_channels): + thread = threading.Thread(target=_run_single_channel, args=(args,)) + thread.start() + channel_threads.append(thread) + _global_server = grpc.server(futures.ThreadPoolExecutor()) + _global_server.add_insecure_port(f"0.0.0.0:{args.stats_port}") + test_pb2_grpc.add_LoadBalancerStatsServiceServicer_to_server( + _LoadBalancerStatsServicer(), _global_server) + _global_server.start() + _global_server.wait_for_termination() + for i in range(args.num_channels): + thread.join() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description='Run Python XDS interop client.') + parser.add_argument( + "--num_channels", + default=1, + type=int, + help="The number of channels from which to send requests.") + parser.add_argument("--print_response", + default=False, + action="store_true", + help="Write RPC response to STDOUT.") + parser.add_argument( + "--qps", + default=1, + type=int, + help="The number of queries to send from each channel per second.") + parser.add_argument("--rpc_timeout_sec", + default=10, + type=int, + help="The per-RPC timeout in seconds.") + parser.add_argument("--server", + default="localhost:50051", + help="The address of the server.") + parser.add_argument( + "--stats_port", + default=50052, + type=int, + help="The port on which to expose the peer distribution stats service.") + args = parser.parse_args() + signal.signal(signal.SIGINT, _handle_sigint) + _run(args) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 1b1435a93b1..ed950fcad63 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -71,6 +71,10 @@ argp.add_argument( 'Continue with test even when an error occurs during setup. 
Intended for ' 'manual testing, where attempts to recreate any GCP resources already ' 'existing will result in an error') +argp.add_argument('--service_port', + default=55551, + type=int, + help='The port on which the test server will listen.') argp.add_argument('--verbose', help='verbose log output', default=False, @@ -97,7 +101,7 @@ TARGET_PROXY_NAME = 'test-target-proxy' + args.gcp_suffix FORWARDING_RULE_NAME = 'test-forwarding-rule' + args.gcp_suffix KEEP_GCP_RESOURCES = args.keep_gcp_resources TOLERATE_GCP_ERRORS = args.tolerate_gcp_errors -SERVICE_PORT = 55551 +SERVICE_PORT = args.service_port STATS_PORT = 55552 INSTANCE_GROUP_SIZE = 2 WAIT_FOR_OPERATION_SEC = 60 From 7a129dac1c95e60838ae1d35cb58abf49d0755d3 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Fri, 13 Mar 2020 20:29:08 +0000 Subject: [PATCH 2/9] Don't let threads die over timed out RPCs --- .../tests/interop/xds_interop_client.py | 37 +++++++++++-------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/src/python/grpcio_tests/tests/interop/xds_interop_client.py b/src/python/grpcio_tests/tests/interop/xds_interop_client.py index 3a79ef0f32a..8c82091a0c8 100644 --- a/src/python/grpcio_tests/tests/interop/xds_interop_client.py +++ b/src/python/grpcio_tests/tests/interop/xds_interop_client.py @@ -126,23 +126,28 @@ def _run_single_channel(args: argparse.Namespace): sys.stdout.flush() start = time.time() end = start + duration_per_query - call, _ = stub.UnaryCall.with_call(messages_pb2.SimpleRequest(), - timeout=float( - args.rpc_timeout_sec)) - print(f"Got result {request_id}") - sys.stdout.flush() - with _global_lock: - for watcher in _watchers: - # TODO: Implement a peer details getter. 
- peer = f"192.168.1.{request_id % 255}" - watcher.on_rpc_complete(request_id, peer) - if args.print_response: - if call.code() == grpc.StatusCode.OK: - print("Successful response.") - sys.stdout.flush() + try: + response, call = stub.UnaryCall.with_call(messages_pb2.SimpleRequest(), + timeout=float( + args.rpc_timeout_sec)) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED: + print(f"RPC timed out after {args.rpc_timeout_sec}") else: - print(f"RPC failed: {call}") - sys.stdout.flush() + raise + else: + print(f"Got result {request_id}") + sys.stdout.flush() + with _global_lock: + for watcher in _watchers: + watcher.on_rpc_complete(request_id, response.hostname) + if args.print_response: + if call.code() == grpc.StatusCode.OK: + print("Successful response.") + sys.stdout.flush() + else: + print(f"RPC failed: {call}") + sys.stdout.flush() now = time.time() while now < end: time.sleep(end - now) From f26f80d5324c334ead720e9a47b2aeba7ba5fcd8 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Sat, 14 Mar 2020 02:33:52 +0000 Subject: [PATCH 3/9] Log TID --- src/python/grpcio_tests/tests/interop/xds_interop_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/grpcio_tests/tests/interop/xds_interop_client.py b/src/python/grpcio_tests/tests/interop/xds_interop_client.py index 8c82091a0c8..1f010762508 100644 --- a/src/python/grpcio_tests/tests/interop/xds_interop_client.py +++ b/src/python/grpcio_tests/tests/interop/xds_interop_client.py @@ -122,7 +122,7 @@ def _run_single_channel(args: argparse.Namespace): with _global_lock: request_id = _global_rpc_id _global_rpc_id += 1 - print(f"Sending request to backend: {request_id}") + print(f"[{threading.get_ident()}] Sending request to backend: {request_id}") sys.stdout.flush() start = time.time() end = start + duration_per_query From 34e320a439bf75b51b029cc4ed506397463b81df Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 17 Mar 2020 19:13:10 +0000 
Subject: [PATCH 4/9] Working client --- .../tests/interop/xds_interop_client.py | 109 +++++++++++++----- 1 file changed, 81 insertions(+), 28 deletions(-) diff --git a/src/python/grpcio_tests/tests/interop/xds_interop_client.py b/src/python/grpcio_tests/tests/interop/xds_interop_client.py index 1f010762508..8d9709f10c3 100644 --- a/src/python/grpcio_tests/tests/interop/xds_interop_client.py +++ b/src/python/grpcio_tests/tests/interop/xds_interop_client.py @@ -13,12 +13,13 @@ # limitations under the License. import argparse +import logging import signal import threading import time import sys -from typing import DefaultDict, List, Set +from typing import DefaultDict, Dict, List, Mapping, Set import collections from concurrent import futures @@ -30,6 +31,16 @@ from src.proto.grpc.testing import test_pb2_grpc from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import empty_pb2 +logger = logging.getLogger() +console_handler = logging.StreamHandler() +formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s') +console_handler.setFormatter(formatter) +logger.addHandler(console_handler) + +# TODO: Make this logfile configurable. +file_handler = logging.FileHandler('/tmp/python_xds_interop_client.log', mode='a') +file_handler.setFormatter(formatter) +logger.addHandler(file_handler) # TODO: Back with a LoadBalancerStatsResponse proto? 
class _StatsWatcher: @@ -64,13 +75,17 @@ class _StatsWatcher: def await_rpc_stats_response(self, timeout_sec: int ) -> messages_pb2.LoadBalancerStatsResponse: """Blocks until a full response has been collected.""" + logger.info("Awaiting RPC stats response") with self._lock: + logger.debug(f"Waiting for {timeout_sec} on condition variable.") self._condition.wait_for(lambda: not self._rpcs_needed, timeout=float(timeout_sec)) + logger.debug(f"Waited for {timeout_sec} on condition variable.") response = messages_pb2.LoadBalancerStatsResponse() for peer, count in self._rpcs_by_peer.items(): response.rpcs_by_peer[peer] = count response.num_failures = self._no_remote_peer + self._rpcs_needed + logger.info("Finished awaiting rpc stats response") return response @@ -95,8 +110,7 @@ class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer def GetClientStats(self, request: messages_pb2.LoadBalancerStatsRequest, context: grpc.ServicerContext ) -> messages_pb2.LoadBalancerStatsResponse: - print("Received stats request.") - sys.stdout.flush() + logger.info("Received stats request.") start = None end = None watcher = None @@ -108,8 +122,62 @@ class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer response = watcher.await_rpc_stats_response(request.timeout_sec) with _global_lock: _watchers.remove(watcher) + logger.info("Returning stats response: {}".format(response)) return response +def _start_rpc(request_id: int, + stub: test_pb2_grpc.TestServiceStub, + timeout: float, + futures: Mapping[int, grpc.Future]) -> None: + logger.info(f"[{threading.get_ident()}] Sending request to backend: {request_id}") + future = stub.UnaryCall.future(messages_pb2.SimpleRequest(), + timeout=timeout) + futures[request_id] = future + + +def _on_rpc_done(rpc_id: int, + future: grpc.Future, + print_response: bool) -> None: + exception = future.exception() + hostname = "" + if exception is not None: + if exception.code() == 
grpc.StatusCode.DEADLINE_EXCEEDED: + logger.error(f"RPC {rpc_id} timed out") + else: + logger.error(exception) + else: + response = future.result() + logger.info(f"Got result {rpc_id}") + hostname = response.hostname + if print_response: + if future.code() == grpc.StatusCode.OK: + logger.info("Successful response.") + else: + logger.info(f"RPC failed: {call}") + with _global_lock: + for watcher in _watchers: + watcher.on_rpc_complete(rpc_id, hostname) + +def _remove_completed_rpcs(futures: Mapping[int, grpc.Future], + print_response: bool) -> None: + logger.debug("Removing completed RPCs") + done = [] + for future_id, future in futures.items(): + if future.done(): + logger.debug("Calling _on_rpc_done") + _on_rpc_done(future_id, future, args.print_response) + logger.debug("Called _on_rpc_done") + done.append(future_id) + for rpc_id in done: + del futures[rpc_id] + logger.debug("Removed completed RPCs") + + +def _cancel_all_rpcs(futures: Mapping[int, grpc.Future]) -> None: + logger.info("Cancelling all remaining RPCs") + for future in futures.values(): + future.cancel() + # TODO: Accept finer-grained arguments. 
def _run_single_channel(args: argparse.Namespace): @@ -117,45 +185,28 @@ def _run_single_channel(args: argparse.Namespace): duration_per_query = 1.0 / float(args.qps) with grpc.insecure_channel(args.server) as channel: stub = test_pb2_grpc.TestServiceStub(channel) + futures: Dict[int, grpc.Future] = {} while not _stop_event.is_set(): request_id = None with _global_lock: request_id = _global_rpc_id _global_rpc_id += 1 - print(f"[{threading.get_ident()}] Sending request to backend: {request_id}") - sys.stdout.flush() start = time.time() end = start + duration_per_query - try: - response, call = stub.UnaryCall.with_call(messages_pb2.SimpleRequest(), - timeout=float( - args.rpc_timeout_sec)) - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED: - print(f"RPC timed out after {args.rpc_timeout_sec}") - else: - raise - else: - print(f"Got result {request_id}") - sys.stdout.flush() - with _global_lock: - for watcher in _watchers: - watcher.on_rpc_complete(request_id, response.hostname) - if args.print_response: - if call.code() == grpc.StatusCode.OK: - print("Successful response.") - sys.stdout.flush() - else: - print(f"RPC failed: {call}") - sys.stdout.flush() + _start_rpc(request_id, stub, float(args.rpc_timeout_sec), futures) + # TODO: Complete RPCs more frequently than 1 / QPS? + _remove_completed_rpcs(futures, args.print_response) + logger.debug(f"Currently {len(futures)} in-flight RPCs") now = time.time() while now < end: time.sleep(end - now) now = time.time() + _cancel_all_rpcs(futures) # TODO: Accept finer-grained arguments. 
def _run(args: argparse.Namespace) -> None: + logger.info("Starting python xDS Interop Client.") global _global_server # pylint: disable=global-statement channel_threads: List[threading.Thread] = [] for i in range(args.num_channels): @@ -190,7 +241,7 @@ if __name__ == "__main__": type=int, help="The number of queries to send from each channel per second.") parser.add_argument("--rpc_timeout_sec", - default=10, + default=30, type=int, help="The per-RPC timeout in seconds.") parser.add_argument("--server", @@ -203,4 +254,6 @@ if __name__ == "__main__": help="The port on which to expose the peer distribution stats service.") args = parser.parse_args() signal.signal(signal.SIGINT, _handle_sigint) + logger.setLevel(logging.DEBUG) + # logging.basicConfig(level=logging.INFO, stream=sys.stderr) _run(args) From 7fd0c8fc1a426c56be5ebe2d7c0fa30a27f056b6 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 17 Mar 2020 20:34:25 +0000 Subject: [PATCH 5/9] Clean up client --- .../tests/interop/xds_interop_client.py | 55 +++++++++---------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/src/python/grpcio_tests/tests/interop/xds_interop_client.py b/src/python/grpcio_tests/tests/interop/xds_interop_client.py index 8d9709f10c3..bd13d9aeca6 100644 --- a/src/python/grpcio_tests/tests/interop/xds_interop_client.py +++ b/src/python/grpcio_tests/tests/interop/xds_interop_client.py @@ -37,12 +37,7 @@ formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s') console_handler.setFormatter(formatter) logger.addHandler(console_handler) -# TODO: Make this logfile configurable. -file_handler = logging.FileHandler('/tmp/python_xds_interop_client.log', mode='a') -file_handler.setFormatter(formatter) -logger.addHandler(file_handler) -# TODO: Back with a LoadBalancerStatsResponse proto? 
class _StatsWatcher: _start: int _end: int @@ -57,14 +52,13 @@ class _StatsWatcher: self._end = end self._rpcs_needed = end - start self._rpcs_by_peer = collections.defaultdict(int) - self._lock = threading.Lock() - self._condition = threading.Condition(self._lock) + self._condition = threading.Condition() self._no_remote_peer = 0 def on_rpc_complete(self, request_id: int, peer: str) -> None: """Records statistics for a single RPC.""" if self._start <= request_id < self._end: - with self._lock: + with self._condition: if not peer: self._no_remote_peer += 1 else: @@ -75,17 +69,13 @@ class _StatsWatcher: def await_rpc_stats_response(self, timeout_sec: int ) -> messages_pb2.LoadBalancerStatsResponse: """Blocks until a full response has been collected.""" - logger.info("Awaiting RPC stats response") - with self._lock: - logger.debug(f"Waiting for {timeout_sec} on condition variable.") + with self._condition: self._condition.wait_for(lambda: not self._rpcs_needed, timeout=float(timeout_sec)) - logger.debug(f"Waited for {timeout_sec} on condition variable.") response = messages_pb2.LoadBalancerStatsResponse() for peer, count in self._rpcs_by_peer.items(): response.rpcs_by_peer[peer] = count response.num_failures = self._no_remote_peer + self._rpcs_needed - logger.info("Finished awaiting rpc stats response") return response @@ -125,18 +115,16 @@ class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer logger.info("Returning stats response: {}".format(response)) return response -def _start_rpc(request_id: int, - stub: test_pb2_grpc.TestServiceStub, - timeout: float, - futures: Mapping[int, grpc.Future]) -> None: - logger.info(f"[{threading.get_ident()}] Sending request to backend: {request_id}") + +def _start_rpc(request_id: int, stub: test_pb2_grpc.TestServiceStub, + timeout: float, futures: Mapping[int, grpc.Future]) -> None: + logger.info(f"Sending request to backend: {request_id}") future = stub.UnaryCall.future(messages_pb2.SimpleRequest(), - 
timeout=timeout) + timeout=timeout) futures[request_id] = future -def _on_rpc_done(rpc_id: int, - future: grpc.Future, +def _on_rpc_done(rpc_id: int, future: grpc.Future, print_response: bool) -> None: exception = future.exception() hostname = "" @@ -158,19 +146,17 @@ def _on_rpc_done(rpc_id: int, for watcher in _watchers: watcher.on_rpc_complete(rpc_id, hostname) + def _remove_completed_rpcs(futures: Mapping[int, grpc.Future], - print_response: bool) -> None: + print_response: bool) -> None: logger.debug("Removing completed RPCs") done = [] for future_id, future in futures.items(): if future.done(): - logger.debug("Calling _on_rpc_done") _on_rpc_done(future_id, future, args.print_response) - logger.debug("Called _on_rpc_done") done.append(future_id) for rpc_id in done: del futures[rpc_id] - logger.debug("Removed completed RPCs") def _cancel_all_rpcs(futures: Mapping[int, grpc.Future]) -> None: @@ -179,7 +165,6 @@ def _cancel_all_rpcs(futures: Mapping[int, grpc.Future]) -> None: future.cancel() -# TODO: Accept finer-grained arguments. def _run_single_channel(args: argparse.Namespace): global _global_rpc_id # pylint: disable=global-statement duration_per_query = 1.0 / float(args.qps) @@ -194,7 +179,6 @@ def _run_single_channel(args: argparse.Namespace): start = time.time() end = start + duration_per_query _start_rpc(request_id, stub, float(args.rpc_timeout_sec), futures) - # TODO: Complete RPCs more frequently than 1 / QPS? _remove_completed_rpcs(futures, args.print_response) logger.debug(f"Currently {len(futures)} in-flight RPCs") now = time.time() @@ -204,7 +188,6 @@ def _run_single_channel(args: argparse.Namespace): _cancel_all_rpcs(futures) -# TODO: Accept finer-grained arguments. 
def _run(args: argparse.Namespace) -> None: logger.info("Starting python xDS Interop Client.") global _global_server # pylint: disable=global-statement @@ -252,8 +235,20 @@ if __name__ == "__main__": default=50052, type=int, help="The port on which to expose the peer distribution stats service.") + parser.add_argument('--verbose', + help='verbose log output', + default=False, + action='store_true') + parser.add_argument("--log_file", + default=None, + type=str, + help="A file to log to.") args = parser.parse_args() signal.signal(signal.SIGINT, _handle_sigint) - logger.setLevel(logging.DEBUG) - # logging.basicConfig(level=logging.INFO, stream=sys.stderr) + if args.verbose: + logger.setLevel(logging.DEBUG) + if args.log_file: + file_handler = logging.FileHandler(args.log_file, mode='a') + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) _run(args) From e88f582cb843cb5d6d4c08061ac98aeaccaa5d52 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 17 Mar 2020 21:10:41 +0000 Subject: [PATCH 6/9] Add Python interop client to CI --- .../linux/grpc_xds_bazel_test_in_docker.sh | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh index 17380860f21..e94cc12fab5 100755 --- a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh @@ -45,12 +45,24 @@ touch "$TOOLS_DIR"/src/proto/grpc/testing/__init__.py "$PROTO_SOURCE_DIR"/messages.proto \ "$PROTO_SOURCE_DIR"/empty.proto -bazel build test/cpp/interop:xds_interop_client +export GRPC_VERBOSITY=debug +export GRPC_TRACE=xds_client,xds_resolver,cds_lb,xds_lb -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,cds_lb,xds_lb "$PYTHON" \ +bazel build test/cpp/interop:xds_interop_client +"$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case=all \ --project_id=grpc-testing \ 
--gcp_suffix=$(date '+%s') \ --verbose \ --client_cmd='bazel-bin/test/cpp/interop/xds_interop_client --server=xds-experimental:///{server_uri} --stats_port={stats_port} --qps={qps}' + + +bazel build //src/python/grpcio_tests/tests/interop:xds_interop_client +"$PYTHON" \ + tools/run_tests/run_xds_tests.py \ + --test_case=all \ + --project_id=grpc-testing \ + --gcp_suffix=$(date '+%s') \ + --verbose \ + --client_cmd='bazel run //src/python/grpcio_tests/tests/interop:xds_interop_client -- --server=xds-experimental:///{server_uri} --stats_port={stats_port} --qps={qps} --verbose' From 7ec0eda19161b05c93bf406274fb48334408a1e0 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 17 Mar 2020 23:08:57 +0000 Subject: [PATCH 7/9] Split xDS tests up into separate kokoro jobs --- ...h => grpc_xds_bazel_cpp_test_in_docker.sh} | 16 +----- .../grpc_xds_bazel_python_test_in_docker.sh | 56 +++++++++++++++++++ .../linux/{grpc_xds.cfg => grpc_xds_cpp.cfg} | 2 +- tools/internal_ci/linux/grpc_xds_python.cfg | 23 ++++++++ 4 files changed, 82 insertions(+), 15 deletions(-) rename tools/internal_ci/linux/{grpc_xds_bazel_test_in_docker.sh => grpc_xds_bazel_cpp_test_in_docker.sh} (80%) create mode 100755 tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh rename tools/internal_ci/linux/{grpc_xds.cfg => grpc_xds_cpp.cfg} (91%) create mode 100644 tools/internal_ci/linux/grpc_xds_python.cfg diff --git a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_cpp_test_in_docker.sh similarity index 80% rename from tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh rename to tools/internal_ci/linux/grpc_xds_bazel_cpp_test_in_docker.sh index e94cc12fab5..17380860f21 100755 --- a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_bazel_cpp_test_in_docker.sh @@ -45,24 +45,12 @@ touch "$TOOLS_DIR"/src/proto/grpc/testing/__init__.py "$PROTO_SOURCE_DIR"/messages.proto \ 
"$PROTO_SOURCE_DIR"/empty.proto -export GRPC_VERBOSITY=debug -export GRPC_TRACE=xds_client,xds_resolver,cds_lb,xds_lb - bazel build test/cpp/interop:xds_interop_client -"$PYTHON" \ - tools/run_tests/run_xds_tests.py \ - --test_case=all \ - --project_id=grpc-testing \ - --gcp_suffix=$(date '+%s') \ - --verbose \ - --client_cmd='bazel-bin/test/cpp/interop/xds_interop_client --server=xds-experimental:///{server_uri} --stats_port={stats_port} --qps={qps}' - -bazel build //src/python/grpcio_tests/tests/interop:xds_interop_client -"$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,cds_lb,xds_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case=all \ --project_id=grpc-testing \ --gcp_suffix=$(date '+%s') \ --verbose \ - --client_cmd='bazel run //src/python/grpcio_tests/tests/interop:xds_interop_client -- --server=xds-experimental:///{server_uri} --stats_port={stats_port} --qps={qps} --verbose' + --client_cmd='bazel-bin/test/cpp/interop/xds_interop_client --server=xds-experimental:///{server_uri} --stats_port={stats_port} --qps={qps}' diff --git a/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh new file mode 100755 index 00000000000..1eecd28015c --- /dev/null +++ b/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +# Copyright 2020 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +set -ex -o igncr || set -ex + +mkdir -p /var/local/git +git clone /var/local/jenkins/grpc /var/local/git/grpc +(cd /var/local/jenkins/grpc/ && git submodule foreach 'cd /var/local/git/grpc \ +&& git submodule update --init --reference /var/local/jenkins/grpc/${name} \ +${name}') +cd /var/local/git/grpc + +VIRTUAL_ENV=$(mktemp -d) +virtualenv "$VIRTUAL_ENV" +PYTHON="$VIRTUAL_ENV"/bin/python +"$PYTHON" -m pip install --upgrade grpcio-tools google-api-python-client google-auth-httplib2 oauth2client + +# Prepare generated Python code. +TOOLS_DIR=tools/run_tests +PROTO_SOURCE_DIR=src/proto/grpc/testing +PROTO_DEST_DIR="$TOOLS_DIR"/"$PROTO_SOURCE_DIR" +mkdir -p "$PROTO_DEST_DIR" +touch "$TOOLS_DIR"/src/__init__.py +touch "$TOOLS_DIR"/src/proto/__init__.py +touch "$TOOLS_DIR"/src/proto/grpc/__init__.py +touch "$TOOLS_DIR"/src/proto/grpc/testing/__init__.py + +"$PYTHON" -m grpc_tools.protoc \ + --proto_path=. \ + --python_out="$TOOLS_DIR" \ + --grpc_python_out="$TOOLS_DIR" \ + "$PROTO_SOURCE_DIR"/test.proto \ + "$PROTO_SOURCE_DIR"/messages.proto \ + "$PROTO_SOURCE_DIR"/empty.proto + +bazel build //src/python/grpcio_tests/tests/interop:xds_interop_client + +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,cds_lb,xds_lb "$PYTHON" \ + tools/run_tests/run_xds_tests.py \ + --test_case=all \ + --project_id=grpc-testing \ + --gcp_suffix=$(date '+%s') \ + --verbose \ + --client_cmd='bazel run //src/python/grpcio_tests/tests/interop:xds_interop_client -- --server=xds-experimental:///{server_uri} --stats_port={stats_port} --qps={qps} --verbose' diff --git a/tools/internal_ci/linux/grpc_xds.cfg b/tools/internal_ci/linux/grpc_xds_cpp.cfg similarity index 91% rename from tools/internal_ci/linux/grpc_xds.cfg rename to tools/internal_ci/linux/grpc_xds_cpp.cfg index 888be05cde5..89e42a45084 100644 --- a/tools/internal_ci/linux/grpc_xds.cfg +++ b/tools/internal_ci/linux/grpc_xds_cpp.cfg @@ -19,5 +19,5 @@ build_file: "grpc/tools/internal_ci/linux/grpc_bazel.sh" timeout_mins: 90 
env_vars { key: "BAZEL_SCRIPT" - value: "tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh" + value: "tools/internal_ci/linux/grpc_xds_cpp_bazel_test_in_docker.sh" } diff --git a/tools/internal_ci/linux/grpc_xds_python.cfg b/tools/internal_ci/linux/grpc_xds_python.cfg new file mode 100644 index 00000000000..cf6f3d5757b --- /dev/null +++ b/tools/internal_ci/linux/grpc_xds_python.cfg @@ -0,0 +1,23 @@ +# Copyright 2020 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Config file for the internal CI (in protobuf text format) + +# Location of the continuous shell script in repository. 
+build_file: "grpc/tools/internal_ci/linux/grpc_bazel.sh" +timeout_mins: 90 +env_vars { + key: "BAZEL_SCRIPT" + value: "tools/internal_ci/linux/grpc_xds_python_bazel_test_in_docker.sh" +} From bbecc319cc91b0bc4f994bcc7f1cb71103074224 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 17 Mar 2020 23:25:28 +0000 Subject: [PATCH 8/9] Unrename cpp tests --- tools/internal_ci/linux/{grpc_xds_cpp.cfg => grpc_xds.cfg} | 2 +- ...l_cpp_test_in_docker.sh => grpc_xds_bazel_test_in_docker.sh} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename tools/internal_ci/linux/{grpc_xds_cpp.cfg => grpc_xds.cfg} (91%) rename tools/internal_ci/linux/{grpc_xds_bazel_cpp_test_in_docker.sh => grpc_xds_bazel_test_in_docker.sh} (100%) diff --git a/tools/internal_ci/linux/grpc_xds_cpp.cfg b/tools/internal_ci/linux/grpc_xds.cfg similarity index 91% rename from tools/internal_ci/linux/grpc_xds_cpp.cfg rename to tools/internal_ci/linux/grpc_xds.cfg index 89e42a45084..888be05cde5 100644 --- a/tools/internal_ci/linux/grpc_xds_cpp.cfg +++ b/tools/internal_ci/linux/grpc_xds.cfg @@ -19,5 +19,5 @@ build_file: "grpc/tools/internal_ci/linux/grpc_bazel.sh" timeout_mins: 90 env_vars { key: "BAZEL_SCRIPT" - value: "tools/internal_ci/linux/grpc_xds_cpp_bazel_test_in_docker.sh" + value: "tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh" } diff --git a/tools/internal_ci/linux/grpc_xds_bazel_cpp_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh similarity index 100% rename from tools/internal_ci/linux/grpc_xds_bazel_cpp_test_in_docker.sh rename to tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh From a82f8f3dd79025913e53de5e9fb544f8093399a0 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 18 Mar 2020 17:15:09 +0000 Subject: [PATCH 9/9] Ensure unit tests don't pick up interop client --- src/python/grpcio_tests/tests/interop/BUILD.bazel | 13 ------------- .../grpcio_tests/tests_py3_only/interop/BUILD.bazel | 12 ++++++++++++ 
.../interop/xds_interop_client.py | 0 3 files changed, 12 insertions(+), 13 deletions(-) create mode 100644 src/python/grpcio_tests/tests_py3_only/interop/BUILD.bazel rename src/python/grpcio_tests/{tests => tests_py3_only}/interop/xds_interop_client.py (100%) diff --git a/src/python/grpcio_tests/tests/interop/BUILD.bazel b/src/python/grpcio_tests/tests/interop/BUILD.bazel index cd47bee2172..4685852162b 100644 --- a/src/python/grpcio_tests/tests/interop/BUILD.bazel +++ b/src/python/grpcio_tests/tests/interop/BUILD.bazel @@ -115,16 +115,3 @@ py2and3_test( "//src/python/grpcio_tests/tests/unit:test_common", ], ) - -py_binary( - name = "xds_interop_client", - srcs = ["xds_interop_client.py"], - python_version = "PY3", - deps = [ - "//src/proto/grpc/testing:empty_py_pb2", - "//src/proto/grpc/testing:py_messages_proto", - "//src/proto/grpc/testing:py_test_proto", - "//src/proto/grpc/testing:test_py_pb2_grpc", - "//src/python/grpcio/grpc:grpcio", - ], -) diff --git a/src/python/grpcio_tests/tests_py3_only/interop/BUILD.bazel b/src/python/grpcio_tests/tests_py3_only/interop/BUILD.bazel new file mode 100644 index 00000000000..21d207119d0 --- /dev/null +++ b/src/python/grpcio_tests/tests_py3_only/interop/BUILD.bazel @@ -0,0 +1,12 @@ +py_binary( + name = "xds_interop_client", + srcs = ["xds_interop_client.py"], + python_version = "PY3", + deps = [ + "//src/proto/grpc/testing:empty_py_pb2", + "//src/proto/grpc/testing:py_messages_proto", + "//src/proto/grpc/testing:py_test_proto", + "//src/proto/grpc/testing:test_py_pb2_grpc", + "//src/python/grpcio/grpc:grpcio", + ], +) diff --git a/src/python/grpcio_tests/tests/interop/xds_interop_client.py b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py similarity index 100% rename from src/python/grpcio_tests/tests/interop/xds_interop_client.py rename to src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py