mirror of https://github.com/grpc/grpc.git
commit
b1652b4443
52 changed files with 1293 additions and 427 deletions
@ -0,0 +1,130 @@ |
||||
# Copyright 2020 The gRPC Authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
"""Tests the behaviour of the Call classes under a secure channel.""" |
||||
|
||||
import unittest |
||||
import logging |
||||
|
||||
import grpc |
||||
from grpc.experimental import aio |
||||
from src.proto.grpc.testing import messages_pb2, test_pb2_grpc |
||||
from tests_aio.unit._test_base import AioTestBase |
||||
from tests_aio.unit._test_server import start_test_server |
||||
from tests.unit import resources |
||||
|
||||
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr' |
||||
_NUM_STREAM_RESPONSES = 5 |
||||
_RESPONSE_PAYLOAD_SIZE = 42 |
||||
|
||||
|
||||
class _SecureCallMixin: |
||||
"""A Mixin to run the call tests over a secure channel.""" |
||||
|
||||
async def setUp(self): |
||||
server_credentials = grpc.ssl_server_credentials([ |
||||
(resources.private_key(), resources.certificate_chain()) |
||||
]) |
||||
channel_credentials = grpc.ssl_channel_credentials( |
||||
resources.test_root_certificates()) |
||||
|
||||
self._server_address, self._server = await start_test_server( |
||||
secure=True, server_credentials=server_credentials) |
||||
channel_options = (( |
||||
'grpc.ssl_target_name_override', |
||||
_SERVER_HOST_OVERRIDE, |
||||
),) |
||||
self._channel = aio.secure_channel(self._server_address, |
||||
channel_credentials, channel_options) |
||||
self._stub = test_pb2_grpc.TestServiceStub(self._channel) |
||||
|
||||
async def tearDown(self): |
||||
await self._channel.close() |
||||
await self._server.stop(None) |
||||
|
||||
|
||||
class TestUnaryUnarySecureCall(_SecureCallMixin, AioTestBase): |
||||
"""unary_unary Calls made over a secure channel.""" |
||||
|
||||
async def test_call_ok_over_secure_channel(self): |
||||
call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) |
||||
response = await call |
||||
self.assertIsInstance(response, messages_pb2.SimpleResponse) |
||||
self.assertEqual(await call.code(), grpc.StatusCode.OK) |
||||
|
||||
async def test_call_with_credentials(self): |
||||
call_credentials = grpc.composite_call_credentials( |
||||
grpc.access_token_call_credentials("abc"), |
||||
grpc.access_token_call_credentials("def"), |
||||
) |
||||
call = self._stub.UnaryCall(messages_pb2.SimpleRequest(), |
||||
credentials=call_credentials) |
||||
response = await call |
||||
|
||||
self.assertIsInstance(response, messages_pb2.SimpleResponse) |
||||
|
||||
|
||||
class TestUnaryStreamSecureCall(_SecureCallMixin, AioTestBase): |
||||
"""unary_stream calls over a secure channel""" |
||||
|
||||
async def test_unary_stream_async_generator_secure(self): |
||||
request = messages_pb2.StreamingOutputCallRequest() |
||||
request.response_parameters.extend( |
||||
messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,) |
||||
for _ in range(_NUM_STREAM_RESPONSES)) |
||||
call_credentials = grpc.composite_call_credentials( |
||||
grpc.access_token_call_credentials("abc"), |
||||
grpc.access_token_call_credentials("def"), |
||||
) |
||||
call = self._stub.StreamingOutputCall(request, |
||||
credentials=call_credentials) |
||||
|
||||
async for response in call: |
||||
self.assertIsInstance(response, |
||||
messages_pb2.StreamingOutputCallResponse) |
||||
self.assertEqual(len(response.payload.body), _RESPONSE_PAYLOAD_SIZE) |
||||
|
||||
self.assertEqual(await call.code(), grpc.StatusCode.OK) |
||||
|
||||
|
||||
# Prepares the request that stream in a ping-pong manner. |
||||
_STREAM_OUTPUT_REQUEST_ONE_RESPONSE = messages_pb2.StreamingOutputCallRequest() |
||||
_STREAM_OUTPUT_REQUEST_ONE_RESPONSE.response_parameters.append( |
||||
messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)) |
||||
|
||||
|
||||
class TestStreamStreamSecureCall(_SecureCallMixin, AioTestBase): |
||||
_STREAM_ITERATIONS = 2 |
||||
|
||||
async def test_async_generator_secure_channel(self): |
||||
|
||||
async def request_generator(): |
||||
for _ in range(self._STREAM_ITERATIONS): |
||||
yield _STREAM_OUTPUT_REQUEST_ONE_RESPONSE |
||||
|
||||
call_credentials = grpc.composite_call_credentials( |
||||
grpc.access_token_call_credentials("abc"), |
||||
grpc.access_token_call_credentials("def"), |
||||
) |
||||
|
||||
call = self._stub.FullDuplexCall(request_generator(), |
||||
credentials=call_credentials) |
||||
async for response in call: |
||||
self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) |
||||
|
||||
self.assertEqual(await call.code(), grpc.StatusCode.OK) |
||||
|
||||
|
||||
if __name__ == '__main__': |
||||
logging.basicConfig(level=logging.DEBUG) |
||||
unittest.main(verbosity=2) |
@ -0,0 +1,12 @@ |
||||
py_binary( |
||||
name = "xds_interop_client", |
||||
srcs = ["xds_interop_client.py"], |
||||
python_version = "PY3", |
||||
deps = [ |
||||
"//src/proto/grpc/testing:empty_py_pb2", |
||||
"//src/proto/grpc/testing:py_messages_proto", |
||||
"//src/proto/grpc/testing:py_test_proto", |
||||
"//src/proto/grpc/testing:test_py_pb2_grpc", |
||||
"//src/python/grpcio/grpc:grpcio", |
||||
], |
||||
) |
@ -0,0 +1,254 @@ |
||||
# Copyright 2020 The gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
import argparse |
||||
import logging |
||||
import signal |
||||
import threading |
||||
import time |
||||
import sys |
||||
|
||||
from typing import DefaultDict, Dict, List, Mapping, Set |
||||
import collections |
||||
|
||||
from concurrent import futures |
||||
|
||||
import grpc |
||||
|
||||
from src.proto.grpc.testing import test_pb2 |
||||
from src.proto.grpc.testing import test_pb2_grpc |
||||
from src.proto.grpc.testing import messages_pb2 |
||||
from src.proto.grpc.testing import empty_pb2 |
||||
|
||||
logger = logging.getLogger() |
||||
console_handler = logging.StreamHandler() |
||||
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s') |
||||
console_handler.setFormatter(formatter) |
||||
logger.addHandler(console_handler) |
||||
|
||||
|
||||
class _StatsWatcher: |
||||
_start: int |
||||
_end: int |
||||
_rpcs_needed: int |
||||
_rpcs_by_peer: DefaultDict[str, int] |
||||
_no_remote_peer: int |
||||
_lock: threading.Lock |
||||
_condition: threading.Condition |
||||
|
||||
def __init__(self, start: int, end: int): |
||||
self._start = start |
||||
self._end = end |
||||
self._rpcs_needed = end - start |
||||
self._rpcs_by_peer = collections.defaultdict(int) |
||||
self._condition = threading.Condition() |
||||
self._no_remote_peer = 0 |
||||
|
||||
def on_rpc_complete(self, request_id: int, peer: str) -> None: |
||||
"""Records statistics for a single RPC.""" |
||||
if self._start <= request_id < self._end: |
||||
with self._condition: |
||||
if not peer: |
||||
self._no_remote_peer += 1 |
||||
else: |
||||
self._rpcs_by_peer[peer] += 1 |
||||
self._rpcs_needed -= 1 |
||||
self._condition.notify() |
||||
|
||||
def await_rpc_stats_response(self, timeout_sec: int |
||||
) -> messages_pb2.LoadBalancerStatsResponse: |
||||
"""Blocks until a full response has been collected.""" |
||||
with self._condition: |
||||
self._condition.wait_for(lambda: not self._rpcs_needed, |
||||
timeout=float(timeout_sec)) |
||||
response = messages_pb2.LoadBalancerStatsResponse() |
||||
for peer, count in self._rpcs_by_peer.items(): |
||||
response.rpcs_by_peer[peer] = count |
||||
response.num_failures = self._no_remote_peer + self._rpcs_needed |
||||
return response |
||||
|
||||
|
||||
_global_lock = threading.Lock() |
||||
_stop_event = threading.Event() |
||||
_global_rpc_id: int = 0 |
||||
_watchers: Set[_StatsWatcher] = set() |
||||
_global_server = None |
||||
|
||||
|
||||
def _handle_sigint(sig, frame): |
||||
_stop_event.set() |
||||
_global_server.stop(None) |
||||
|
||||
|
||||
class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer |
||||
): |
||||
|
||||
def __init__(self): |
||||
super(_LoadBalancerStatsServicer).__init__() |
||||
|
||||
def GetClientStats(self, request: messages_pb2.LoadBalancerStatsRequest, |
||||
context: grpc.ServicerContext |
||||
) -> messages_pb2.LoadBalancerStatsResponse: |
||||
logger.info("Received stats request.") |
||||
start = None |
||||
end = None |
||||
watcher = None |
||||
with _global_lock: |
||||
start = _global_rpc_id + 1 |
||||
end = start + request.num_rpcs |
||||
watcher = _StatsWatcher(start, end) |
||||
_watchers.add(watcher) |
||||
response = watcher.await_rpc_stats_response(request.timeout_sec) |
||||
with _global_lock: |
||||
_watchers.remove(watcher) |
||||
logger.info("Returning stats response: {}".format(response)) |
||||
return response |
||||
|
||||
|
||||
def _start_rpc(request_id: int, stub: test_pb2_grpc.TestServiceStub, |
||||
timeout: float, futures: Mapping[int, grpc.Future]) -> None: |
||||
logger.info(f"Sending request to backend: {request_id}") |
||||
future = stub.UnaryCall.future(messages_pb2.SimpleRequest(), |
||||
timeout=timeout) |
||||
futures[request_id] = future |
||||
|
||||
|
||||
def _on_rpc_done(rpc_id: int, future: grpc.Future, |
||||
print_response: bool) -> None: |
||||
exception = future.exception() |
||||
hostname = "" |
||||
if exception is not None: |
||||
if exception.code() == grpc.StatusCode.DEADLINE_EXCEEDED: |
||||
logger.error(f"RPC {rpc_id} timed out") |
||||
else: |
||||
logger.error(exception) |
||||
else: |
||||
response = future.result() |
||||
logger.info(f"Got result {rpc_id}") |
||||
hostname = response.hostname |
||||
if print_response: |
||||
if future.code() == grpc.StatusCode.OK: |
||||
logger.info("Successful response.") |
||||
else: |
||||
logger.info(f"RPC failed: {call}") |
||||
with _global_lock: |
||||
for watcher in _watchers: |
||||
watcher.on_rpc_complete(rpc_id, hostname) |
||||
|
||||
|
||||
def _remove_completed_rpcs(futures: Mapping[int, grpc.Future], |
||||
print_response: bool) -> None: |
||||
logger.debug("Removing completed RPCs") |
||||
done = [] |
||||
for future_id, future in futures.items(): |
||||
if future.done(): |
||||
_on_rpc_done(future_id, future, args.print_response) |
||||
done.append(future_id) |
||||
for rpc_id in done: |
||||
del futures[rpc_id] |
||||
|
||||
|
||||
def _cancel_all_rpcs(futures: Mapping[int, grpc.Future]) -> None: |
||||
logger.info("Cancelling all remaining RPCs") |
||||
for future in futures.values(): |
||||
future.cancel() |
||||
|
||||
|
||||
def _run_single_channel(args: argparse.Namespace): |
||||
global _global_rpc_id # pylint: disable=global-statement |
||||
duration_per_query = 1.0 / float(args.qps) |
||||
with grpc.insecure_channel(args.server) as channel: |
||||
stub = test_pb2_grpc.TestServiceStub(channel) |
||||
futures: Dict[int, grpc.Future] = {} |
||||
while not _stop_event.is_set(): |
||||
request_id = None |
||||
with _global_lock: |
||||
request_id = _global_rpc_id |
||||
_global_rpc_id += 1 |
||||
start = time.time() |
||||
end = start + duration_per_query |
||||
_start_rpc(request_id, stub, float(args.rpc_timeout_sec), futures) |
||||
_remove_completed_rpcs(futures, args.print_response) |
||||
logger.debug(f"Currently {len(futures)} in-flight RPCs") |
||||
now = time.time() |
||||
while now < end: |
||||
time.sleep(end - now) |
||||
now = time.time() |
||||
_cancel_all_rpcs(futures) |
||||
|
||||
|
||||
def _run(args: argparse.Namespace) -> None: |
||||
logger.info("Starting python xDS Interop Client.") |
||||
global _global_server # pylint: disable=global-statement |
||||
channel_threads: List[threading.Thread] = [] |
||||
for i in range(args.num_channels): |
||||
thread = threading.Thread(target=_run_single_channel, args=(args,)) |
||||
thread.start() |
||||
channel_threads.append(thread) |
||||
_global_server = grpc.server(futures.ThreadPoolExecutor()) |
||||
_global_server.add_insecure_port(f"0.0.0.0:{args.stats_port}") |
||||
test_pb2_grpc.add_LoadBalancerStatsServiceServicer_to_server( |
||||
_LoadBalancerStatsServicer(), _global_server) |
||||
_global_server.start() |
||||
_global_server.wait_for_termination() |
||||
for i in range(args.num_channels): |
||||
thread.join() |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
parser = argparse.ArgumentParser( |
||||
description='Run Python XDS interop client.') |
||||
parser.add_argument( |
||||
"--num_channels", |
||||
default=1, |
||||
type=int, |
||||
help="The number of channels from which to send requests.") |
||||
parser.add_argument("--print_response", |
||||
default=False, |
||||
action="store_true", |
||||
help="Write RPC response to STDOUT.") |
||||
parser.add_argument( |
||||
"--qps", |
||||
default=1, |
||||
type=int, |
||||
help="The number of queries to send from each channel per second.") |
||||
parser.add_argument("--rpc_timeout_sec", |
||||
default=30, |
||||
type=int, |
||||
help="The per-RPC timeout in seconds.") |
||||
parser.add_argument("--server", |
||||
default="localhost:50051", |
||||
help="The address of the server.") |
||||
parser.add_argument( |
||||
"--stats_port", |
||||
default=50052, |
||||
type=int, |
||||
help="The port on which to expose the peer distribution stats service.") |
||||
parser.add_argument('--verbose', |
||||
help='verbose log output', |
||||
default=False, |
||||
action='store_true') |
||||
parser.add_argument("--log_file", |
||||
default=None, |
||||
type=str, |
||||
help="A file to log to.") |
||||
args = parser.parse_args() |
||||
signal.signal(signal.SIGINT, _handle_sigint) |
||||
if args.verbose: |
||||
logger.setLevel(logging.DEBUG) |
||||
if args.log_file: |
||||
file_handler = logging.FileHandler(args.log_file, mode='a') |
||||
file_handler.setFormatter(formatter) |
||||
logger.addHandler(file_handler) |
||||
_run(args) |
@ -0,0 +1,56 @@ |
||||
#!/usr/bin/env bash |
||||
# Copyright 2020 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
set -ex -o igncr || set -ex |
||||
|
||||
mkdir -p /var/local/git |
||||
git clone /var/local/jenkins/grpc /var/local/git/grpc |
||||
(cd /var/local/jenkins/grpc/ && git submodule foreach 'cd /var/local/git/grpc \ |
||||
&& git submodule update --init --reference /var/local/jenkins/grpc/${name} \ |
||||
${name}') |
||||
cd /var/local/git/grpc |
||||
|
||||
VIRTUAL_ENV=$(mktemp -d) |
||||
virtualenv "$VIRTUAL_ENV" |
||||
PYTHON="$VIRTUAL_ENV"/bin/python |
||||
"$PYTHON" -m pip install --upgrade grpcio-tools google-api-python-client google-auth-httplib2 oauth2client |
||||
|
||||
# Prepare generated Python code. |
||||
TOOLS_DIR=tools/run_tests |
||||
PROTO_SOURCE_DIR=src/proto/grpc/testing |
||||
PROTO_DEST_DIR="$TOOLS_DIR"/"$PROTO_SOURCE_DIR" |
||||
mkdir -p "$PROTO_DEST_DIR" |
||||
touch "$TOOLS_DIR"/src/__init__.py |
||||
touch "$TOOLS_DIR"/src/proto/__init__.py |
||||
touch "$TOOLS_DIR"/src/proto/grpc/__init__.py |
||||
touch "$TOOLS_DIR"/src/proto/grpc/testing/__init__.py |
||||
|
||||
"$PYTHON" -m grpc_tools.protoc \ |
||||
--proto_path=. \ |
||||
--python_out="$TOOLS_DIR" \ |
||||
--grpc_python_out="$TOOLS_DIR" \ |
||||
"$PROTO_SOURCE_DIR"/test.proto \ |
||||
"$PROTO_SOURCE_DIR"/messages.proto \ |
||||
"$PROTO_SOURCE_DIR"/empty.proto |
||||
|
||||
bazel build //src/python/grpcio_tests/tests_py3_only/interop:xds_interop_client |
||||
|
||||
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,cds_lb,xds_lb "$PYTHON" \ |
||||
tools/run_tests/run_xds_tests.py \ |
||||
--test_case=all \ |
||||
--project_id=grpc-testing \ |
||||
--gcp_suffix=$(date '+%s') \ |
||||
--verbose \ |
||||
--client_cmd='bazel run //src/python/grpcio_tests/tests_py3_only/interop:xds_interop_client -- --server=xds-experimental:///{server_uri} --stats_port={stats_port} --qps={qps} --verbose' |
@ -0,0 +1,23 @@ |
||||
# Copyright 2020 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
# Config file for the internal CI (in protobuf text format) |
||||
|
||||
# Location of the continuous shell script in repository. |
||||
build_file: "grpc/tools/internal_ci/linux/grpc_bazel.sh" |
||||
timeout_mins: 90 |
||||
env_vars { |
||||
key: "BAZEL_SCRIPT" |
||||
value: "tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh" |
||||
} |
Loading…
Reference in new issue