pull/20753/head
Richard Belleville 5 years ago
parent 420c9bceff
commit 9ff053f140
  1. 4
      src/python/grpcio/grpc/__init__.py
  2. 59
      src/python/grpcio/grpc/_channel.py
  3. 57
      src/python/grpcio_tests/tests/stress/unary_stream_benchmark.py
  4. 1
      src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py

@ -1954,14 +1954,14 @@ class Compression(enum.IntEnum):
class ChannelOptions(object): class ChannelOptions(object):
"""Indicates a channel option unique to gRPC Python. """Indicates a channel option unique to gRPC Python.
This enumeration is part of an EXPERIMENTAL API. This enumeration is part of an EXPERIMENTAL API.
Attributes: Attributes:
SingleThreadedUnaryStream: Perform unary-stream RPCs on a single thread. SingleThreadedUnaryStream: Perform unary-stream RPCs on a single thread.
""" """
SingleThreadedUnaryStream = "SingleThreadedUnaryStream" SingleThreadedUnaryStream = "SingleThreadedUnaryStream"
################################### __all__ ################################# ################################### __all__ #################################

@ -266,6 +266,7 @@ class _SingleThreadedRendezvous(grpc.RpcError, grpc.Call): # pylint: disable=to
_deadline: A float representing the deadline of the RPC in seconds. Or _deadline: A float representing the deadline of the RPC in seconds. Or
possibly None, to represent an RPC with no deadline at all. possibly None, to represent an RPC with no deadline at all.
""" """
def __init__(self, state, call, response_deserializer, deadline): def __init__(self, state, call, response_deserializer, deadline):
super(_SingleThreadedRendezvous, self).__init__() super(_SingleThreadedRendezvous, self).__init__()
self._state = state self._state = state
@ -292,7 +293,8 @@ class _SingleThreadedRendezvous(grpc.RpcError, grpc.Call): # pylint: disable=to
if self._state.code is None: if self._state.code is None:
code = grpc.StatusCode.CANCELLED code = grpc.StatusCode.CANCELLED
details = 'Locally cancelled by application!' details = 'Locally cancelled by application!'
self._call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details) self._call.cancel(
_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
self._state.cancelled = True self._state.cancelled = True
_abort(self._state, code, details) _abort(self._state, code, details)
self._state.condition.notify_all() self._state.condition.notify_all()
@ -352,7 +354,8 @@ class _SingleThreadedRendezvous(grpc.RpcError, grpc.Call): # pylint: disable=to
def _next(self): def _next(self):
with self._state.condition: with self._state.condition:
if self._state.code is None: if self._state.code is None:
operating = self._call.operate((cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None) operating = self._call.operate(
(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None)
if operating: if operating:
self._state.due.add(cygrpc.OperationType.receive_message) self._state.due.add(cygrpc.OperationType.receive_message)
elif self._state.code is grpc.StatusCode.OK: elif self._state.code is grpc.StatusCode.OK:
@ -362,15 +365,16 @@ class _SingleThreadedRendezvous(grpc.RpcError, grpc.Call): # pylint: disable=to
while True: while True:
event = self._call.next_event() event = self._call.next_event()
with self._state.condition: with self._state.condition:
callbacks = _handle_event(event, self._state, self._response_deserializer) callbacks = _handle_event(event, self._state,
self._response_deserializer)
for callback in callbacks: for callback in callbacks:
try: try:
callback() callback()
except Exception as e: # pylint: disable=broad-except except Exception as e: # pylint: disable=broad-except
# NOTE(rbellevi): We suppress but log errors here so as not to # NOTE(rbellevi): We suppress but log errors here so as not to
# kill the channel spin thread. # kill the channel spin thread.
logging.error('Exception in callback %s: %s', repr( logging.error('Exception in callback %s: %s',
callback.func), repr(e)) repr(callback.func), repr(e))
if self._state.response is not None: if self._state.response is not None:
response = self._state.response response = self._state.response
self._state.response = None self._state.response = None
@ -402,7 +406,8 @@ class _SingleThreadedRendezvous(grpc.RpcError, grpc.Call): # pylint: disable=to
def _repr(self): def _repr(self):
with self._state.condition: with self._state.condition:
if self._state.code is None: if self._state.code is None:
return '<{} object of in-flight RPC>'.format(self.__class__.__name__) return '<{} object of in-flight RPC>'.format(
self.__class__.__name__)
elif self._state.code is grpc.StatusCode.OK: elif self._state.code is grpc.StatusCode.OK:
return _OK_RENDEZVOUS_REPR_FORMAT.format( return _OK_RENDEZVOUS_REPR_FORMAT.format(
self._state.code, self._state.details) self._state.code, self._state.details)
@ -441,7 +446,8 @@ class _Rendezvous(_SingleThreadedRendezvous, grpc.Future): # pylint: disable=to
""" """
def __init__(self, state, call, response_deserializer, deadline): def __init__(self, state, call, response_deserializer, deadline):
super(_Rendezvous, self).__init__(state, call, response_deserializer, deadline) super(_Rendezvous, self).__init__(state, call, response_deserializer,
deadline)
def cancelled(self): def cancelled(self):
with self._state.condition: with self._state.condition:
@ -737,10 +743,11 @@ class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
wait_for_ready=None, wait_for_ready=None,
compression=None): compression=None):
deadline = _deadline(timeout) deadline = _deadline(timeout)
serialized_request = _common.serialize(request, self._request_serializer) serialized_request = _common.serialize(request,
self._request_serializer)
if serialized_request is None: if serialized_request is None:
raise _RPCState((), (), (), grpc.StatusCode.INTERNAL, raise _RPCState((), (), (), grpc.StatusCode.INTERNAL,
'Exception serializing request!') 'Exception serializing request!')
state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
call_credentials = None if credentials is None else credentials._credentials call_credentials = None if credentials is None else credentials._credentials
@ -750,16 +757,18 @@ class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
metadata, compression) metadata, compression)
operations_and_tags = ( operations_and_tags = (
((cygrpc.SendInitialMetadataOperation(augmented_metadata, ((cygrpc.SendInitialMetadataOperation(augmented_metadata,
initial_metadata_flags), initial_metadata_flags),
cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS)), None), cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS)), None),
((cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None), ((cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None),
) )
call = self._channel.segregated_call(cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, call = self._channel.segregated_call(
self._method, None, _determine_deadline(deadline), cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
metadata, call_credentials, operations_and_tags, self._context) None, _determine_deadline(deadline), metadata, call_credentials,
return _SingleThreadedRendezvous(state, call, self._response_deserializer, deadline) operations_and_tags, self._context)
return _SingleThreadedRendezvous(state, call,
self._response_deserializer, deadline)
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
@ -1233,12 +1242,16 @@ class Channel(grpc.Channel):
# remains the default. # remains the default.
if self._single_threaded_unary_stream: if self._single_threaded_unary_stream:
return _SingleThreadedUnaryStreamMultiCallable( return _SingleThreadedUnaryStreamMultiCallable(
self._channel, _channel_managed_call_management(self._call_state), self._channel, _channel_managed_call_management(
_common.encode(method), request_serializer, response_deserializer) self._call_state), _common.encode(method),
request_serializer, response_deserializer)
else: else:
return _UnaryStreamMultiCallable( return _UnaryStreamMultiCallable(self._channel,
self._channel, _channel_managed_call_management(self._call_state), _channel_managed_call_management(
_common.encode(method), request_serializer, response_deserializer) self._call_state),
_common.encode(method),
request_serializer,
response_deserializer)
def stream_unary(self, def stream_unary(self,
method, method,

@ -15,7 +15,6 @@ _PORT = 5741
_MESSAGE_SIZE = 4 _MESSAGE_SIZE = 4
_RESPONSE_COUNT = 32 * 1024 _RESPONSE_COUNT = 32 * 1024
_SERVER_CODE = """ _SERVER_CODE = """
import datetime import datetime
import threading import threading
@ -48,33 +47,43 @@ _GRPC_CHANNEL_OPTIONS = [
@contextlib.contextmanager @contextlib.contextmanager
def _running_server(): def _running_server():
server_process = subprocess.Popen([sys.executable, '-c', _SERVER_CODE], stdout=subprocess.PIPE, stderr=subprocess.PIPE) server_process = subprocess.Popen(
try: [sys.executable, '-c', _SERVER_CODE],
yield stdout=subprocess.PIPE,
finally: stderr=subprocess.PIPE)
server_process.terminate() try:
server_process.wait() yield
sys.stdout.write("stdout: {}".format(server_process.stdout.read())); sys.stdout.flush() finally:
sys.stdout.write("stderr: {}".format(server_process.stderr.read())); sys.stdout.flush() server_process.terminate()
server_process.wait()
sys.stdout.write("stdout: {}".format(server_process.stdout.read()))
sys.stdout.flush()
sys.stdout.write("stderr: {}".format(server_process.stderr.read()))
sys.stdout.flush()
def profile(message_size, response_count): def profile(message_size, response_count):
request = unary_stream_benchmark_pb2.BenchmarkRequest(message_size=message_size, response_count=response_count) request = unary_stream_benchmark_pb2.BenchmarkRequest(
with grpc.insecure_channel('[::]:{}'.format(_PORT), options=_GRPC_CHANNEL_OPTIONS) as channel: message_size=message_size, response_count=response_count)
stub = unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceStub(channel) with grpc.insecure_channel(
start = datetime.datetime.now() '[::]:{}'.format(_PORT), options=_GRPC_CHANNEL_OPTIONS) as channel:
call = stub.Benchmark(request, wait_for_ready=True) stub = unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceStub(
for message in call: channel)
pass start = datetime.datetime.now()
end = datetime.datetime.now() call = stub.Benchmark(request, wait_for_ready=True)
return end - start for message in call:
pass
end = datetime.datetime.now()
return end - start
def main(): def main():
with _running_server(): with _running_server():
for i in range(1000): for i in range(1000):
latency = profile(_MESSAGE_SIZE, 1024) latency = profile(_MESSAGE_SIZE, 1024)
sys.stdout.write("{}\n".format(latency.total_seconds())) sys.stdout.write("{}\n".format(latency.total_seconds()))
sys.stdout.flush() sys.stdout.flush()
if __name__ == '__main__': if __name__ == '__main__':
main() main()

@ -347,7 +347,6 @@ class MetadataCodeDetailsTest(unittest.TestCase):
self._servicer.set_details(_DETAILS) self._servicer.set_details(_DETAILS)
self._servicer.set_abort_call() self._servicer.set_abort_call()
response_iterator_call = self._unary_stream( response_iterator_call = self._unary_stream(
_SERIALIZED_REQUEST, metadata=_CLIENT_METADATA) _SERIALIZED_REQUEST, metadata=_CLIENT_METADATA)
# NOTE: In the single-threaded case, we cannot grab the initial_metadata # NOTE: In the single-threaded case, we cannot grab the initial_metadata

Loading…
Cancel
Save