Python PSM Security Interop Client+Server (#25991)

* Add channelz to security client

* Make client changes. Some config needs to be reverted

* Add missing security field to channelz Socket

* Add test

* Fix test

* Remove whitespaces for windows

* Remove local_certificate check from PSM security tests

* Byte pack Python channelz IPs

* Move address packing to Core

* WIP

* Unbork security tests

* Python interop server

* Turn up server logging

* Clean up PR

* Clean up some more

* Yapf

* Fix up docker images

* Swap fillllllles!

* Add more robust boolean arg parsing

* Use bool_arg parsing in server as well

* Add copyright

Co-authored-by: Yash Tibrewal <yashkt@google.com>
pull/26073/head
Richard Belleville 4 years ago committed by GitHub
parent 2fc80c234b
commit 53c72c5936
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      src/python/grpcio_tests/tests_py3_only/interop/BUILD.bazel
  2. 25
      src/python/grpcio_tests/tests_py3_only/interop/Dockerfile.client
  3. 25
      src/python/grpcio_tests/tests_py3_only/interop/Dockerfile.server
  4. 28
      src/python/grpcio_tests/tests_py3_only/interop/build_client_image.sh
  5. 28
      src/python/grpcio_tests/tests_py3_only/interop/build_server_image.sh
  6. 38
      src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py
  7. 178
      src/python/grpcio_tests/tests_py3_only/interop/xds_interop_server.py

@ -8,5 +8,22 @@ py_binary(
"//src/proto/grpc/testing:py_test_proto",
"//src/proto/grpc/testing:test_py_pb2_grpc",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz",
],
)
py_binary(
name = "xds_interop_server",
srcs = ["xds_interop_server.py"],
python_version = "PY3",
deps = [
"//src/proto/grpc/testing:empty_py_pb2",
"//src/proto/grpc/testing:py_messages_proto",
"//src/proto/grpc/testing:py_test_proto",
"//src/proto/grpc/testing:test_py_pb2_grpc",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz",
"//src/python/grpcio_health_checking/grpc_health/v1:grpc_health",
"//src/python/grpcio_reflection/grpc_reflection/v1alpha:grpc_reflection",
],
)

@ -0,0 +1,25 @@
FROM phusion/baseimage:master@sha256:65ea10d5f757e5e86272625f8675d437dd83d8db64bdb429e2354d58f5462750
RUN apt-get update -y && \
apt-get install -y \
build-essential \
clang \
python3 \
python3-dev
WORKDIR /workdir
RUN ln -s /usr/bin/python3 /usr/bin/python
RUN mkdir /artifacts
COPY . .
RUN tools/bazel build -c dbg //src/python/grpcio_tests/tests_py3_only/interop:xds_interop_client
RUN cp -rL /workdir/bazel-bin/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client* /artifacts/
FROM phusion/baseimage:master@sha256:65ea10d5f757e5e86272625f8675d437dd83d8db64bdb429e2354d58f5462750
COPY --from=0 /artifacts ./
RUN apt-get update -y && apt-get install -y python3
RUN ln -s /usr/bin/python3 /usr/bin/python
ENTRYPOINT ["/xds_interop_client"]

@ -0,0 +1,25 @@
FROM phusion/baseimage:master@sha256:65ea10d5f757e5e86272625f8675d437dd83d8db64bdb429e2354d58f5462750
RUN apt-get update -y && \
apt-get install -y \
build-essential \
clang \
python3 \
python3-dev
WORKDIR /workdir
RUN ln -s /usr/bin/python3 /usr/bin/python
RUN mkdir /artifacts
COPY . .
RUN tools/bazel build -c dbg //src/python/grpcio_tests/tests_py3_only/interop:xds_interop_server
RUN cp -rL /workdir/bazel-bin/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_server* /artifacts/
FROM phusion/baseimage:master@sha256:65ea10d5f757e5e86272625f8675d437dd83d8db64bdb429e2354d58f5462750
COPY --from=0 /artifacts ./
RUN apt-get update -y && apt-get install -y python3
RUN ln -s /usr/bin/python3 /usr/bin/python
ENTRYPOINT ["/xds_interop_server"]

@ -0,0 +1,28 @@
# Copyright 2021 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/bin/bash
set -x
VERSION=$(git rev-parse HEAD)
PROJECT=grpc-testing
TAG=gcr.io/${PROJECT}/python_xds_interop_client:$VERSION
cd "$(dirname "${BASH_SOURCE[0]}")/../../../../.."
docker build \
-t ${TAG} \
-f src/python/grpcio_tests/tests_py3_only/interop/Dockerfile.client \
.

@ -0,0 +1,28 @@
# Copyright 2021 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/bin/bash
set -x
VERSION=$(git rev-parse HEAD)
PROJECT=grpc-testing
TAG=gcr.io/${PROJECT}/python_xds_interop_server:$VERSION
cd "$(dirname "${BASH_SOURCE[0]}")/../../../../.."
docker build \
-t ${TAG} \
-f src/python/grpcio_tests/tests_py3_only/interop/Dockerfile.server \
.

@ -27,6 +27,7 @@ import collections
from concurrent import futures
import grpc
from grpc_channelz.v1 import channelz
from src.proto.grpc.testing import test_pb2
from src.proto.grpc.testing import test_pb2_grpc
@ -257,9 +258,9 @@ class _ChannelConfiguration:
When accessing any of its members, the lock member should be held.
"""
def __init__(self, method: str, metadata: Sequence[Tuple[str,
str]], qps: int,
server: str, rpc_timeout_sec: int, print_response: bool):
def __init__(self, method: str, metadata: Sequence[Tuple[str, str]],
qps: int, server: str, rpc_timeout_sec: int,
print_response: bool, secure_mode: bool):
# condition is signalled when a change is made to the config.
self.condition = threading.Condition()
@ -269,13 +270,21 @@ class _ChannelConfiguration:
self.server = server
self.rpc_timeout_sec = rpc_timeout_sec
self.print_response = print_response
self.secure_mode = secure_mode
def _run_single_channel(config: _ChannelConfiguration) -> None:
global _global_rpc_id # pylint: disable=global-statement
with config.condition:
server = config.server
with grpc.insecure_channel(server) as channel:
channel = None
if config.secure_mode:
fallback_creds = grpc.experimental.insecure_channel_credentials()
channel_creds = grpc.xds_channel_credentials(fallback_creds)
channel = grpc.secure_channel(server, channel_creds)
else:
channel = grpc.insecure_channel(server)
with channel:
stub = test_pb2_grpc.TestServiceStub(channel)
futures: Dict[int, Tuple[grpc.Future, str]] = {}
while not _stop_event.is_set():
@ -382,7 +391,7 @@ def _run(args: argparse.Namespace, methods: Sequence[str],
qps = 0
channel_config = _ChannelConfiguration(
method, per_method_metadata.get(method, []), qps, args.server,
args.rpc_timeout_sec, args.print_response)
args.rpc_timeout_sec, args.print_response, args.secure_mode)
channel_configs[method] = channel_config
method_handles.append(_MethodHandle(args.num_channels, channel_config))
_global_server = grpc.server(futures.ThreadPoolExecutor())
@ -392,6 +401,7 @@ def _run(args: argparse.Namespace, methods: Sequence[str],
test_pb2_grpc.add_XdsUpdateClientConfigureServiceServicer_to_server(
_XdsUpdateClientConfigureServicer(channel_configs, args.qps),
_global_server)
channelz.add_channelz_servicer(_global_server)
_global_server.start()
_global_server.wait_for_termination()
for method_handle in method_handles:
@ -420,6 +430,15 @@ def parse_rpc_arg(rpc_arg: str) -> Sequence[str]:
return methods
def bool_arg(arg: str) -> bool:
if arg.lower() in ("true", "yes", "y"):
return True
elif arg.lower() in ("false", "no", "n"):
return False
else:
raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='Run Python XDS interop client.')
@ -429,8 +448,8 @@ if __name__ == "__main__":
type=int,
help="The number of channels from which to send requests.")
parser.add_argument("--print_response",
default=False,
action="store_true",
default="False",
type=bool_arg,
help="Write RPC response to STDOUT.")
parser.add_argument(
"--qps",
@ -449,6 +468,11 @@ if __name__ == "__main__":
default=50052,
type=int,
help="The port on which to expose the peer distribution stats service.")
parser.add_argument(
"--secure_mode",
default="False",
type=bool_arg,
help="If specified, uses xDS credentials to connect to the server.")
parser.add_argument('--verbose',
help='verbose log output',
default=False,

@ -0,0 +1,178 @@
# Copyright 2021 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import signal
import threading
import time
import socket
import sys
from typing import DefaultDict, Dict, List, Mapping, Set, Sequence, Tuple
import collections
from concurrent import futures
import grpc
from grpc_channelz.v1 import channelz
from grpc_channelz.v1 import channelz_pb2
from grpc_health.v1 import health_pb2, health_pb2_grpc
from grpc_health.v1 import health as grpc_health
from grpc_reflection.v1alpha import reflection
from src.proto.grpc.testing import test_pb2
from src.proto.grpc.testing import test_pb2_grpc
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import empty_pb2
# NOTE: This interop server is not fully compatible with all xDS interop tests.
# It currently only implements enough functionality to pass the xDS security
# tests.
_LISTEN_HOST = "[::]"
_THREAD_POOL_SIZE = 256
logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
class TestService(test_pb2_grpc.TestServiceServicer):
def __init__(self, server_id, hostname):
self._server_id = server_id
self._hostname = hostname
def EmptyCall(self, _: empty_pb2.Empty,
context: grpc.ServicerContext) -> empty_pb2.Empty:
return empty_pb2.Empty()
def UnaryCall(self, request: messages_pb2.SimpleRequest,
context: grpc.ServicerContext) -> messages_pb2.SimpleResponse:
response = messages_pb2.SimpleResponse()
response.server_id = self._server_id
response.hostname = self._hostname
return response
def _configure_maintenance_server(server: grpc.Server,
maintenance_port: int) -> None:
channelz.add_channelz_servicer(server)
listen_address = f"{_LISTEN_HOST}:{maintenance_port}"
server.add_insecure_port(listen_address)
health_servicer = grpc_health.HealthServicer(
experimental_non_blocking=True,
experimental_thread_pool=futures.ThreadPoolExecutor(
max_workers=_THREAD_POOL_SIZE))
health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
SERVICE_NAMES = (
test_pb2.DESCRIPTOR.services_by_name["TestService"].full_name,
health_pb2.DESCRIPTOR.services_by_name["Health"].full_name,
channelz_pb2.DESCRIPTOR.services_by_name["Channelz"].full_name,
reflection.SERVICE_NAME,
)
for service in SERVICE_NAMES:
health_servicer.set(service, health_pb2.HealthCheckResponse.SERVING)
reflection.enable_server_reflection(SERVICE_NAMES, server)
def _configure_test_server(server: grpc.Server, port: int, secure_mode: bool,
server_id: str) -> None:
test_pb2_grpc.add_TestServiceServicer_to_server(
TestService(server_id, socket.gethostname()), server)
listen_address = f"{_LISTEN_HOST}:{port}"
if not secure_mode:
server.add_insecure_port(listen_address)
else:
logger.info("Running with xDS Server credentials")
server_fallback_creds = grpc.insecure_server_credentials()
server_creds = grpc.xds_server_credentials(server_fallback_creds)
server.add_secure_port(listen_address, server_creds)
def _run(port: int, maintenance_port: int, secure_mode: bool,
server_id: str) -> None:
if port == maintenance_port:
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE))
_configure_test_server(server, port, secure_mode, server_id)
_configure_maintenance_server(server, maintenance_port)
server.start()
logger.info("Test server listening on port %d", port)
logger.info("Maintenance server listening on port %d", maintenance_port)
server.wait_for_termination()
else:
test_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE),
xds=secure_mode)
_configure_test_server(test_server, port, secure_mode, server_id)
test_server.start()
logger.info("Test server listening on port %d", port)
maintenance_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE))
_configure_maintenance_server(maintenance_server, maintenance_port)
maintenance_server.start()
logger.info("Maintenance server listening on port %d", maintenance_port)
test_server.wait_for_termination()
maintenance_server.wait_for_termination()
def bool_arg(arg: str) -> bool:
if arg.lower() in ("true", "yes", "y"):
return True
elif arg.lower() in ("false", "no", "n"):
return False
else:
raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Run Python xDS interop server.")
parser.add_argument("--port",
type=int,
default=8080,
help="Port for test server.")
parser.add_argument("--maintenance_port",
type=int,
default=8080,
help="Port for servers besides test server.")
parser.add_argument(
"--secure_mode",
type=bool_arg,
default="False",
help="If specified, uses xDS to retrieve server credentials.")
parser.add_argument("--server_id",
type=str,
default="python_server",
help="The server ID to return in responses..")
parser.add_argument('--verbose',
help='verbose log output',
default=False,
action='store_true')
args = parser.parse_args()
if args.verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
if args.secure_mode and args.port == args.maintenance_port:
raise ValueError(
"--port and --maintenance_port must not be the same when --secure_mode is set."
)
_run(args.port, args.maintenance_port, args.secure_mode, args.server_id)
Loading…
Cancel
Save