From 9ff053f140d10af9a196869ea82091bfbf5fece4 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 22 Oct 2019 22:59:34 -0700 Subject: [PATCH] Yapf --- src/python/grpcio/grpc/__init__.py | 4 +- src/python/grpcio/grpc/_channel.py | 59 +++++++++++-------- .../tests/stress/unary_stream_benchmark.py | 57 ++++++++++-------- .../tests/unit/_metadata_code_details_test.py | 1 - 4 files changed, 71 insertions(+), 50 deletions(-) diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index d62ab4b075c..bfdeb7db155 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -1954,14 +1954,14 @@ class Compression(enum.IntEnum): class ChannelOptions(object): - """Indicates a channel option unique to gRPC Python. + """Indicates a channel option unique to gRPC Python. This enumeration is part of an EXPERIMENTAL API. Attributes: SingleThreadedUnaryStream: Perform unary-stream RPCs on a single thread. """ - SingleThreadedUnaryStream = "SingleThreadedUnaryStream" + SingleThreadedUnaryStream = "SingleThreadedUnaryStream" ################################### __all__ ################################# diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 26351f3494d..f51b39c4e3a 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -266,6 +266,7 @@ class _SingleThreadedRendezvous(grpc.RpcError, grpc.Call): # pylint: disable=to _deadline: A float representing the deadline of the RPC in seconds. Or possibly None, to represent an RPC with no deadline at all. """ + def __init__(self, state, call, response_deserializer, deadline): super(_SingleThreadedRendezvous, self).__init__() self._state = state @@ -292,7 +293,8 @@ class _SingleThreadedRendezvous(grpc.RpcError, grpc.Call): # pylint: disable=to if self._state.code is None: code = grpc.StatusCode.CANCELLED details = 'Locally cancelled by application!' - self._call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details) + self._call.cancel( + _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details) self._state.cancelled = True _abort(self._state, code, details) self._state.condition.notify_all() @@ -352,7 +354,8 @@ class _SingleThreadedRendezvous(grpc.RpcError, grpc.Call): # pylint: disable=to def _next(self): with self._state.condition: if self._state.code is None: - operating = self._call.operate((cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None) + operating = self._call.operate( + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None) if operating: self._state.due.add(cygrpc.OperationType.receive_message) elif self._state.code is grpc.StatusCode.OK: @@ -362,15 +365,16 @@ class _SingleThreadedRendezvous(grpc.RpcError, grpc.Call): # pylint: disable=to while True: event = self._call.next_event() with self._state.condition: - callbacks = _handle_event(event, self._state, self._response_deserializer) + callbacks = _handle_event(event, self._state, + self._response_deserializer) for callback in callbacks: try: callback() except Exception as e: # pylint: disable=broad-except # NOTE(rbellevi): We suppress but log errors here so as not to # kill the channel spin thread. - logging.error('Exception in callback %s: %s', repr( - callback.func), repr(e)) + logging.error('Exception in callback %s: %s', + repr(callback.func), repr(e)) if self._state.response is not None: response = self._state.response self._state.response = None @@ -402,7 +406,8 @@ class _SingleThreadedRendezvous(grpc.RpcError, grpc.Call): # pylint: disable=to def _repr(self): with self._state.condition: if self._state.code is None: - return '<{} object of in-flight RPC>'.format(self.__class__.__name__) + return '<{} object of in-flight RPC>'.format( + self.__class__.__name__) elif self._state.code is grpc.StatusCode.OK: return _OK_RENDEZVOUS_REPR_FORMAT.format( self._state.code, self._state.details) @@ -441,7 +446,8 @@ class _Rendezvous(_SingleThreadedRendezvous, grpc.Future): # pylint: disable=to """ def __init__(self, state, call, response_deserializer, deadline): - super(_Rendezvous, self).__init__(state, call, response_deserializer, deadline) + super(_Rendezvous, self).__init__(state, call, response_deserializer, + deadline) def cancelled(self): with self._state.condition: @@ -737,10 +743,11 @@ class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): wait_for_ready=None, compression=None): deadline = _deadline(timeout) - serialized_request = _common.serialize(request, self._request_serializer) + serialized_request = _common.serialize(request, + self._request_serializer) if serialized_request is None: - raise _RPCState((), (), (), grpc.StatusCode.INTERNAL, - 'Exception serializing request!') + raise _RPCState((), (), (), grpc.StatusCode.INTERNAL, + 'Exception serializing request!') state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) call_credentials = None if credentials is None else credentials._credentials @@ -750,16 +757,18 @@ class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): metadata, compression) operations_and_tags = ( ((cygrpc.SendInitialMetadataOperation(augmented_metadata, - initial_metadata_flags), - cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS)), None), + initial_metadata_flags), + cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS)), None), ((cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None), ) - call = self._channel.segregated_call(cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, - self._method, None, _determine_deadline(deadline), - metadata, call_credentials, operations_and_tags, self._context) - return _SingleThreadedRendezvous(state, call, self._response_deserializer, deadline) + call = self._channel.segregated_call( + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, + None, _determine_deadline(deadline), metadata, call_credentials, + operations_and_tags, self._context) + return _SingleThreadedRendezvous(state, call, + self._response_deserializer, deadline) class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): @@ -1233,12 +1242,16 @@ class Channel(grpc.Channel): # remains the default. if self._single_threaded_unary_stream: return _SingleThreadedUnaryStreamMultiCallable( - self._channel, _channel_managed_call_management(self._call_state), - _common.encode(method), request_serializer, response_deserializer) + self._channel, _channel_managed_call_management( + self._call_state), _common.encode(method), + request_serializer, response_deserializer) else: - return _UnaryStreamMultiCallable( - self._channel, _channel_managed_call_management(self._call_state), - _common.encode(method), request_serializer, response_deserializer) + return _UnaryStreamMultiCallable(self._channel, + _channel_managed_call_management( + self._call_state), + _common.encode(method), + request_serializer, + response_deserializer) def stream_unary(self, method, diff --git a/src/python/grpcio_tests/tests/stress/unary_stream_benchmark.py b/src/python/grpcio_tests/tests/stress/unary_stream_benchmark.py index a01564bc429..06b392ca280 100644 --- a/src/python/grpcio_tests/tests/stress/unary_stream_benchmark.py +++ b/src/python/grpcio_tests/tests/stress/unary_stream_benchmark.py @@ -15,7 +15,6 @@ _PORT = 5741 _MESSAGE_SIZE = 4 _RESPONSE_COUNT = 32 * 1024 - _SERVER_CODE = """ import datetime import threading @@ -48,33 +47,43 @@ _GRPC_CHANNEL_OPTIONS = [ @contextlib.contextmanager def _running_server(): - server_process = subprocess.Popen([sys.executable, '-c', _SERVER_CODE], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - try: - yield - finally: - server_process.terminate() - server_process.wait() - sys.stdout.write("stdout: {}".format(server_process.stdout.read())); sys.stdout.flush() - sys.stdout.write("stderr: {}".format(server_process.stderr.read())); sys.stdout.flush() + server_process = subprocess.Popen( + [sys.executable, '-c', _SERVER_CODE], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + try: + yield + finally: + server_process.terminate() + server_process.wait() + sys.stdout.write("stdout: {}".format(server_process.stdout.read())) + sys.stdout.flush() + sys.stdout.write("stderr: {}".format(server_process.stderr.read())) + sys.stdout.flush() + def profile(message_size, response_count): - request = unary_stream_benchmark_pb2.BenchmarkRequest(message_size=message_size, response_count=response_count) - with grpc.insecure_channel('[::]:{}'.format(_PORT), options=_GRPC_CHANNEL_OPTIONS) as channel: - stub = unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceStub(channel) - start = datetime.datetime.now() - call = stub.Benchmark(request, wait_for_ready=True) - for message in call: - pass - end = datetime.datetime.now() - return end - start + request = unary_stream_benchmark_pb2.BenchmarkRequest( + message_size=message_size, response_count=response_count) + with grpc.insecure_channel( + '[::]:{}'.format(_PORT), options=_GRPC_CHANNEL_OPTIONS) as channel: + stub = unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceStub( + channel) + start = datetime.datetime.now() + call = stub.Benchmark(request, wait_for_ready=True) + for message in call: + pass + end = datetime.datetime.now() + return end - start + def main(): - with _running_server(): - for i in range(1000): - latency = profile(_MESSAGE_SIZE, 1024) - sys.stdout.write("{}\n".format(latency.total_seconds())) - sys.stdout.flush() + with _running_server(): + for i in range(1000): + latency = profile(_MESSAGE_SIZE, 1024) + sys.stdout.write("{}\n".format(latency.total_seconds())) + sys.stdout.flush() if __name__ == '__main__': - main() + main() diff --git a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py index 073412fc7f5..8831862f35a 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py @@ -347,7 +347,6 @@ class MetadataCodeDetailsTest(unittest.TestCase): self._servicer.set_details(_DETAILS) self._servicer.set_abort_call() - response_iterator_call = self._unary_stream( _SERIALIZED_REQUEST, metadata=_CLIENT_METADATA) # NOTE: In the single-threaded case, we cannot grab the initial_metadata