pull/23892/head
Richard Belleville 4 years ago
parent d7658d52ff
commit efd0521731
  1. 77
      src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py
  2. 8
      tools/run_tests/run_xds_tests.py

@ -37,10 +37,14 @@ formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
_SUPPORTED_METHODS = ("UnaryCall", "EmptyCall",)
_SUPPORTED_METHODS = (
"UnaryCall",
"EmptyCall",
)
PerMethodMetadataType = Mapping[str, Sequence[Tuple[str, str]]]
class _StatsWatcher:
_start: int
_end: int
@ -56,7 +60,8 @@ class _StatsWatcher:
self._end = end
self._rpcs_needed = end - start
self._rpcs_by_peer = collections.defaultdict(int)
self._rpcs_by_method = collections.defaultdict(lambda: collections.defaultdict(int))
self._rpcs_by_method = collections.defaultdict(
lambda: collections.defaultdict(int))
self._condition = threading.Condition()
self._no_remote_peer = 0
@ -125,8 +130,10 @@ class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer
return response
def _start_rpc(method: str, metadata: Sequence[Tuple[str, str]], request_id: int, stub: test_pb2_grpc.TestServiceStub,
timeout: float, futures: Mapping[int, Tuple[grpc.Future, str]]) -> None:
def _start_rpc(method: str, metadata: Sequence[Tuple[str, str]],
request_id: int, stub: test_pb2_grpc.TestServiceStub,
timeout: float,
futures: Mapping[int, Tuple[grpc.Future, str]]) -> None:
logger.info(f"Sending {method} request to backend: {request_id}")
if method == "UnaryCall":
future = stub.UnaryCall.future(messages_pb2.SimpleRequest(),
@ -140,8 +147,8 @@ def _start_rpc(method: str, metadata: Sequence[Tuple[str, str]], request_id: int
raise ValueError(f"Unrecognized method '{method}'.")
futures[request_id] = (future, method)
def _on_rpc_done(rpc_id: int, future: grpc.Future,
method: str,
def _on_rpc_done(rpc_id: int, future: grpc.Future, method: str,
print_response: bool) -> None:
exception = future.exception()
hostname = ""
@ -152,7 +159,10 @@ def _on_rpc_done(rpc_id: int, future: grpc.Future,
logger.error(exception)
else:
response = future.result()
hostname_metadata = [metadatum for metadatum in future.initial_metadata() if metadatum[0] == "hostname"]
hostname_metadata = [
metadatum for metadatum in future.initial_metadata()
if metadatum[0] == "hostname"
]
hostname = None
if len(hostname_metadata) == 1:
hostname = hostname_metadata[0][1]
@ -186,7 +196,9 @@ def _cancel_all_rpcs(futures: Mapping[int, Tuple[grpc.Future, str]]) -> None:
future.cancel()
def _run_single_channel(method: str, metadata: Sequence[Tuple[str, str]], qps: int, server: str, rpc_timeout_sec: int, print_response: bool):
def _run_single_channel(method: str, metadata: Sequence[Tuple[str, str]],
qps: int, server: str, rpc_timeout_sec: int,
print_response: bool):
global _global_rpc_id # pylint: disable=global-statement
duration_per_query = 1.0 / float(qps)
with grpc.insecure_channel(server) as channel:
@ -199,7 +211,8 @@ def _run_single_channel(method: str, metadata: Sequence[Tuple[str, str]], qps: i
_global_rpc_id += 1
start = time.time()
end = start + duration_per_query
_start_rpc(method, metadata, request_id, stub, float(rpc_timeout_sec), futures)
_start_rpc(method, metadata, request_id, stub,
float(rpc_timeout_sec), futures)
_remove_completed_rpcs(futures, print_response)
logger.debug(f"Currently {len(futures)} in-flight RPCs")
now = time.time()
@ -208,16 +221,27 @@ def _run_single_channel(method: str, metadata: Sequence[Tuple[str, str]], qps: i
now = time.time()
_cancel_all_rpcs(futures)
class _MethodHandle:
"""An object grouping together threads driving RPCs for a method."""
_channel_threads: List[threading.Thread]
def __init__(self, method: str, metadata: Sequence[Tuple[str, str]], num_channels: int, qps: int, server: str, rpc_timeout_sec: int, print_response: bool):
def __init__(self, method: str, metadata: Sequence[Tuple[str, str]],
num_channels: int, qps: int, server: str, rpc_timeout_sec: int,
print_response: bool):
"""Creates and starts a group of threads running the indicated method."""
self._channel_threads = []
for i in range(num_channels):
thread = threading.Thread(target=_run_single_channel, args=(method, metadata, qps, server, rpc_timeout_sec, print_response,))
thread = threading.Thread(target=_run_single_channel,
args=(
method,
metadata,
qps,
server,
rpc_timeout_sec,
print_response,
))
thread.start()
self._channel_threads.append(thread)
@ -227,12 +251,16 @@ class _MethodHandle:
channel_thread.join()
def _run(args: argparse.Namespace, methods: Sequence[str], per_method_metadata: PerMethodMetadataType) -> None:
def _run(args: argparse.Namespace, methods: Sequence[str],
per_method_metadata: PerMethodMetadataType) -> None:
logger.info("Starting python xDS Interop Client.")
global _global_server # pylint: disable=global-statement
method_handles = []
for method in methods:
method_handles.append(_MethodHandle(method, per_method_metadata.get(method, []), args.num_channels, args.qps, args.server, args.rpc_timeout_sec, args.print_response))
method_handles.append(
_MethodHandle(method, per_method_metadata.get(method, []),
args.num_channels, args.qps, args.server,
args.rpc_timeout_sec, args.print_response))
_global_server = grpc.server(futures.ThreadPoolExecutor())
_global_server.add_insecure_port(f"0.0.0.0:{args.stats_port}")
test_pb2_grpc.add_LoadBalancerStatsServiceServicer_to_server(
@ -283,17 +311,12 @@ if __name__ == "__main__":
rpc_help = "A comma-delimited list of RPC methods to run. Must be one of "
rpc_help += ", ".join(_SUPPORTED_METHODS)
rpc_help += "."
parser.add_argument("--rpc",
default="UnaryCall",
type=str,
help=rpc_help)
metadata_help = ("A comma-delimited list of 3-tuples of the form " +
"METHOD:KEY:VALUE, e.g. " +
"EmptyCall:key1:value1,UnaryCall:key2:value2,EmptyCall:k3:v3")
parser.add_argument("--metadata",
default="",
type=str,
help=metadata_help)
parser.add_argument("--rpc", default="UnaryCall", type=str, help=rpc_help)
metadata_help = (
"A comma-delimited list of 3-tuples of the form " +
"METHOD:KEY:VALUE, e.g. " +
"EmptyCall:key1:value1,UnaryCall:key2:value2,EmptyCall:k3:v3")
parser.add_argument("--metadata", default="", type=str, help=metadata_help)
args = parser.parse_args()
signal.signal(signal.SIGINT, _handle_sigint)
if args.verbose:
@ -304,13 +327,15 @@ if __name__ == "__main__":
logger.addHandler(file_handler)
methods = args.rpc.split(",")
if set(methods) - set(_SUPPORTED_METHODS):
raise ValueError("--rpc supported methods: {}".format(", ".join(_SUPPORTED_METHODS)))
raise ValueError("--rpc supported methods: {}".format(
", ".join(_SUPPORTED_METHODS)))
per_method_metadata = collections.defaultdict(list)
metadata = args.metadata.split(",") if args.metadata else []
for metadatum in metadata:
elems = metadatum.split(":")
if len(elems) != 3:
raise ValueError(f"'{metadatum}' was not in the form 'METHOD:KEY:VALUE'")
raise ValueError(
f"'{metadatum}' was not in the form 'METHOD:KEY:VALUE'")
if elems[0] not in _SUPPORTED_METHODS:
raise ValueError(f"Unrecognized method '{elems[0]}'")
per_method_metadata[elems[0]].append((elems[1], elems[2]))

@ -1803,6 +1803,7 @@ try:
stderr=subprocess.STDOUT,
stdout=test_log_file)
client_logged = threading.Event()
def _log_client(client_process, client_logged):
# NOTE(rbellevi): Runs on another thread and logs the
# client's output as soon as it terminates. This enables
@ -1826,7 +1827,12 @@ try:
logger.info(client_output.read())
client_logged.set()
logging_thread = threading.Thread(target=_log_client, args=(client_process, client_logged,), daemon=True)
logging_thread = threading.Thread(target=_log_client,
args=(
client_process,
client_logged,
),
daemon=True)
logging_thread.start()
if test_case == 'backends_restart':
test_backends_restart(gcp, backend_service, instance_group)

Loading…
Cancel
Save