|
|
|
@ -13,6 +13,7 @@ |
|
|
|
|
# limitations under the License. |
|
|
|
|
|
|
|
|
|
import asyncio |
|
|
|
|
import logging |
|
|
|
|
import multiprocessing |
|
|
|
|
import time |
|
|
|
|
from typing import Tuple |
|
|
|
@ -117,11 +118,11 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer): |
|
|
|
|
server, port = _create_server(config) |
|
|
|
|
await server.start() |
|
|
|
|
start_time = time.time() |
|
|
|
|
yield self._get_server_status(start_time, start_time, port) |
|
|
|
|
yield _get_server_status(start_time, start_time, port) |
|
|
|
|
|
|
|
|
|
async for request in request_iterator: |
|
|
|
|
end_time = time.time() |
|
|
|
|
status = self._get_server_status(start_time, end_time, port) |
|
|
|
|
status = _get_server_status(start_time, end_time, port) |
|
|
|
|
if request.mark.reset: |
|
|
|
|
start_time = end_time |
|
|
|
|
yield status |
|
|
|
@ -138,16 +139,16 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer): |
|
|
|
|
# Create a client for each channel as asyncio.Task |
|
|
|
|
for i in range(config.client_channels): |
|
|
|
|
server = config.server_targets[i % len(config.server_targets)] |
|
|
|
|
client = self._create_client(server, config, qps_data) |
|
|
|
|
client = _create_client(server, config, qps_data) |
|
|
|
|
running_tasks.append(self._loop.create_task(client.run())) |
|
|
|
|
|
|
|
|
|
end_time = time.time() |
|
|
|
|
yield self._get_client_status(start_time, end_time, qps_data) |
|
|
|
|
yield _get_client_status(start_time, end_time, qps_data) |
|
|
|
|
|
|
|
|
|
# Respond to stat requests |
|
|
|
|
async for request in request_iterator: |
|
|
|
|
end_time = time.time() |
|
|
|
|
status = self._get_client_status(start_time, end_time, qps_data) |
|
|
|
|
status = _get_client_status(start_time, end_time, qps_data) |
|
|
|
|
if request.mark.reset: |
|
|
|
|
qps_data.reset() |
|
|
|
|
start_time = time.time() |
|
|
|
|