Support infinite timeouts

pull/23163/head
Richard Belleville 5 years ago
parent 00d20bb70a
commit 6344b2b97f
  1. 48
      src/python/grpcio/grpc/_simple_stubs.py
  2. 426
      src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py

@ -56,6 +56,11 @@ if _DEFAULT_TIMEOUT_KEY in os.environ:
else:
_DEFAULT_TIMEOUT = 60.0
class _DefaultTimeoutSentinelType(object):
pass
_DEFAULT_TIMEOUT_SENTINEL = _DefaultTimeoutSentinelType()
def _create_channel(target: str, options: Sequence[Tuple[str, str]],
channel_credentials: Optional[grpc.ChannelCredentials],
@ -173,11 +178,12 @@ class ChannelCache:
def _get_wait_for_ready_settings(wait_for_ready: Optional[bool],
timeout: Optional[float]
) -> Tuple[bool, float]:
timeout: Union[Optional[float], _DefaultTimeoutSentinelType]
) -> Tuple[bool, Optional[float]]:
if wait_for_ready is None:
wait_for_ready = True
if wait_for_ready and timeout is None:
# TODO: You don't actually need this sentinel...
if timeout is _DEFAULT_TIMEOUT_SENTINEL:
timeout = _DEFAULT_TIMEOUT
return wait_for_ready, timeout
@ -195,7 +201,7 @@ def unary_unary(
call_credentials: Optional[grpc.CallCredentials] = None,
compression: Optional[grpc.Compression] = None,
wait_for_ready: Optional[bool] = None,
timeout: Optional[float] = None,
timeout: Optional[float] = _DEFAULT_TIMEOUT_SENTINEL,
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> ResponseType:
"""Invokes a unary-unary RPC without an explicitly specified channel.
@ -240,8 +246,11 @@ def unary_unary(
becomes ready. When using this option, the user will likely also want
to set a timeout. Defaults to True.
timeout: An optional duration of time in seconds to allow for the RPC,
after which an exception will be raised. If timeout is unspecified and
wait_for_ready is True, defaults to one minute.
after which an exception will be raised. If timeout is unspecified,
defaults to a timeout controlled by the
GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
unset, defaults to 60 seconds. Supply a value of None to indicate that
no timeout should be enforced.
metadata: Optional metadata to send to the server.
Returns:
@ -274,7 +283,7 @@ def unary_stream(
call_credentials: Optional[grpc.CallCredentials] = None,
compression: Optional[grpc.Compression] = None,
wait_for_ready: Optional[bool] = None,
timeout: Optional[float] = None,
timeout: Optional[float] = _DEFAULT_TIMEOUT_SENTINEL,
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> Iterator[ResponseType]:
"""Invokes a unary-stream RPC without an explicitly specified channel.
@ -318,8 +327,11 @@ def unary_stream(
becomes ready. When using this option, the user will likely also want
to set a timeout. Defaults to True.
timeout: An optional duration of time in seconds to allow for the RPC,
after which an exception will be raised. If timeout is unspecified and
wait_for_ready is True, defaults to one minute.
after which an exception will be raised. If timeout is unspecified,
defaults to a timeout controlled by the
GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
unset, defaults to 60 seconds. Supply a value of None to indicate that
no timeout should be enforced.
metadata: Optional metadata to send to the server.
Returns:
@ -352,7 +364,7 @@ def stream_unary(
call_credentials: Optional[grpc.CallCredentials] = None,
compression: Optional[grpc.Compression] = None,
wait_for_ready: Optional[bool] = None,
timeout: Optional[float] = None,
timeout: Optional[float] = _DEFAULT_TIMEOUT_SENTINEL,
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> ResponseType:
"""Invokes a stream-unary RPC without an explicitly specified channel.
@ -396,8 +408,11 @@ def stream_unary(
becomes ready. When using this option, the user will likely also want
to set a timeout. Defaults to True.
timeout: An optional duration of time in seconds to allow for the RPC,
after which an exception will be raised. If timeout is unspecified and
wait_for_ready is True, defaults to one minute.
after which an exception will be raised. If timeout is unspecified,
defaults to a timeout controlled by the
GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
unset, defaults to 60 seconds. Supply a value of None to indicate that
no timeout should be enforced.
metadata: Optional metadata to send to the server.
Returns:
@ -430,7 +445,7 @@ def stream_stream(
call_credentials: Optional[grpc.CallCredentials] = None,
compression: Optional[grpc.Compression] = None,
wait_for_ready: Optional[bool] = None,
timeout: Optional[float] = None,
timeout: Optional[float] = _DEFAULT_TIMEOUT_SENTINEL,
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> Iterator[ResponseType]:
"""Invokes a stream-stream RPC without an explicitly specified channel.
@ -474,8 +489,11 @@ def stream_stream(
becomes ready. When using this option, the user will likely also want
to set a timeout. Defaults to True.
timeout: An optional duration of time in seconds to allow for the RPC,
after which an exception will be raised. If timeout is unspecified and
wait_for_ready is True, defaults to one minute.
after which an exception will be raised. If timeout is unspecified,
defaults to a timeout controlled by the
GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
unset, defaults to 60 seconds. Supply a value of None to indicate that
no timeout should be enforced.
metadata: Optional metadata to send to the server.
Returns:

@ -19,8 +19,11 @@ import os
_MAXIMUM_CHANNELS = 10
_DEFAULT_TIMEOUT = 1.0
os.environ["GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"] = "2"
os.environ["GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"] = str(_MAXIMUM_CHANNELS)
os.environ["GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"] = str(_DEFAULT_TIMEOUT)
import contextlib
import datetime
@ -52,6 +55,7 @@ _UNARY_UNARY = "/test/UnaryUnary"
_UNARY_STREAM = "/test/UnaryStream"
_STREAM_UNARY = "/test/StreamUnary"
_STREAM_STREAM = "/test/StreamStream"
_BLACK_HOLE = "/test/BlackHole"
@contextlib.contextmanager
@ -82,6 +86,15 @@ def _stream_stream_handler(request_iterator, context):
yield request
def _black_hole_handler(request, context):
event = threading.Event()
def _on_done():
event.set()
context.add_callback(_on_done)
while not event.is_set():
time.sleep(0.1)
class _GenericHandler(grpc.GenericRpcHandler):
def service(self, handler_call_details):
@ -93,6 +106,8 @@ class _GenericHandler(grpc.GenericRpcHandler):
return grpc.stream_unary_rpc_method_handler(_stream_unary_handler)
elif handler_call_details.method == _STREAM_STREAM:
return grpc.stream_stream_rpc_method_handler(_stream_stream_handler)
elif handler_call_details.method == _BLACK_HOLE:
return grpc.unary_unary_rpc_method_handler(_black_hole_handler)
else:
raise NotImplementedError()
@ -163,201 +178,230 @@ class SimpleStubsTest(unittest.TestCase):
else:
self.fail(message() + " after " + str(timeout))
def test_unary_unary_insecure(self):
with _server(None) as port:
target = f'localhost:{port}'
response = grpc.experimental.unary_unary(
_REQUEST,
target,
_UNARY_UNARY,
channel_credentials=grpc.experimental.
insecure_channel_credentials())
self.assertEqual(_REQUEST, response)
def test_unary_unary_secure(self):
with _server(grpc.local_server_credentials()) as port:
target = f'localhost:{port}'
response = grpc.experimental.unary_unary(
_REQUEST,
target,
_UNARY_UNARY,
channel_credentials=grpc.local_channel_credentials())
self.assertEqual(_REQUEST, response)
def test_channels_cached(self):
with _server(grpc.local_server_credentials()) as port:
target = f'localhost:{port}'
test_name = inspect.stack()[0][3]
args = (_REQUEST, target, _UNARY_UNARY)
kwargs = {"channel_credentials": grpc.local_channel_credentials()}
def _invoke(seed: str):
run_kwargs = dict(kwargs)
run_kwargs["options"] = ((test_name + seed, ""),)
grpc.experimental.unary_unary(*args, **run_kwargs)
self.assert_cached(_invoke)
def test_channels_evicted(self):
with _server(grpc.local_server_credentials()) as port:
target = f'localhost:{port}'
response = grpc.experimental.unary_unary(
_REQUEST,
target,
_UNARY_UNARY,
channel_credentials=grpc.local_channel_credentials())
self.assert_eventually(
lambda: grpc._simple_stubs.ChannelCache.get(
)._test_only_channel_count() == 0,
message=lambda:
f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} remain"
)
def test_total_channels_enforced(self):
with _server(grpc.local_server_credentials()) as port:
target = f'localhost:{port}'
for i in range(_STRESS_EPOCHS):
# Ensure we get a new channel each time.
options = (("foo", str(i)),)
# Send messages at full blast.
grpc.experimental.unary_unary(
_REQUEST,
target,
_UNARY_UNARY,
options=options,
channel_credentials=grpc.local_channel_credentials())
self.assert_eventually(
lambda: grpc._simple_stubs.ChannelCache.get(
)._test_only_channel_count() <= _MAXIMUM_CHANNELS + 1,
message=lambda:
f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} channels remain"
)
def test_unary_stream(self):
with _server(grpc.local_server_credentials()) as port:
target = f'localhost:{port}'
for response in grpc.experimental.unary_stream(
_REQUEST,
target,
_UNARY_STREAM,
channel_credentials=grpc.local_channel_credentials()):
self.assertEqual(_REQUEST, response)
def test_stream_unary(self):
def request_iter():
for _ in range(_CLIENT_REQUEST_COUNT):
yield _REQUEST
with _server(grpc.local_server_credentials()) as port:
target = f'localhost:{port}'
response = grpc.experimental.stream_unary(
request_iter(),
target,
_STREAM_UNARY,
channel_credentials=grpc.local_channel_credentials())
self.assertEqual(_REQUEST, response)
def test_stream_stream(self):
def request_iter():
for _ in range(_CLIENT_REQUEST_COUNT):
yield _REQUEST
with _server(grpc.local_server_credentials()) as port:
target = f'localhost:{port}'
for response in grpc.experimental.stream_stream(
request_iter(),
target,
_STREAM_STREAM,
channel_credentials=grpc.local_channel_credentials()):
self.assertEqual(_REQUEST, response)
def test_default_ssl(self):
_private_key = resources.private_key()
_certificate_chain = resources.certificate_chain()
_server_certs = ((_private_key, _certificate_chain),)
_server_host_override = 'foo.test.google.fr'
_test_root_certificates = resources.test_root_certificates()
_property_options = ((
'grpc.ssl_target_name_override',
_server_host_override,
),)
cert_dir = os.path.join(os.path.dirname(resources.__file__),
"credentials")
cert_file = os.path.join(cert_dir, "ca.pem")
with _env("GRPC_DEFAULT_SSL_ROOTS_FILE_PATH", cert_file):
server_creds = grpc.ssl_server_credentials(_server_certs)
with _server(server_creds) as port:
target = f'localhost:{port}'
response = grpc.experimental.unary_unary(
_REQUEST, target, _UNARY_UNARY, options=_property_options)
def test_insecure_sugar(self):
with _server(None) as port:
target = f'localhost:{port}'
response = grpc.experimental.unary_unary(_REQUEST,
target,
_UNARY_UNARY,
insecure=True)
self.assertEqual(_REQUEST, response)
def test_insecure_sugar_mutually_exclusive(self):
# def test_unary_unary_insecure(self):
# with _server(None) as port:
# target = f'localhost:{port}'
# response = grpc.experimental.unary_unary(
# _REQUEST,
# target,
# _UNARY_UNARY,
# channel_credentials=grpc.experimental.
# insecure_channel_credentials())
# self.assertEqual(_REQUEST, response)
# def test_unary_unary_secure(self):
# with _server(grpc.local_server_credentials()) as port:
# target = f'localhost:{port}'
# response = grpc.experimental.unary_unary(
# _REQUEST,
# target,
# _UNARY_UNARY,
# channel_credentials=grpc.local_channel_credentials())
# self.assertEqual(_REQUEST, response)
# def test_channels_cached(self):
# with _server(grpc.local_server_credentials()) as port:
# target = f'localhost:{port}'
# test_name = inspect.stack()[0][3]
# args = (_REQUEST, target, _UNARY_UNARY)
# kwargs = {"channel_credentials": grpc.local_channel_credentials()}
# def _invoke(seed: str):
# run_kwargs = dict(kwargs)
# run_kwargs["options"] = ((test_name + seed, ""),)
# grpc.experimental.unary_unary(*args, **run_kwargs)
# self.assert_cached(_invoke)
# def test_channels_evicted(self):
# with _server(grpc.local_server_credentials()) as port:
# target = f'localhost:{port}'
# response = grpc.experimental.unary_unary(
# _REQUEST,
# target,
# _UNARY_UNARY,
# channel_credentials=grpc.local_channel_credentials())
# self.assert_eventually(
# lambda: grpc._simple_stubs.ChannelCache.get(
# )._test_only_channel_count() == 0,
# message=lambda:
# f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} remain"
# )
# def test_total_channels_enforced(self):
# with _server(grpc.local_server_credentials()) as port:
# target = f'localhost:{port}'
# for i in range(_STRESS_EPOCHS):
# # Ensure we get a new channel each time.
# options = (("foo", str(i)),)
# # Send messages at full blast.
# grpc.experimental.unary_unary(
# _REQUEST,
# target,
# _UNARY_UNARY,
# options=options,
# channel_credentials=grpc.local_channel_credentials())
# self.assert_eventually(
# lambda: grpc._simple_stubs.ChannelCache.get(
# )._test_only_channel_count() <= _MAXIMUM_CHANNELS + 1,
# message=lambda:
# f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} channels remain"
# )
# def test_unary_stream(self):
# with _server(grpc.local_server_credentials()) as port:
# target = f'localhost:{port}'
# for response in grpc.experimental.unary_stream(
# _REQUEST,
# target,
# _UNARY_STREAM,
# channel_credentials=grpc.local_channel_credentials()):
# self.assertEqual(_REQUEST, response)
# def test_stream_unary(self):
# def request_iter():
# for _ in range(_CLIENT_REQUEST_COUNT):
# yield _REQUEST
# with _server(grpc.local_server_credentials()) as port:
# target = f'localhost:{port}'
# response = grpc.experimental.stream_unary(
# request_iter(),
# target,
# _STREAM_UNARY,
# channel_credentials=grpc.local_channel_credentials())
# self.assertEqual(_REQUEST, response)
# def test_stream_stream(self):
# def request_iter():
# for _ in range(_CLIENT_REQUEST_COUNT):
# yield _REQUEST
# with _server(grpc.local_server_credentials()) as port:
# target = f'localhost:{port}'
# for response in grpc.experimental.stream_stream(
# request_iter(),
# target,
# _STREAM_STREAM,
# channel_credentials=grpc.local_channel_credentials()):
# self.assertEqual(_REQUEST, response)
# def test_default_ssl(self):
# _private_key = resources.private_key()
# _certificate_chain = resources.certificate_chain()
# _server_certs = ((_private_key, _certificate_chain),)
# _server_host_override = 'foo.test.google.fr'
# _test_root_certificates = resources.test_root_certificates()
# _property_options = ((
# 'grpc.ssl_target_name_override',
# _server_host_override,
# ),)
# cert_dir = os.path.join(os.path.dirname(resources.__file__),
# "credentials")
# cert_file = os.path.join(cert_dir, "ca.pem")
# with _env("GRPC_DEFAULT_SSL_ROOTS_FILE_PATH", cert_file):
# server_creds = grpc.ssl_server_credentials(_server_certs)
# with _server(server_creds) as port:
# target = f'localhost:{port}'
# response = grpc.experimental.unary_unary(
# _REQUEST, target, _UNARY_UNARY, options=_property_options)
# def test_insecure_sugar(self):
# with _server(None) as port:
# target = f'localhost:{port}'
# response = grpc.experimental.unary_unary(_REQUEST,
# target,
# _UNARY_UNARY,
# insecure=True)
# self.assertEqual(_REQUEST, response)
# def test_insecure_sugar_mutually_exclusive(self):
# with _server(None) as port:
# target = f'localhost:{port}'
# with self.assertRaises(ValueError):
# response = grpc.experimental.unary_unary(
# _REQUEST,
# target,
# _UNARY_UNARY,
# insecure=True,
# channel_credentials=grpc.local_channel_credentials())
# def test_default_wait_for_ready(self):
# addr, port, sock = get_socket()
# sock.close()
# target = f'{addr}:{port}'
# channel = grpc._simple_stubs.ChannelCache.get().get_channel(
# target, (), None, True, None)
# rpc_finished_event = threading.Event()
# rpc_failed_event = threading.Event()
# server = None
# def _on_connectivity_changed(connectivity):
# nonlocal server
# if connectivity is grpc.ChannelConnectivity.TRANSIENT_FAILURE:
# self.assertFalse(rpc_finished_event.is_set())
# self.assertFalse(rpc_failed_event.is_set())
# server = test_common.test_server()
# server.add_insecure_port(target)
# server.add_generic_rpc_handlers((_GenericHandler(),))
# server.start()
# channel.unsubscribe(_on_connectivity_changed)
# elif connectivity in (grpc.ChannelConnectivity.IDLE,
# grpc.ChannelConnectivity.CONNECTING):
# pass
# else:
# self.fail("Encountered unknown state.")
# channel.subscribe(_on_connectivity_changed)
# def _send_rpc():
# try:
# response = grpc.experimental.unary_unary(_REQUEST,
# target,
# _UNARY_UNARY,
# insecure=True)
# rpc_finished_event.set()
# except Exception as e:
# rpc_failed_event.set()
# t = threading.Thread(target=_send_rpc)
# t.start()
# t.join()
# self.assertFalse(rpc_failed_event.is_set())
# self.assertTrue(rpc_finished_event.is_set())
# if server is not None:
# server.stop(None)
def assert_times_out(self, invocation_args):
with _server(None) as port:
target = f'localhost:{port}'
with self.assertRaises(ValueError):
response = grpc.experimental.unary_unary(
_REQUEST,
target,
_UNARY_UNARY,
insecure=True,
channel_credentials=grpc.local_channel_credentials())
def test_default_wait_for_ready(self):
addr, port, sock = get_socket()
sock.close()
target = f'{addr}:{port}'
channel = grpc._simple_stubs.ChannelCache.get().get_channel(
target, (), None, True, None)
rpc_finished_event = threading.Event()
rpc_failed_event = threading.Event()
server = None
def _on_connectivity_changed(connectivity):
nonlocal server
if connectivity is grpc.ChannelConnectivity.TRANSIENT_FAILURE:
self.assertFalse(rpc_finished_event.is_set())
self.assertFalse(rpc_failed_event.is_set())
server = test_common.test_server()
server.add_insecure_port(target)
server.add_generic_rpc_handlers((_GenericHandler(),))
server.start()
channel.unsubscribe(_on_connectivity_changed)
elif connectivity in (grpc.ChannelConnectivity.IDLE,
grpc.ChannelConnectivity.CONNECTING):
pass
else:
self.fail("Encountered unknown state.")
channel.subscribe(_on_connectivity_changed)
def _send_rpc():
try:
with self.assertRaises(grpc.RpcError) as cm:
response = grpc.experimental.unary_unary(_REQUEST,
target,
_UNARY_UNARY,
insecure=True)
rpc_finished_event.set()
except Exception as e:
rpc_failed_event.set()
t = threading.Thread(target=_send_rpc)
t.start()
t.join()
self.assertFalse(rpc_failed_event.is_set())
self.assertTrue(rpc_finished_event.is_set())
if server is not None:
server.stop(None)
_BLACK_HOLE,
insecure=True,
**invocation_args)
self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, cm.exception.code())
def test_default_timeout(self):
not_present = object()
wait_for_ready_values = [True, not_present]
timeout_values = [0.5, not_present]
cases = []
for wait_for_ready in wait_for_ready_values:
for timeout in timeout_values:
case = {}
if timeout is not not_present:
case["timeout"] = timeout
if wait_for_ready is not not_present:
case["wait_for_ready"] = wait_for_ready
cases.append(case)
for case in cases:
with self.subTest(**case):
self.assert_times_out(case)
if __name__ == "__main__":

Loading…
Cancel
Save