|
|
|
@ -185,48 +185,3 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): |
|
|
|
|
yield request |
|
|
|
|
except queue.Empty: |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AsyncReceiver(face.ResponseReceiver): |
|
|
|
|
"""Receiver for async stream responses.""" |
|
|
|
|
|
|
|
|
|
def __init__(self, send_time_queue, response_handler): |
|
|
|
|
self._send_time_queue = send_time_queue |
|
|
|
|
self._response_handler = response_handler |
|
|
|
|
|
|
|
|
|
def initial_metadata(self, initial_mdetadata): |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
def response(self, response): |
|
|
|
|
end_time = time.time() |
|
|
|
|
self._response_handler(end_time - self._send_time_queue.get_nowait()) |
|
|
|
|
|
|
|
|
|
def complete(self, terminal_metadata, code, details): |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StreamingAsyncBenchmarkClient(BenchmarkClient): |
|
|
|
|
|
|
|
|
|
def __init__(self, server, config, hist): |
|
|
|
|
super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist) |
|
|
|
|
self._send_time_queue = queue.Queue() |
|
|
|
|
self._receiver = AsyncReceiver(self._send_time_queue, self._handle_response) |
|
|
|
|
self._rendezvous = None |
|
|
|
|
|
|
|
|
|
def send_request(self): |
|
|
|
|
if self._rendezvous is not None: |
|
|
|
|
self._send_time_queue.put(time.time()) |
|
|
|
|
self._rendezvous.consume(self._request) |
|
|
|
|
|
|
|
|
|
def start(self): |
|
|
|
|
if self._generic: |
|
|
|
|
stream_callable = self._stub.stream_stream( |
|
|
|
|
'grpc.testing.BenchmarkService', 'StreamingCall') |
|
|
|
|
else: |
|
|
|
|
stream_callable = self._stub.StreamingCall |
|
|
|
|
self._rendezvous = stream_callable.event( |
|
|
|
|
self._receiver, lambda *args: None, _TIMEOUT) |
|
|
|
|
|
|
|
|
|
def stop(self): |
|
|
|
|
self._rendezvous.terminate() |
|
|
|
|
self._rendezvous = None |
|
|
|
|