diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index c8d1e92edfb..d62ab4b075c 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -1953,6 +1953,17 @@ class Compression(enum.IntEnum): Gzip = _compression.Gzip +class ChannelOptions(object): + """Indicates a channel option unique to gRPC Python. + + This enumeration is part of an EXPERIMENTAL API. + + Attributes: + SingleThreadedUnaryStream: Perform unary-stream RPCs on a single thread. + """ + SingleThreadedUnaryStream = "SingleThreadedUnaryStream" + + ################################### __all__ ################################# __all__ = ( diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 9244034382b..d169a2a5abd 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -1176,6 +1176,18 @@ def _augment_options(base_options, compression): ),) +def _separate_channel_options(options): + """Separates core channel options from Python channel options.""" + core_options = [] + python_options = [] + for pair in options: + if pair[0] == grpc.ChannelOptions.SingleThreadedUnaryStream: + python_options.append(pair) + else: + core_options.append(pair) + return python_options, core_options + + class Channel(grpc.Channel): """A cygrpc.Channel-backed implementation of grpc.Channel.""" @@ -1189,13 +1201,22 @@ class Channel(grpc.Channel): compression: An optional value indicating the compression method to be used over the lifetime of the channel. """ + python_options, core_options = _separate_channel_options(options) + self._single_threaded_unary_stream = False + self._process_python_options(python_options) self._channel = cygrpc.Channel( - _common.encode(target), _augment_options(options, compression), + _common.encode(target), _augment_options(core_options, compression), credentials) self._call_state = _ChannelCallState(self._channel) self._connectivity_state = _ChannelConnectivityState(self._channel) cygrpc.fork_register_channel(self) + def _process_python_options(self, python_options): + """Sets channel attributes according to python-only channel options.""" + for pair in python_options: + if pair[0] == grpc.ChannelOptions.SingleThreadedUnaryStream: + self._single_threaded_unary_stream = True + def subscribe(self, callback, try_to_connect=None): _subscribe(self._connectivity_state, callback, try_to_connect) @@ -1214,12 +1235,18 @@ class Channel(grpc.Channel): method, request_serializer=None, response_deserializer=None): - # return _UnaryStreamMultiCallable( - # self._channel, _channel_managed_call_management(self._call_state), - # _common.encode(method), request_serializer, response_deserializer) - return _SingleThreadedUnaryStreamMultiCallable( - self._channel, _channel_managed_call_management(self._call_state), - _common.encode(method), request_serializer, response_deserializer) + # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC + # on a single Python thread results in an appreciable speed-up. However, + # due to slight differences in capability, the multi-threaded variant' + # remains the default. + if self._single_threaded_unary_stream: + return _SingleThreadedUnaryStreamMultiCallable( + self._channel, _channel_managed_call_management(self._call_state), + _common.encode(method), request_serializer, response_deserializer) + else: + return _UnaryStreamMultiCallable( + self._channel, _channel_managed_call_management(self._call_state), + _common.encode(method), request_serializer, response_deserializer) def stream_unary(self, method, diff --git a/src/python/grpcio_tests/tests/stress/unary_stream_benchmark.py b/src/python/grpcio_tests/tests/stress/unary_stream_benchmark.py index 81c74ec8faa..a01564bc429 100644 --- a/src/python/grpcio_tests/tests/stress/unary_stream_benchmark.py +++ b/src/python/grpcio_tests/tests/stress/unary_stream_benchmark.py @@ -41,7 +41,9 @@ server.wait_for_termination() _GRPC_CHANNEL_OPTIONS = [ ('grpc.max_metadata_size', 16 * 1024 * 1024), - ('grpc.max_receive_message_length', 64 * 1024 * 1024)] + ('grpc.max_receive_message_length', 64 * 1024 * 1024), + (grpc.ChannelOptions.SingleThreadedUnaryStream, 1), +] @contextlib.contextmanager