diff --git a/bazel/cython_library.bzl b/bazel/cython_library.bzl index 3f34e185380..c9f864231c1 100644 --- a/bazel/cython_library.bzl +++ b/bazel/cython_library.bzl @@ -63,12 +63,15 @@ def pyx_library(name, deps = [], py_deps = [], srcs = [], **kwargs): ) shared_objects.append(shared_object_name) + data = shared_objects[:] + data += kwargs.pop("data", []) + # Now create a py_library with these shared objects as data. native.py_library( name = name, srcs = py_srcs, deps = py_deps, srcs_version = "PY2AND3", - data = shared_objects, + data = data, **kwargs ) diff --git a/src/python/grpcio/grpc/_cython/BUILD.bazel b/src/python/grpcio/grpc/_cython/BUILD.bazel index c7ae040a804..646d31068ca 100644 --- a/src/python/grpcio/grpc/_cython/BUILD.bazel +++ b/src/python/grpcio/grpc/_cython/BUILD.bazel @@ -2,6 +2,13 @@ package(default_visibility = ["//visibility:public"]) load("//bazel:cython_library.bzl", "pyx_library") +genrule( + name = "copy_roots_pem", + srcs = ["//:etc/roots.pem"], + outs = ["_credentials/roots.pem"], + cmd = "cp $(SRCS) $(@)", +) + pyx_library( name = "cygrpc", srcs = glob([ @@ -9,6 +16,7 @@ pyx_library( "cygrpc.pxd", "cygrpc.pyx", ]), + data = [":copy_roots_pem"], deps = [ "//:grpc", ], diff --git a/src/python/grpcio_tests/tests/interop/BUILD.bazel b/src/python/grpcio_tests/tests/interop/BUILD.bazel index 4685852162b..cd47bee2172 100644 --- a/src/python/grpcio_tests/tests/interop/BUILD.bazel +++ b/src/python/grpcio_tests/tests/interop/BUILD.bazel @@ -115,3 +115,16 @@ py2and3_test( "//src/python/grpcio_tests/tests/unit:test_common", ], ) + +py_binary( + name = "xds_interop_client", + srcs = ["xds_interop_client.py"], + python_version = "PY3", + deps = [ + "//src/proto/grpc/testing:empty_py_pb2", + "//src/proto/grpc/testing:py_messages_proto", + "//src/proto/grpc/testing:py_test_proto", + "//src/proto/grpc/testing:test_py_pb2_grpc", + "//src/python/grpcio/grpc:grpcio", + ], +) diff --git a/src/python/grpcio_tests/tests/interop/xds_interop_client.py b/src/python/grpcio_tests/tests/interop/xds_interop_client.py new file mode 100644 index 00000000000..3a79ef0f32a --- /dev/null +++ b/src/python/grpcio_tests/tests/interop/xds_interop_client.py @@ -0,0 +1,201 @@ +# Copyright 2020 The gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import signal +import threading +import time +import sys + +from typing import DefaultDict, List, Set +import collections + +from concurrent import futures + +import grpc + +from src.proto.grpc.testing import test_pb2 +from src.proto.grpc.testing import test_pb2_grpc +from src.proto.grpc.testing import messages_pb2 +from src.proto.grpc.testing import empty_pb2 + + +# TODO: Back with a LoadBalancerStatsResponse proto? +class _StatsWatcher: + _start: int + _end: int + _rpcs_needed: int + _rpcs_by_peer: DefaultDict[str, int] + _no_remote_peer: int + _lock: threading.Lock + _condition: threading.Condition + + def __init__(self, start: int, end: int): + self._start = start + self._end = end + self._rpcs_needed = end - start + self._rpcs_by_peer = collections.defaultdict(int) + self._lock = threading.Lock() + self._condition = threading.Condition(self._lock) + self._no_remote_peer = 0 + + def on_rpc_complete(self, request_id: int, peer: str) -> None: + """Records statistics for a single RPC.""" + if self._start <= request_id < self._end: + with self._lock: + if not peer: + self._no_remote_peer += 1 + else: + self._rpcs_by_peer[peer] += 1 + self._rpcs_needed -= 1 + self._condition.notify() + + def await_rpc_stats_response(self, timeout_sec: int + ) -> messages_pb2.LoadBalancerStatsResponse: + """Blocks until a full response has been collected.""" + with self._lock: + self._condition.wait_for(lambda: not self._rpcs_needed, + timeout=float(timeout_sec)) + response = messages_pb2.LoadBalancerStatsResponse() + for peer, count in self._rpcs_by_peer.items(): + response.rpcs_by_peer[peer] = count + response.num_failures = self._no_remote_peer + self._rpcs_needed + return response + + +_global_lock = threading.Lock() +_stop_event = threading.Event() +_global_rpc_id: int = 0 +_watchers: Set[_StatsWatcher] = set() +_global_server = None + + +def _handle_sigint(sig, frame): + _stop_event.set() + _global_server.stop(None) + + +class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer + ): + + def __init__(self): + super(_LoadBalancerStatsServicer).__init__() + + def GetClientStats(self, request: messages_pb2.LoadBalancerStatsRequest, + context: grpc.ServicerContext + ) -> messages_pb2.LoadBalancerStatsResponse: + print("Received stats request.") + sys.stdout.flush() + start = None + end = None + watcher = None + with _global_lock: + start = _global_rpc_id + 1 + end = start + request.num_rpcs + watcher = _StatsWatcher(start, end) + _watchers.add(watcher) + response = watcher.await_rpc_stats_response(request.timeout_sec) + with _global_lock: + _watchers.remove(watcher) + return response + + +# TODO: Accept finer-grained arguments. +def _run_single_channel(args: argparse.Namespace): + global _global_rpc_id # pylint: disable=global-statement + duration_per_query = 1.0 / float(args.qps) + with grpc.insecure_channel(args.server) as channel: + stub = test_pb2_grpc.TestServiceStub(channel) + while not _stop_event.is_set(): + request_id = None + with _global_lock: + request_id = _global_rpc_id + _global_rpc_id += 1 + print(f"Sending request to backend: {request_id}") + sys.stdout.flush() + start = time.time() + end = start + duration_per_query + call, _ = stub.UnaryCall.with_call(messages_pb2.SimpleRequest(), + timeout=float( + args.rpc_timeout_sec)) + print(f"Got result {request_id}") + sys.stdout.flush() + with _global_lock: + for watcher in _watchers: + # TODO: Implement a peer details getter. + peer = f"192.168.1.{request_id % 255}" + watcher.on_rpc_complete(request_id, peer) + if args.print_response: + if call.code() == grpc.StatusCode.OK: + print("Successful response.") + sys.stdout.flush() + else: + print(f"RPC failed: {call}") + sys.stdout.flush() + now = time.time() + while now < end: + time.sleep(end - now) + now = time.time() + + +# TODO: Accept finer-grained arguments. +def _run(args: argparse.Namespace) -> None: + global _global_server # pylint: disable=global-statement + channel_threads: List[threading.Thread] = [] + for i in range(args.num_channels): + thread = threading.Thread(target=_run_single_channel, args=(args,)) + thread.start() + channel_threads.append(thread) + _global_server = grpc.server(futures.ThreadPoolExecutor()) + _global_server.add_insecure_port(f"0.0.0.0:{args.stats_port}") + test_pb2_grpc.add_LoadBalancerStatsServiceServicer_to_server( + _LoadBalancerStatsServicer(), _global_server) + _global_server.start() + _global_server.wait_for_termination() + for i in range(args.num_channels): + thread.join() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description='Run Python XDS interop client.') + parser.add_argument( + "--num_channels", + default=1, + type=int, + help="The number of channels from which to send requests.") + parser.add_argument("--print_response", + default=False, + action="store_true", + help="Write RPC response to STDOUT.") + parser.add_argument( + "--qps", + default=1, + type=int, + help="The number of queries to send from each channel per second.") + parser.add_argument("--rpc_timeout_sec", + default=10, + type=int, + help="The per-RPC timeout in seconds.") + parser.add_argument("--server", + default="localhost:50051", + help="The address of the server.") + parser.add_argument( + "--stats_port", + default=50052, + type=int, + help="The port on which to expose the peer distribution stats service.") + args = parser.parse_args() + signal.signal(signal.SIGINT, _handle_sigint) + _run(args) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 1b1435a93b1..ed950fcad63 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -71,6 +71,10 @@ argp.add_argument( 'Continue with test even when an error occurs during setup. Intended for ' 'manual testing, where attempts to recreate any GCP resources already ' 'existing will result in an error') +argp.add_argument('--service_port', + default=55551, + type=int, + help='The port on which the test server will listen.') argp.add_argument('--verbose', help='verbose log output', default=False, @@ -97,7 +101,7 @@ TARGET_PROXY_NAME = 'test-target-proxy' + args.gcp_suffix FORWARDING_RULE_NAME = 'test-forwarding-rule' + args.gcp_suffix KEEP_GCP_RESOURCES = args.keep_gcp_resources TOLERATE_GCP_ERRORS = args.tolerate_gcp_errors -SERVICE_PORT = 55551 +SERVICE_PORT = args.service_port STATS_PORT = 55552 INSTANCE_GROUP_SIZE = 2 WAIT_FOR_OPERATION_SEC = 60