|
|
|
@ -82,6 +82,7 @@ class BenchmarkClient: |
|
|
|
|
self._response_callbacks = [] |
|
|
|
|
|
|
|
|
|
def add_response_callback(self, callback): |
|
|
|
|
"""callback will be invoked as callback(client, query_time)""" |
|
|
|
|
self._response_callbacks.append(callback) |
|
|
|
|
|
|
|
|
|
@abc.abstractmethod |
|
|
|
@ -95,10 +96,10 @@ class BenchmarkClient: |
|
|
|
|
def stop(self): |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
def _handle_response(self, query_time): |
|
|
|
|
def _handle_response(self, client, query_time): |
|
|
|
|
self._hist.add(query_time * 1e9) # Report times in nanoseconds |
|
|
|
|
for callback in self._response_callbacks: |
|
|
|
|
callback(query_time) |
|
|
|
|
callback(client, query_time) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class UnarySyncBenchmarkClient(BenchmarkClient): |
|
|
|
@ -121,7 +122,7 @@ class UnarySyncBenchmarkClient(BenchmarkClient): |
|
|
|
|
start_time = time.time() |
|
|
|
|
self._stub.UnaryCall(self._request, _TIMEOUT) |
|
|
|
|
end_time = time.time() |
|
|
|
|
self._handle_response(end_time - start_time) |
|
|
|
|
self._handle_response(self, end_time - start_time) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class UnaryAsyncBenchmarkClient(BenchmarkClient): |
|
|
|
@ -136,19 +137,20 @@ class UnaryAsyncBenchmarkClient(BenchmarkClient): |
|
|
|
|
def _response_received(self, start_time, resp): |
|
|
|
|
resp.result() |
|
|
|
|
end_time = time.time() |
|
|
|
|
self._handle_response(end_time - start_time) |
|
|
|
|
self._handle_response(self, end_time - start_time) |
|
|
|
|
|
|
|
|
|
def stop(self): |
|
|
|
|
self._stub = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StreamingSyncBenchmarkClient(BenchmarkClient): |
|
|
|
|
class _SyncStream(object): |
|
|
|
|
|
|
|
|
|
def __init__(self, server, config, hist): |
|
|
|
|
super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist) |
|
|
|
|
def __init__(self, stub, generic, request, handle_response): |
|
|
|
|
self._stub = stub |
|
|
|
|
self._generic = generic |
|
|
|
|
self._request = request |
|
|
|
|
self._handle_response = handle_response |
|
|
|
|
self._is_streaming = False |
|
|
|
|
self._pool = futures.ThreadPoolExecutor(max_workers=1) |
|
|
|
|
# Use a thread-safe queue to put requests on the stream |
|
|
|
|
self._request_queue = queue.Queue() |
|
|
|
|
self._send_time_queue = queue.Queue() |
|
|
|
|
|
|
|
|
@ -157,15 +159,6 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): |
|
|
|
|
self._request_queue.put(self._request) |
|
|
|
|
|
|
|
|
|
def start(self): |
|
|
|
|
self._is_streaming = True |
|
|
|
|
self._pool.submit(self._request_stream) |
|
|
|
|
|
|
|
|
|
def stop(self): |
|
|
|
|
self._is_streaming = False |
|
|
|
|
self._pool.shutdown(wait=True) |
|
|
|
|
self._stub = None |
|
|
|
|
|
|
|
|
|
def _request_stream(self): |
|
|
|
|
self._is_streaming = True |
|
|
|
|
if self._generic: |
|
|
|
|
stream_callable = self._stub.stream_stream( |
|
|
|
@ -175,8 +168,11 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): |
|
|
|
|
|
|
|
|
|
response_stream = stream_callable(self._request_generator(), _TIMEOUT) |
|
|
|
|
for _ in response_stream: |
|
|
|
|
end_time = time.time() |
|
|
|
|
self._handle_response(end_time - self._send_time_queue.get_nowait()) |
|
|
|
|
self._handle_response( |
|
|
|
|
self, time.time() - self._send_time_queue.get_nowait()) |
|
|
|
|
|
|
|
|
|
def stop(self): |
|
|
|
|
self._is_streaming = False |
|
|
|
|
|
|
|
|
|
def _request_generator(self): |
|
|
|
|
while self._is_streaming: |
|
|
|
@ -185,3 +181,30 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): |
|
|
|
|
yield request |
|
|
|
|
except queue.Empty: |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StreamingSyncBenchmarkClient(BenchmarkClient): |
|
|
|
|
|
|
|
|
|
def __init__(self, server, config, hist): |
|
|
|
|
super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist) |
|
|
|
|
self._pool = futures.ThreadPoolExecutor( |
|
|
|
|
max_workers=config.outstanding_rpcs_per_channel) |
|
|
|
|
self._streams = [_SyncStream(self._stub, self._generic, |
|
|
|
|
self._request, self._handle_response) |
|
|
|
|
for _ in xrange(config.outstanding_rpcs_per_channel)] |
|
|
|
|
self._curr_stream = 0 |
|
|
|
|
|
|
|
|
|
def send_request(self): |
|
|
|
|
# Use a round_robin scheduler to determine what stream to send on |
|
|
|
|
self._streams[self._curr_stream].send_request() |
|
|
|
|
self._curr_stream = (self._curr_stream + 1) % len(self._streams) |
|
|
|
|
|
|
|
|
|
def start(self): |
|
|
|
|
for stream in self._streams: |
|
|
|
|
self._pool.submit(stream.start) |
|
|
|
|
|
|
|
|
|
def stop(self): |
|
|
|
|
for stream in self._streams: |
|
|
|
|
stream.stop() |
|
|
|
|
self._pool.shutdown(wait=True) |
|
|
|
|
self._stub = None |
|
|
|
|