diff --git a/src/python/grpcio_tests/tests_py3_only/interop/BUILD.bazel b/src/python/grpcio_tests/tests_py3_only/interop/BUILD.bazel index 21d207119d0..506d744400f 100644 --- a/src/python/grpcio_tests/tests_py3_only/interop/BUILD.bazel +++ b/src/python/grpcio_tests/tests_py3_only/interop/BUILD.bazel @@ -8,5 +8,22 @@ py_binary( "//src/proto/grpc/testing:py_test_proto", "//src/proto/grpc/testing:test_py_pb2_grpc", "//src/python/grpcio/grpc:grpcio", + "//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz", + ], +) + +py_binary( + name = "xds_interop_server", + srcs = ["xds_interop_server.py"], + python_version = "PY3", + deps = [ + "//src/proto/grpc/testing:empty_py_pb2", + "//src/proto/grpc/testing:py_messages_proto", + "//src/proto/grpc/testing:py_test_proto", + "//src/proto/grpc/testing:test_py_pb2_grpc", + "//src/python/grpcio/grpc:grpcio", + "//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz", + "//src/python/grpcio_health_checking/grpc_health/v1:grpc_health", + "//src/python/grpcio_reflection/grpc_reflection/v1alpha:grpc_reflection", ], ) diff --git a/src/python/grpcio_tests/tests_py3_only/interop/Dockerfile.client b/src/python/grpcio_tests/tests_py3_only/interop/Dockerfile.client new file mode 100644 index 00000000000..e8184e7bf49 --- /dev/null +++ b/src/python/grpcio_tests/tests_py3_only/interop/Dockerfile.client @@ -0,0 +1,25 @@ +FROM phusion/baseimage:master@sha256:65ea10d5f757e5e86272625f8675d437dd83d8db64bdb429e2354d58f5462750 + +RUN apt-get update -y && \ + apt-get install -y \ + build-essential \ + clang \ + python3 \ + python3-dev + +WORKDIR /workdir + +RUN ln -s /usr/bin/python3 /usr/bin/python +RUN mkdir /artifacts + +COPY . . +RUN tools/bazel build -c dbg //src/python/grpcio_tests/tests_py3_only/interop:xds_interop_client +RUN cp -rL /workdir/bazel-bin/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client* /artifacts/ + +FROM phusion/baseimage:master@sha256:65ea10d5f757e5e86272625f8675d437dd83d8db64bdb429e2354d58f5462750 +COPY --from=0 /artifacts ./ + +RUN apt-get update -y && apt-get install -y python3 +RUN ln -s /usr/bin/python3 /usr/bin/python + +ENTRYPOINT ["/xds_interop_client"] diff --git a/src/python/grpcio_tests/tests_py3_only/interop/Dockerfile.server b/src/python/grpcio_tests/tests_py3_only/interop/Dockerfile.server new file mode 100644 index 00000000000..b7259bf995d --- /dev/null +++ b/src/python/grpcio_tests/tests_py3_only/interop/Dockerfile.server @@ -0,0 +1,25 @@ +FROM phusion/baseimage:master@sha256:65ea10d5f757e5e86272625f8675d437dd83d8db64bdb429e2354d58f5462750 + +RUN apt-get update -y && \ + apt-get install -y \ + build-essential \ + clang \ + python3 \ + python3-dev + +WORKDIR /workdir + +RUN ln -s /usr/bin/python3 /usr/bin/python +RUN mkdir /artifacts + +COPY . . +RUN tools/bazel build -c dbg //src/python/grpcio_tests/tests_py3_only/interop:xds_interop_server +RUN cp -rL /workdir/bazel-bin/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_server* /artifacts/ + +FROM phusion/baseimage:master@sha256:65ea10d5f757e5e86272625f8675d437dd83d8db64bdb429e2354d58f5462750 +COPY --from=0 /artifacts ./ + +RUN apt-get update -y && apt-get install -y python3 +RUN ln -s /usr/bin/python3 /usr/bin/python + +ENTRYPOINT ["/xds_interop_server"] diff --git a/src/python/grpcio_tests/tests_py3_only/interop/build_client_image.sh b/src/python/grpcio_tests/tests_py3_only/interop/build_client_image.sh new file mode 100755 index 00000000000..83c2d6ec4a2 --- /dev/null +++ b/src/python/grpcio_tests/tests_py3_only/interop/build_client_image.sh @@ -0,0 +1,28 @@ +# Copyright 2021 The gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#!/bin/bash + +set -x + +VERSION=$(git rev-parse HEAD) + +PROJECT=grpc-testing +TAG=gcr.io/${PROJECT}/python_xds_interop_client:$VERSION + +cd "$(dirname "${BASH_SOURCE[0]}")/../../../../.." + +docker build \ + -t ${TAG} \ + -f src/python/grpcio_tests/tests_py3_only/interop/Dockerfile.client \ + . diff --git a/src/python/grpcio_tests/tests_py3_only/interop/build_server_image.sh b/src/python/grpcio_tests/tests_py3_only/interop/build_server_image.sh new file mode 100755 index 00000000000..ad62ced6903 --- /dev/null +++ b/src/python/grpcio_tests/tests_py3_only/interop/build_server_image.sh @@ -0,0 +1,28 @@ +# Copyright 2021 The gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#!/bin/bash + +set -x + +VERSION=$(git rev-parse HEAD) + +PROJECT=grpc-testing +TAG=gcr.io/${PROJECT}/python_xds_interop_server:$VERSION + +cd "$(dirname "${BASH_SOURCE[0]}")/../../../../.." + +docker build \ + -t ${TAG} \ + -f src/python/grpcio_tests/tests_py3_only/interop/Dockerfile.server \ + . diff --git a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py index fa953e5ac09..e71be526bd8 100644 --- a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py +++ b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py @@ -27,6 +27,7 @@ import collections from concurrent import futures import grpc +from grpc_channelz.v1 import channelz from src.proto.grpc.testing import test_pb2 from src.proto.grpc.testing import test_pb2_grpc @@ -257,9 +258,9 @@ class _ChannelConfiguration: When accessing any of its members, the lock member should be held. """ - def __init__(self, method: str, metadata: Sequence[Tuple[str, - str]], qps: int, - server: str, rpc_timeout_sec: int, print_response: bool): + def __init__(self, method: str, metadata: Sequence[Tuple[str, str]], + qps: int, server: str, rpc_timeout_sec: int, + print_response: bool, secure_mode: bool): # condition is signalled when a change is made to the config. self.condition = threading.Condition() @@ -269,13 +270,21 @@ class _ChannelConfiguration: self.server = server self.rpc_timeout_sec = rpc_timeout_sec self.print_response = print_response + self.secure_mode = secure_mode def _run_single_channel(config: _ChannelConfiguration) -> None: global _global_rpc_id # pylint: disable=global-statement with config.condition: server = config.server - with grpc.insecure_channel(server) as channel: + channel = None + if config.secure_mode: + fallback_creds = grpc.experimental.insecure_channel_credentials() + channel_creds = grpc.xds_channel_credentials(fallback_creds) + channel = grpc.secure_channel(server, channel_creds) + else: + channel = grpc.insecure_channel(server) + with channel: stub = test_pb2_grpc.TestServiceStub(channel) futures: Dict[int, Tuple[grpc.Future, str]] = {} while not _stop_event.is_set(): @@ -382,7 +391,7 @@ def _run(args: argparse.Namespace, methods: Sequence[str], qps = 0 channel_config = _ChannelConfiguration( method, per_method_metadata.get(method, []), qps, args.server, - args.rpc_timeout_sec, args.print_response) + args.rpc_timeout_sec, args.print_response, args.secure_mode) channel_configs[method] = channel_config method_handles.append(_MethodHandle(args.num_channels, channel_config)) _global_server = grpc.server(futures.ThreadPoolExecutor()) @@ -392,6 +401,7 @@ def _run(args: argparse.Namespace, methods: Sequence[str], test_pb2_grpc.add_XdsUpdateClientConfigureServiceServicer_to_server( _XdsUpdateClientConfigureServicer(channel_configs, args.qps), _global_server) + channelz.add_channelz_servicer(_global_server) _global_server.start() _global_server.wait_for_termination() for method_handle in method_handles: @@ -420,6 +430,15 @@ def parse_rpc_arg(rpc_arg: str) -> Sequence[str]: return methods +def bool_arg(arg: str) -> bool: + if arg.lower() in ("true", "yes", "y"): + return True + elif arg.lower() in ("false", "no", "n"): + return False + else: + raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.") + + if __name__ == "__main__": parser = argparse.ArgumentParser( description='Run Python XDS interop client.') @@ -429,8 +448,8 @@ if __name__ == "__main__": type=int, help="The number of channels from which to send requests.") parser.add_argument("--print_response", - default=False, - action="store_true", + default="False", + type=bool_arg, help="Write RPC response to STDOUT.") parser.add_argument( "--qps", @@ -449,6 +468,11 @@ if __name__ == "__main__": default=50052, type=int, help="The port on which to expose the peer distribution stats service.") + parser.add_argument( + "--secure_mode", + default="False", + type=bool_arg, + help="If specified, uses xDS credentials to connect to the server.") parser.add_argument('--verbose', help='verbose log output', default=False, diff --git a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_server.py b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_server.py new file mode 100644 index 00000000000..b0901b2070a --- /dev/null +++ b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_server.py @@ -0,0 +1,178 @@ +# Copyright 2021 The gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import logging +import signal +import threading +import time +import socket +import sys + +from typing import DefaultDict, Dict, List, Mapping, Set, Sequence, Tuple +import collections + +from concurrent import futures + +import grpc +from grpc_channelz.v1 import channelz +from grpc_channelz.v1 import channelz_pb2 +from grpc_health.v1 import health_pb2, health_pb2_grpc +from grpc_health.v1 import health as grpc_health +from grpc_reflection.v1alpha import reflection + +from src.proto.grpc.testing import test_pb2 +from src.proto.grpc.testing import test_pb2_grpc +from src.proto.grpc.testing import messages_pb2 +from src.proto.grpc.testing import empty_pb2 + +# NOTE: This interop server is not fully compatible with all xDS interop tests. +# It currently only implements enough functionality to pass the xDS security +# tests. + +_LISTEN_HOST = "[::]" + +_THREAD_POOL_SIZE = 256 + +logger = logging.getLogger() +console_handler = logging.StreamHandler() +formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s') +console_handler.setFormatter(formatter) +logger.addHandler(console_handler) + + +class TestService(test_pb2_grpc.TestServiceServicer): + + def __init__(self, server_id, hostname): + self._server_id = server_id + self._hostname = hostname + + def EmptyCall(self, _: empty_pb2.Empty, + context: grpc.ServicerContext) -> empty_pb2.Empty: + return empty_pb2.Empty() + + def UnaryCall(self, request: messages_pb2.SimpleRequest, + context: grpc.ServicerContext) -> messages_pb2.SimpleResponse: + response = messages_pb2.SimpleResponse() + response.server_id = self._server_id + response.hostname = self._hostname + return response + + +def _configure_maintenance_server(server: grpc.Server, + maintenance_port: int) -> None: + channelz.add_channelz_servicer(server) + listen_address = f"{_LISTEN_HOST}:{maintenance_port}" + server.add_insecure_port(listen_address) + health_servicer = grpc_health.HealthServicer( + experimental_non_blocking=True, + experimental_thread_pool=futures.ThreadPoolExecutor( + max_workers=_THREAD_POOL_SIZE)) + + health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server) + SERVICE_NAMES = ( + test_pb2.DESCRIPTOR.services_by_name["TestService"].full_name, + health_pb2.DESCRIPTOR.services_by_name["Health"].full_name, + channelz_pb2.DESCRIPTOR.services_by_name["Channelz"].full_name, + reflection.SERVICE_NAME, + ) + for service in SERVICE_NAMES: + health_servicer.set(service, health_pb2.HealthCheckResponse.SERVING) + reflection.enable_server_reflection(SERVICE_NAMES, server) + + +def _configure_test_server(server: grpc.Server, port: int, secure_mode: bool, + server_id: str) -> None: + test_pb2_grpc.add_TestServiceServicer_to_server( + TestService(server_id, socket.gethostname()), server) + listen_address = f"{_LISTEN_HOST}:{port}" + if not secure_mode: + server.add_insecure_port(listen_address) + else: + logger.info("Running with xDS Server credentials") + server_fallback_creds = grpc.insecure_server_credentials() + server_creds = grpc.xds_server_credentials(server_fallback_creds) + server.add_secure_port(listen_address, server_creds) + + +def _run(port: int, maintenance_port: int, secure_mode: bool, + server_id: str) -> None: + if port == maintenance_port: + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE)) + _configure_test_server(server, port, secure_mode, server_id) + _configure_maintenance_server(server, maintenance_port) + server.start() + logger.info("Test server listening on port %d", port) + logger.info("Maintenance server listening on port %d", maintenance_port) + server.wait_for_termination() + else: + test_server = grpc.server( + futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE), + xds=secure_mode) + _configure_test_server(test_server, port, secure_mode, server_id) + test_server.start() + logger.info("Test server listening on port %d", port) + maintenance_server = grpc.server( + futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE)) + _configure_maintenance_server(maintenance_server, maintenance_port) + maintenance_server.start() + logger.info("Maintenance server listening on port %d", maintenance_port) + test_server.wait_for_termination() + maintenance_server.wait_for_termination() + + +def bool_arg(arg: str) -> bool: + if arg.lower() in ("true", "yes", "y"): + return True + elif arg.lower() in ("false", "no", "n"): + return False + else: + raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Run Python xDS interop server.") + parser.add_argument("--port", + type=int, + default=8080, + help="Port for test server.") + parser.add_argument("--maintenance_port", + type=int, + default=8080, + help="Port for servers besides test server.") + parser.add_argument( + "--secure_mode", + type=bool_arg, + default="False", + help="If specified, uses xDS to retrieve server credentials.") + parser.add_argument("--server_id", + type=str, + default="python_server", + help="The server ID to return in responses..") + parser.add_argument('--verbose', + help='verbose log output', + default=False, + action='store_true') + args = parser.parse_args() + if args.verbose: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + if args.secure_mode and args.port == args.maintenance_port: + raise ValueError( + "--port and --maintenance_port must not be the same when --secure_mode is set." + ) + _run(args.port, args.maintenance_port, args.secure_mode, args.server_id)