From 1ee45615e231057c93c3732b5bedc249bf877226 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 4 Feb 2020 09:55:46 -0800 Subject: [PATCH] Add benchmark client, server, and worker --- src/proto/grpc/core/BUILD | 11 ++ src/proto/grpc/testing/BUILD | 49 +++++ src/python/grpcio_tests/tests/qps/BUILD.bazel | 27 +++ .../tests_aio/benchmark/BUILD.bazel | 62 ++++++- .../tests_aio/benchmark/benchmark_client.py | 136 ++++++++++++++ .../tests_aio/benchmark/benchmark_servicer.py | 55 ++++++ .../tests_aio/benchmark/server.py | 18 +- .../tests_aio/benchmark/worker.py | 53 ++++++ .../tests_aio/benchmark/worker_servicer.py | 168 ++++++++++++++++++ 9 files changed, 558 insertions(+), 21 deletions(-) create mode 100644 src/python/grpcio_tests/tests/qps/BUILD.bazel create mode 100644 src/python/grpcio_tests/tests_aio/benchmark/benchmark_client.py create mode 100644 src/python/grpcio_tests/tests_aio/benchmark/benchmark_servicer.py create mode 100644 src/python/grpcio_tests/tests_aio/benchmark/worker.py create mode 100644 src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py diff --git a/src/proto/grpc/core/BUILD b/src/proto/grpc/core/BUILD index 051ff2fb073..c79e15d639b 100644 --- a/src/proto/grpc/core/BUILD +++ b/src/proto/grpc/core/BUILD @@ -15,6 +15,7 @@ licenses(["notice"]) # Apache v2 load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library") +load("//bazel:python_rules.bzl", "py_proto_library") grpc_package( name = "core", @@ -25,3 +26,13 @@ grpc_proto_library( name = "stats_proto", srcs = ["stats.proto"], ) + +proto_library( + name = "stats_descriptor", + srcs = ["stats.proto"], +) + +py_proto_library( + name = "stats_py_pb2", + deps = [":stats_descriptor"], +) diff --git a/src/proto/grpc/testing/BUILD b/src/proto/grpc/testing/BUILD index db187e2356a..e18661df162 100644 --- a/src/proto/grpc/testing/BUILD +++ b/src/proto/grpc/testing/BUILD @@ -233,3 +233,52 @@ py_grpc_library( srcs = [":test_proto_descriptor"], deps = [":py_test_proto"], ) + +proto_library( + name = "worker_service_descriptor", + srcs = ["worker_service.proto"], + deps = [":control_descriptor"], +) + +py_proto_library( + name = "worker_service_py_pb2", + deps = [":worker_service_descriptor"], +) + +py_grpc_library( + name = "worker_service_py_pb2_grpc", + srcs = [":worker_service_descriptor"], + deps = [":worker_service_py_pb2"], +) + +proto_library( + name = "stats_descriptor", + srcs = ["stats.proto"], + deps = ["//src/proto/grpc/core:stats_descriptor"], +) + +py_proto_library( + name = "stats_py_pb2", + deps = [":stats_descriptor"], +) + +proto_library( + name = "payloads_descriptor", + srcs = ["payloads.proto"], +) + +py_proto_library( + name = "payloads_py_pb2", + deps = [":payloads_descriptor"], +) + +proto_library( + name = "control_descriptor", + srcs = ["control.proto"], + deps = [":payloads_descriptor", ":stats_descriptor"] +) + +py_proto_library( + name = "control_py_pb2", + deps = [":control_descriptor"], +) diff --git a/src/python/grpcio_tests/tests/qps/BUILD.bazel b/src/python/grpcio_tests/tests/qps/BUILD.bazel new file mode 100644 index 00000000000..c4f5d4dbd1a --- /dev/null +++ b/src/python/grpcio_tests/tests/qps/BUILD.bazel @@ -0,0 +1,27 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +package( + default_testonly = 1, + default_visibility = ["//visibility:public"], +) + +py_library( + name = "histogram", + srcs = ["histogram.py"], + srcs_version = "PY2AND3", + deps = [ + "//src/proto/grpc/testing:stats_py_pb2", + ], +) diff --git a/src/python/grpcio_tests/tests_aio/benchmark/BUILD.bazel b/src/python/grpcio_tests/tests_aio/benchmark/BUILD.bazel index 992b850c353..579d3b0bf1f 100644 --- a/src/python/grpcio_tests/tests_aio/benchmark/BUILD.bazel +++ b/src/python/grpcio_tests/tests_aio/benchmark/BUILD.bazel @@ -17,16 +17,66 @@ package( default_visibility = ["//visibility:public"], ) +py_library( + name = "benchmark_client", + srcs = ["benchmark_client.py"], + srcs_version = "PY3", + deps = [ + "//src/proto/grpc/testing:benchmark_service_py_pb2_grpc", + "//src/proto/grpc/testing:py_messages_proto", + "//src/python/grpcio/grpc:grpcio", + "//src/python/grpcio_tests/tests/qps:histogram", + "//src/python/grpcio_tests/tests/unit:resources", + ], +) + +py_library( + name = "benchmark_servicer", + srcs = ["benchmark_servicer.py"], + srcs_version = "PY3", + deps = [ + "//src/proto/grpc/testing:benchmark_service_py_pb2_grpc", + "//src/proto/grpc/testing:py_messages_proto", + "//src/python/grpcio/grpc:grpcio", + ], +) + +py_library( + name = "worker_servicer", + srcs = ["worker_servicer.py"], + srcs_version = "PY3", + data = [ + "//src/python/grpcio_tests/tests/unit/credentials", + ], + deps = [ + ":benchmark_client", + ":benchmark_servicer", + "//src/python/grpcio_tests/tests/qps:histogram", + "//src/python/grpcio_tests/tests/unit:resources", + "//src/proto/grpc/testing:benchmark_service_py_pb2_grpc", + "//src/proto/grpc/testing:worker_service_py_pb2_grpc", + "//src/proto/grpc/testing:control_py_pb2", + "//src/proto/grpc/testing:payloads_py_pb2", + "//src/proto/grpc/core:stats_py_pb2", + "//src/proto/grpc/testing:stats_py_pb2", + "//src/python/grpcio/grpc:grpcio", + ], +) + py_binary( name = "server", srcs = ["server.py"], python_version = "PY3", + deps = [":benchmark_servicer"], +) + +py_binary( + name = "worker", + srcs = ["worker.py"], + imports = ["../../"], + python_version = "PY3", deps = [ - "//src/proto/grpc/testing:benchmark_service_py_pb2", - "//src/proto/grpc/testing:benchmark_service_py_pb2_grpc", - "//src/proto/grpc/testing:py_messages_proto", - "//src/python/grpcio/grpc:grpcio", - "//src/python/grpcio_tests/tests/unit/framework/common", - "@six", + ":worker_servicer", + "//src/proto/grpc/testing:worker_service_py_pb2_grpc", ], ) diff --git a/src/python/grpcio_tests/tests_aio/benchmark/benchmark_client.py b/src/python/grpcio_tests/tests_aio/benchmark/benchmark_client.py new file mode 100644 index 00000000000..2cbe3da29a7 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/benchmark/benchmark_client.py @@ -0,0 +1,136 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""The Python AsyncIO Benchmark Clients.""" + +import abc +import asyncio +import time + +import grpc +from grpc.experimental import aio + +from src.proto.grpc.testing import (benchmark_service_pb2_grpc, control_pb2, + messages_pb2) +from tests.qps import histogram +from tests.unit import resources + + +class GenericStub(object): + + def __init__(self, channel: aio.Channel): + self.UnaryCall = channel.unary_unary( + '/grpc.testing.BenchmarkService/UnaryCall') + self.StreamingCall = channel.stream_stream( + '/grpc.testing.BenchmarkService/StreamingCall') + + +class BenchmarkClient(abc.ABC): + """Benchmark client interface that exposes a non-blocking send_request().""" + + def __init__(self, address: str, config: control_pb2.ClientConfig, hist: histogram.Histogram): + # Creates the channel + if config.HasField('security_params'): + channel_credentials = grpc.ssl_channel_credentials( + resources.test_root_certificates()) + self._channel = aio.secure_channel(address, channel_credentials, (( + 'grpc.ssl_target_name_override', + config.security_params.server_host_override, + ),)) + else: + self._channel = aio.insecure_channel(address) + + # Creates the stub + if config.payload_config.WhichOneof('payload') == 'simple_params': + self._generic = False + self._stub = benchmark_service_pb2_grpc.BenchmarkServiceStub( + channel) + payload = messages_pb2.Payload( + body='\0' * config.payload_config.simple_params.req_size) + self._request = messages_pb2.SimpleRequest( + payload=payload, + response_size=config.payload_config.simple_params.resp_size) + else: + self._generic = True + self._stub = GenericStub(channel) + self._request = '\0' * config.payload_config.bytebuf_params.req_size + + self._hist = hist + self._response_callbacks = [] + self._concurrency = config.outstanding_rpcs_per_channel + + async def run(self) -> None: + await aio.channel_ready(self._channel) + + async def stop(self) -> None: + await self._channel.close() + + def _record_query_time(self, query_time: float) -> None: + self._hist.add(query_time * 1e9) + + +class UnaryAsyncBenchmarkClient(BenchmarkClient): + + def __init__(self, address: str, config: control_pb2.ClientConfig, hist: histogram.Histogram): + super().__init__(address, config, hist) + self._running = None + self._stopped = asyncio.Event() + + async def _send_request(self): + start_time = time.time() + await self._stub.UnaryCall(self._request) + self._record_query_time(self, time.time() - start_time) + + async def _infinite_sender(self) -> None: + while self._running: + await self._send_request() + + async def run(self) -> None: + await super().run() + self._running = True + senders = (self._infinite_sender() for _ in range(self._concurrency)) + await asyncio.wait(senders) + self._stopped.set() + + async def stop(self) -> None: + self._running = False + await self._stopped.wait() + await super().stop() + + +class StreamingAsyncBenchmarkClient(BenchmarkClient): + + def __init__(self, address: str, config: control_pb2.ClientConfig, hist: histogram.Histogram): + super().__init__(address, config, hist) + self._running = None + self._stopped = asyncio.Event() + + async def _one_streamming_call(self): + call = self._stub.StreamingCall() + while self._running: + start_time = time.time() + await call.write(self._request) + await call.read() + self._record_query_time(self, time.time() - start_time) + + async def run(self): + await super().run() + self._running = True + senders = (self._one_streamming_call() for _ in range(self._concurrency)) + await asyncio.wait(senders) + self._stopped.set() + + async def stop(self): + self._running = False + await self._stopped.wait() + await super().stop() diff --git a/src/python/grpcio_tests/tests_aio/benchmark/benchmark_servicer.py b/src/python/grpcio_tests/tests_aio/benchmark/benchmark_servicer.py new file mode 100644 index 00000000000..6aff17f7cac --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/benchmark/benchmark_servicer.py @@ -0,0 +1,55 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""The Python AsyncIO Benchmark Servicers.""" + +import asyncio +import logging +import unittest + +from grpc.experimental import aio + +from src.proto.grpc.testing import benchmark_service_pb2_grpc, messages_pb2 + + +class BenchmarkServicer(benchmark_service_pb2_grpc.BenchmarkServiceServicer): + + async def UnaryCall(self, request, unused_context): + payload = messages_pb2.Payload(body=b'\0' * request.response_size) + return messages_pb2.SimpleResponse(payload=payload) + + async def StreamingFromServer(self, request, unused_context): + payload = messages_pb2.Payload(body=b'\0' * request.response_size) + # Sends response at full capacity! + while True: + yield messages_pb2.SimpleResponse(payload=payload) + + async def StreamingCall(self, request_iterator, unused_context): + payload = messages_pb2.Payload(body='\0' * request.response_size) + async for request in request_iterator: + yield messages_pb2.SimpleResponse(payload=payload) + + +class GenericBenchmarkServicer(benchmark_service_pb2_grpc.BenchmarkServiceServicer + ): + """Generic (no-codec) Server implementation for the Benchmark service.""" + + def __init__(self, resp_size): + self._response = '\0' * resp_size + + async def UnaryCall(self, unused_request, unused_context): + return self._response + + async def StreamingCall(self, request_iterator, unused_context): + async for request in request_iterator: + yield self._response diff --git a/src/python/grpcio_tests/tests_aio/benchmark/server.py b/src/python/grpcio_tests/tests_aio/benchmark/server.py index 1489bebcefa..05479a2997d 100644 --- a/src/python/grpcio_tests/tests_aio/benchmark/server.py +++ b/src/python/grpcio_tests/tests_aio/benchmark/server.py @@ -17,28 +17,16 @@ import logging import unittest from grpc.experimental import aio -from src.proto.grpc.testing import messages_pb2 -from src.proto.grpc.testing import benchmark_service_pb2_grpc - - -class BenchmarkServer(benchmark_service_pb2_grpc.BenchmarkServiceServicer): - async def UnaryCall(self, request, context): - payload = messages_pb2.Payload(body=b'\0' * request.response_size) - return messages_pb2.SimpleResponse(payload=payload) - - async def StreamingFromServer(self, request, context): - payload = messages_pb2.Payload(body=b'\0' * request.response_size) - # Sends response at full capacity! - while True: - yield messages_pb2.SimpleResponse(payload=payload) +from src.proto.grpc.testing import benchmark_service_pb2_grpc +from tests_aio.benchmark import benchmark_servicer async def _start_async_server(): server = aio.server() port = server.add_insecure_port('localhost:%s' % 50051) - servicer = BenchmarkServer() + servicer = benchmark_servicer.BenchmarkServicer() benchmark_service_pb2_grpc.add_BenchmarkServiceServicer_to_server( servicer, server) diff --git a/src/python/grpcio_tests/tests_aio/benchmark/worker.py b/src/python/grpcio_tests/tests_aio/benchmark/worker.py new file mode 100644 index 00000000000..628a8885d15 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/benchmark/worker.py @@ -0,0 +1,53 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import asyncio +import logging + +from grpc.experimental import aio + +from src.proto.grpc.testing import worker_service_pb2_grpc +from tests_aio.benchmark import worker_servicer + + +async def run_worker_server(port: int) -> None: + aio.init_grpc_aio() + server = aio.server() + + servicer = worker_servicer.WorkerServicer() + worker_service_pb2_grpc.add_WorkerServiceServicer_to_server( + servicer, server) + + server.add_insecure_port('[::]:{}'.format(port)) + + await server.start() + + await servicer.wait_for_quit() + await server.stop(None) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + parser = argparse.ArgumentParser( + description='gRPC Python performance testing worker') + parser.add_argument('--driver_port', + type=int, + dest='port', + help='The port the worker should listen on') + args = parser.parse_args() + + asyncio.get_event_loop().run_until_complete( + run_worker_server(args.port) + ) diff --git a/src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py b/src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py new file mode 100644 index 00000000000..79dc014c1a1 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py @@ -0,0 +1,168 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import multiprocessing +import time +from typing import Tuple + +import grpc +from grpc.experimental import aio + +from src.proto.grpc.testing import (benchmark_service_pb2_grpc, control_pb2, + stats_pb2, worker_service_pb2_grpc) +from tests.qps import histogram +from tests.unit import resources +from tests_aio.benchmark import benchmark_client, benchmark_servicer + +_NUM_CORES = multiprocessing.cpu_count() +_NUM_CORE_PYTHON_CAN_USE = 1 + + +def _get_server_status(start_time: float, end_time: float, port: int) -> control_pb2.ServerStatus: + end_time = time.time() + elapsed_time = end_time - start_time + stats = stats_pb2.ServerStats(time_elapsed=elapsed_time, + time_user=elapsed_time, + time_system=elapsed_time) + return control_pb2.ServerStatus(stats=stats, port=port, cores=_NUM_CORE_PYTHON_CAN_USE) + + +def _create_server(config: control_pb2.ServerConfig) -> Tuple[aio.Server, int]: + if config.async_server_threads != 1: + logging.warning('config.async_server_threads [%d] != 1', config.async_server_threads) + + server = aio.server() + if config.server_type == control_pb2.ASYNC_SERVER: + servicer = benchmark_servicer.BenchmarkServicer() + benchmark_service_pb2_grpc.add_BenchmarkServiceServicer_to_server( + servicer, server) + elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER: + resp_size = config.payload_config.bytebuf_params.resp_size + servicer = benchmark_servicer.GenericBenchmarkServicer(resp_size) + method_implementations = { + 'StreamingCall': + grpc.stream_stream_rpc_method_handler(servicer.StreamingCall + ), + 'UnaryCall': + grpc.unary_unary_rpc_method_handler(servicer.UnaryCall), + } + handler = grpc.method_handlers_generic_handler( + 'grpc.testing.BenchmarkService', method_implementations) + server.add_generic_rpc_handlers((handler,)) + else: + raise NotImplementedError('Unsupported server type {}'.format( + config.server_type)) + + if config.HasField('security_params'): # Use SSL + server_creds = grpc.ssl_server_credentials( + ((resources.private_key(), resources.certificate_chain()),)) + port = server.add_secure_port('[::]:{}'.format(config.port), + server_creds) + else: + port = server.add_insecure_port('[::]:{}'.format(config.port)) + + return server, port + + +def _get_client_status(start_time: float, end_time: float, qps_data: histogram.Histogram) -> control_pb2.ClientStatus: + latencies = qps_data.get_data() + end_time = time.time() + elapsed_time = end_time - start_time + stats = stats_pb2.ClientStats(latencies=latencies, + time_elapsed=elapsed_time, + time_user=elapsed_time, + time_system=elapsed_time) + return control_pb2.ClientStatus(stats=stats) + + +def _create_client(server: str, config: control_pb2.ClientConfig, qps_data: histogram.Histogram) -> benchmark_client.BenchmarkClient: + if config.load_params.WhichOneof('load') != 'closed_loop': + raise NotImplementedError(f'Unsupported load parameter {config.load_params}') + + if config.client_type == control_pb2.ASYNC_CLIENT: + if config.rpc_type == control_pb2.UNARY: + client_type = benchmark_client.UnaryAsyncBenchmarkClient + if config.rpc_type == control_pb2.STREAMING: + client_type = benchmark_client.StreamingAsyncBenchmarkClient + else: + raise NotImplementedError(f'Unsupported rpc_type [{config.rpc_type}]') + else: + raise NotImplementedError(f'Unsupported client type {config.client_type}') + + return client_type(server, config, qps_data) + + +class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer): + """Python Worker Server implementation.""" + + def __init__(self): + self._loop = asyncio.get_event_loop() + self._quit_event = asyncio.Event() + + async def RunServer(self, request_iterator, context): + config = (await context.read()).setup + + server, port = _create_server(config) + await server.start() + start_time = time.time() + yield self._get_server_status(start_time, start_time, port) + + async for request in request_iterator: + end_time = time.time() + status = self._get_server_status(start_time, end_time, port) + if request.mark.reset: + start_time = end_time + yield status + server.stop(None) + + async def RunClient(self, request_iterator, context): + config = (await context.read()).setup + + running_tasks = [] + qps_data = histogram.Histogram(config.histogram_params.resolution, + config.histogram_params.max_possible) + start_time = time.time() + + # Create a client for each channel as asyncio.Task + for i in range(config.client_channels): + server = config.server_targets[i % len(config.server_targets)] + client = self._create_client(server, config, qps_data) + running_tasks.append(self._loop.create_task(client.run())) + + end_time = time.time() + yield self._get_client_status(start_time, end_time, qps_data) + + # Respond to stat requests + async for request in request_iterator: + end_time = time.time() + status = self._get_client_status(start_time, end_time, qps_data) + if request.mark.reset: + qps_data.reset() + start_time = time.time() + yield status + + # Cleanup the clients + for task in running_tasks: + task.cancel() + + async def CoreCount(self, request, context): + return control_pb2.CoreResponse(cores=_NUM_CORES) + + async def QuitWorker(self, request, context): + self._quit_event.set() + return control_pb2.Void() + + async def wait_for_quit(self): + await self._quit_event.wait()