|
|
|
@ -116,11 +116,17 @@ class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer |
|
|
|
|
return response |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _start_rpc(request_id: int, stub: test_pb2_grpc.TestServiceStub, |
|
|
|
|
def _start_rpc(method: str, request_id: int, stub: test_pb2_grpc.TestServiceStub, |
|
|
|
|
timeout: float, futures: Mapping[int, grpc.Future]) -> None: |
|
|
|
|
logger.info(f"Sending request to backend: {request_id}") |
|
|
|
|
future = stub.UnaryCall.future(messages_pb2.SimpleRequest(), |
|
|
|
|
timeout=timeout) |
|
|
|
|
if method == "UnaryCall": |
|
|
|
|
future = stub.UnaryCall.future(messages_pb2.SimpleRequest(), |
|
|
|
|
timeout=timeout) |
|
|
|
|
elif method == "EmptyCall": |
|
|
|
|
future = stub.EmptyCall.future(empty_pb2.Empty(), |
|
|
|
|
timeout=timeout) |
|
|
|
|
else: |
|
|
|
|
raise ValueError(f"Unrecognized method '{method}'.") |
|
|
|
|
futures[request_id] = future |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -165,7 +171,7 @@ def _cancel_all_rpcs(futures: Mapping[int, grpc.Future]) -> None: |
|
|
|
|
future.cancel() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _run_single_channel(qps: int, server: str, rpc_timeout_sec: int, print_response: bool): |
|
|
|
|
def _run_single_channel(method: str, qps: int, server: str, rpc_timeout_sec: int, print_response: bool): |
|
|
|
|
global _global_rpc_id # pylint: disable=global-statement |
|
|
|
|
duration_per_query = 1.0 / float(qps) |
|
|
|
|
with grpc.insecure_channel(server) as channel: |
|
|
|
@ -178,7 +184,7 @@ def _run_single_channel(qps: int, server: str, rpc_timeout_sec: int, print_respo |
|
|
|
|
_global_rpc_id += 1 |
|
|
|
|
start = time.time() |
|
|
|
|
end = start + duration_per_query |
|
|
|
|
_start_rpc(request_id, stub, float(rpc_timeout_sec), futures) |
|
|
|
|
_start_rpc(method, request_id, stub, float(rpc_timeout_sec), futures) |
|
|
|
|
_remove_completed_rpcs(futures, print_response) |
|
|
|
|
logger.debug(f"Currently {len(futures)} in-flight RPCs") |
|
|
|
|
now = time.time() |
|
|
|
@ -193,7 +199,7 @@ def _run(args: argparse.Namespace) -> None: |
|
|
|
|
global _global_server # pylint: disable=global-statement |
|
|
|
|
channel_threads: List[threading.Thread] = [] |
|
|
|
|
for i in range(args.num_channels): |
|
|
|
|
thread = threading.Thread(target=_run_single_channel, args=(args.qps, args.server, args.rpc_timeout_sec, args.print_response,)) |
|
|
|
|
thread = threading.Thread(target=_run_single_channel, args=('UnaryCall', args.qps, args.server, args.rpc_timeout_sec, args.print_response,)) |
|
|
|
|
thread.start() |
|
|
|
|
channel_threads.append(thread) |
|
|
|
|
_global_server = grpc.server(futures.ThreadPoolExecutor()) |
|
|
|
|