Merge pull request #16919 from lidizheng/wait-for-ready

Add wait-for-ready semantics
pull/17141/head
Lidi Zheng 6 years ago committed by GitHub
commit 678ea08950
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 57
      src/python/grpcio/grpc/__init__.py
  2. 121
      src/python/grpcio/grpc/_channel.py
  3. 4
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  4. 6
      src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi
  5. 126
      src/python/grpcio/grpc/_interceptor.py
  6. 1
      src/python/grpcio_tests/tests/tests.json
  7. 251
      src/python/grpcio_tests/tests/unit/_metadata_flags_test.py
  8. 26
      src/python/grpcio_tests/tests/unit/test_common.py

@ -387,6 +387,7 @@ class ClientCallDetails(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to metadata: Optional :term:`metadata` to be transmitted to
the service-side of the RPC. the service-side of the RPC.
credentials: An optional CallCredentials for the RPC. credentials: An optional CallCredentials for the RPC.
wait_for_ready: An optional flag to enable wait for ready mechanism.
""" """
@ -638,7 +639,12 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
"""Affords invoking a unary-unary RPC from client-side.""" """Affords invoking a unary-unary RPC from client-side."""
@abc.abstractmethod @abc.abstractmethod
def __call__(self, request, timeout=None, metadata=None, credentials=None): def __call__(self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None):
"""Synchronously invokes the underlying RPC. """Synchronously invokes the underlying RPC.
Args: Args:
@ -648,6 +654,8 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to the metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC. service-side of the RPC.
credentials: An optional CallCredentials for the RPC. credentials: An optional CallCredentials for the RPC.
wait_for_ready: An optional flag to enable wait for ready
mechanism
Returns: Returns:
The response value for the RPC. The response value for the RPC.
@ -660,7 +668,12 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def with_call(self, request, timeout=None, metadata=None, credentials=None): def with_call(self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None):
"""Synchronously invokes the underlying RPC. """Synchronously invokes the underlying RPC.
Args: Args:
@ -670,6 +683,8 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to the metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC. service-side of the RPC.
credentials: An optional CallCredentials for the RPC. credentials: An optional CallCredentials for the RPC.
wait_for_ready: An optional flag to enable wait for ready
mechanism
Returns: Returns:
The response value for the RPC and a Call value for the RPC. The response value for the RPC and a Call value for the RPC.
@ -682,7 +697,12 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def future(self, request, timeout=None, metadata=None, credentials=None): def future(self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None):
"""Asynchronously invokes the underlying RPC. """Asynchronously invokes the underlying RPC.
Args: Args:
@ -692,6 +712,8 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to the metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC. service-side of the RPC.
credentials: An optional CallCredentials for the RPC. credentials: An optional CallCredentials for the RPC.
wait_for_ready: An optional flag to enable wait for ready
mechanism
Returns: Returns:
An object that is both a Call for the RPC and a Future. An object that is both a Call for the RPC and a Future.
@ -707,7 +729,12 @@ class UnaryStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
"""Affords invoking a unary-stream RPC from client-side.""" """Affords invoking a unary-stream RPC from client-side."""
@abc.abstractmethod @abc.abstractmethod
def __call__(self, request, timeout=None, metadata=None, credentials=None): def __call__(self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None):
"""Invokes the underlying RPC. """Invokes the underlying RPC.
Args: Args:
@ -717,6 +744,8 @@ class UnaryStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: An optional :term:`metadata` to be transmitted to the metadata: An optional :term:`metadata` to be transmitted to the
service-side of the RPC. service-side of the RPC.
credentials: An optional CallCredentials for the RPC. credentials: An optional CallCredentials for the RPC.
wait_for_ready: An optional flag to enable wait for ready
mechanism
Returns: Returns:
An object that is both a Call for the RPC and an iterator of An object that is both a Call for the RPC and an iterator of
@ -735,7 +764,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
request_iterator, request_iterator,
timeout=None, timeout=None,
metadata=None, metadata=None,
credentials=None): credentials=None,
wait_for_ready=None):
"""Synchronously invokes the underlying RPC. """Synchronously invokes the underlying RPC.
Args: Args:
@ -746,6 +776,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to the metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC. service-side of the RPC.
credentials: An optional CallCredentials for the RPC. credentials: An optional CallCredentials for the RPC.
wait_for_ready: An optional flag to enable wait for ready
mechanism
Returns: Returns:
The response value for the RPC. The response value for the RPC.
@ -762,7 +794,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
request_iterator, request_iterator,
timeout=None, timeout=None,
metadata=None, metadata=None,
credentials=None): credentials=None,
wait_for_ready=None):
"""Synchronously invokes the underlying RPC on the client. """Synchronously invokes the underlying RPC on the client.
Args: Args:
@ -773,6 +806,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to the metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC. service-side of the RPC.
credentials: An optional CallCredentials for the RPC. credentials: An optional CallCredentials for the RPC.
wait_for_ready: An optional flag to enable wait for ready
mechanism
Returns: Returns:
The response value for the RPC and a Call object for the RPC. The response value for the RPC and a Call object for the RPC.
@ -789,7 +824,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
request_iterator, request_iterator,
timeout=None, timeout=None,
metadata=None, metadata=None,
credentials=None): credentials=None,
wait_for_ready=None):
"""Asynchronously invokes the underlying RPC on the client. """Asynchronously invokes the underlying RPC on the client.
Args: Args:
@ -799,6 +835,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to the metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC. service-side of the RPC.
credentials: An optional CallCredentials for the RPC. credentials: An optional CallCredentials for the RPC.
wait_for_ready: An optional flag to enable wait for ready
mechanism
Returns: Returns:
An object that is both a Call for the RPC and a Future. An object that is both a Call for the RPC and a Future.
@ -818,7 +856,8 @@ class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
request_iterator, request_iterator,
timeout=None, timeout=None,
metadata=None, metadata=None,
credentials=None): credentials=None,
wait_for_ready=None):
"""Invokes the underlying RPC on the client. """Invokes the underlying RPC on the client.
Args: Args:
@ -828,6 +867,8 @@ class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to the metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC. service-side of the RPC.
credentials: An optional CallCredentials for the RPC. credentials: An optional CallCredentials for the RPC.
wait_for_ready: An optional flag to enable wait for ready
mechanism
Returns: Returns:
An object that is both a Call for the RPC and an iterator of An object that is both a Call for the RPC and an iterator of

@ -467,10 +467,11 @@ def _end_unary_response_blocking(state, call, with_call, deadline):
raise _Rendezvous(state, None, None, deadline) raise _Rendezvous(state, None, None, deadline)
def _stream_unary_invocation_operationses(metadata): def _stream_unary_invocation_operationses(metadata, initial_metadata_flags):
return ( return (
( (
cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), cygrpc.SendInitialMetadataOperation(metadata,
initial_metadata_flags),
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
), ),
@ -478,15 +479,19 @@ def _stream_unary_invocation_operationses(metadata):
) )
def _stream_unary_invocation_operationses_and_tags(metadata): def _stream_unary_invocation_operationses_and_tags(metadata,
initial_metadata_flags):
return tuple(( return tuple((
operations, operations,
None, None,
) for operations in _stream_unary_invocation_operationses(metadata)) )
for operations in _stream_unary_invocation_operationses(
metadata, initial_metadata_flags))
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
# pylint: disable=too-many-arguments
def __init__(self, channel, managed_call, method, request_serializer, def __init__(self, channel, managed_call, method, request_serializer,
response_deserializer): response_deserializer):
self._channel = channel self._channel = channel
@ -495,15 +500,18 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
self._request_serializer = request_serializer self._request_serializer = request_serializer
self._response_deserializer = response_deserializer self._response_deserializer = response_deserializer
def _prepare(self, request, timeout, metadata): def _prepare(self, request, timeout, metadata, wait_for_ready):
deadline, serialized_request, rendezvous = _start_unary_request( deadline, serialized_request, rendezvous = _start_unary_request(
request, timeout, self._request_serializer) request, timeout, self._request_serializer)
initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
wait_for_ready)
if serialized_request is None: if serialized_request is None:
return None, None, None, rendezvous return None, None, None, rendezvous
else: else:
state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
operations = ( operations = (
cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), cygrpc.SendInitialMetadataOperation(metadata,
initial_metadata_flags),
cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
@ -512,9 +520,10 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
) )
return state, operations, deadline, None return state, operations, deadline, None
def _blocking(self, request, timeout, metadata, credentials): def _blocking(self, request, timeout, metadata, credentials,
wait_for_ready):
state, operations, deadline, rendezvous = self._prepare( state, operations, deadline, rendezvous = self._prepare(
request, timeout, metadata) request, timeout, metadata, wait_for_ready)
if state is None: if state is None:
raise rendezvous raise rendezvous
else: else:
@ -528,17 +537,34 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
_handle_event(event, state, self._response_deserializer) _handle_event(event, state, self._response_deserializer)
return state, call, return state, call,
def __call__(self, request, timeout=None, metadata=None, credentials=None): def __call__(self,
state, call, = self._blocking(request, timeout, metadata, credentials) request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None):
state, call, = self._blocking(request, timeout, metadata, credentials,
wait_for_ready)
return _end_unary_response_blocking(state, call, False, None) return _end_unary_response_blocking(state, call, False, None)
def with_call(self, request, timeout=None, metadata=None, credentials=None): def with_call(self,
state, call, = self._blocking(request, timeout, metadata, credentials) request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None):
state, call, = self._blocking(request, timeout, metadata, credentials,
wait_for_ready)
return _end_unary_response_blocking(state, call, True, None) return _end_unary_response_blocking(state, call, True, None)
def future(self, request, timeout=None, metadata=None, credentials=None): def future(self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None):
state, operations, deadline, rendezvous = self._prepare( state, operations, deadline, rendezvous = self._prepare(
request, timeout, metadata) request, timeout, metadata, wait_for_ready)
if state is None: if state is None:
raise rendezvous raise rendezvous
else: else:
@ -553,6 +579,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
# pylint: disable=too-many-arguments
def __init__(self, channel, managed_call, method, request_serializer, def __init__(self, channel, managed_call, method, request_serializer,
response_deserializer): response_deserializer):
self._channel = channel self._channel = channel
@ -561,16 +588,24 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
self._request_serializer = request_serializer self._request_serializer = request_serializer
self._response_deserializer = response_deserializer self._response_deserializer = response_deserializer
def __call__(self, request, timeout=None, metadata=None, credentials=None): def __call__(self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None):
deadline, serialized_request, rendezvous = _start_unary_request( deadline, serialized_request, rendezvous = _start_unary_request(
request, timeout, self._request_serializer) request, timeout, self._request_serializer)
initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
wait_for_ready)
if serialized_request is None: if serialized_request is None:
raise rendezvous raise rendezvous
else: else:
state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
operationses = ( operationses = (
( (
cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), cygrpc.SendInitialMetadataOperation(metadata,
initial_metadata_flags),
cygrpc.SendMessageOperation(serialized_request, cygrpc.SendMessageOperation(serialized_request,
_EMPTY_FLAGS), _EMPTY_FLAGS),
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
@ -589,6 +624,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
# pylint: disable=too-many-arguments
def __init__(self, channel, managed_call, method, request_serializer, def __init__(self, channel, managed_call, method, request_serializer,
response_deserializer): response_deserializer):
self._channel = channel self._channel = channel
@ -597,13 +633,17 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
self._request_serializer = request_serializer self._request_serializer = request_serializer
self._response_deserializer = response_deserializer self._response_deserializer = response_deserializer
def _blocking(self, request_iterator, timeout, metadata, credentials): def _blocking(self, request_iterator, timeout, metadata, credentials,
wait_for_ready):
deadline = _deadline(timeout) deadline = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
wait_for_ready)
call = self._channel.segregated_call( call = self._channel.segregated_call(
0, self._method, None, deadline, metadata, None 0, self._method, None, deadline, metadata, None
if credentials is None else credentials._credentials, if credentials is None else credentials._credentials,
_stream_unary_invocation_operationses_and_tags(metadata)) _stream_unary_invocation_operationses_and_tags(
metadata, initial_metadata_flags))
_consume_request_iterator(request_iterator, state, call, _consume_request_iterator(request_iterator, state, call,
self._request_serializer, None) self._request_serializer, None)
while True: while True:
@ -619,32 +659,38 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
request_iterator, request_iterator,
timeout=None, timeout=None,
metadata=None, metadata=None,
credentials=None): credentials=None,
wait_for_ready=None):
state, call, = self._blocking(request_iterator, timeout, metadata, state, call, = self._blocking(request_iterator, timeout, metadata,
credentials) credentials, wait_for_ready)
return _end_unary_response_blocking(state, call, False, None) return _end_unary_response_blocking(state, call, False, None)
def with_call(self, def with_call(self,
request_iterator, request_iterator,
timeout=None, timeout=None,
metadata=None, metadata=None,
credentials=None): credentials=None,
wait_for_ready=None):
state, call, = self._blocking(request_iterator, timeout, metadata, state, call, = self._blocking(request_iterator, timeout, metadata,
credentials) credentials, wait_for_ready)
return _end_unary_response_blocking(state, call, True, None) return _end_unary_response_blocking(state, call, True, None)
def future(self, def future(self,
request_iterator, request_iterator,
timeout=None, timeout=None,
metadata=None, metadata=None,
credentials=None): credentials=None,
wait_for_ready=None):
deadline = _deadline(timeout) deadline = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
event_handler = _event_handler(state, self._response_deserializer) event_handler = _event_handler(state, self._response_deserializer)
initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
wait_for_ready)
call = self._managed_call( call = self._managed_call(
0, self._method, None, deadline, metadata, None 0, self._method, None, deadline, metadata, None
if credentials is None else credentials._credentials, if credentials is None else credentials._credentials,
_stream_unary_invocation_operationses(metadata), event_handler) _stream_unary_invocation_operationses(
metadata, initial_metadata_flags), event_handler)
_consume_request_iterator(request_iterator, state, call, _consume_request_iterator(request_iterator, state, call,
self._request_serializer, event_handler) self._request_serializer, event_handler)
return _Rendezvous(state, call, self._response_deserializer, deadline) return _Rendezvous(state, call, self._response_deserializer, deadline)
@ -652,6 +698,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
# pylint: disable=too-many-arguments
def __init__(self, channel, managed_call, method, request_serializer, def __init__(self, channel, managed_call, method, request_serializer,
response_deserializer): response_deserializer):
self._channel = channel self._channel = channel
@ -664,12 +711,16 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
request_iterator, request_iterator,
timeout=None, timeout=None,
metadata=None, metadata=None,
credentials=None): credentials=None,
wait_for_ready=None):
deadline = _deadline(timeout) deadline = _deadline(timeout)
state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
wait_for_ready)
operationses = ( operationses = (
( (
cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), cygrpc.SendInitialMetadataOperation(metadata,
initial_metadata_flags),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
), ),
(cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
@ -684,6 +735,24 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
return _Rendezvous(state, call, self._response_deserializer, deadline) return _Rendezvous(state, call, self._response_deserializer, deadline)
class _InitialMetadataFlags(int):
"""Stores immutable initial metadata flags"""
def __new__(cls, value=_EMPTY_FLAGS):
value &= cygrpc.InitialMetadataFlags.used_mask
return super(_InitialMetadataFlags, cls).__new__(cls, value)
def with_wait_for_ready(self, wait_for_ready):
if wait_for_ready is not None:
if wait_for_ready:
self = self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \
cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)
elif not wait_for_ready:
self = self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \
cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)
return self
class _ChannelCallState(object): class _ChannelCallState(object):
def __init__(self, channel): def __init__(self, channel):

@ -140,6 +140,10 @@ cdef extern from "grpc/grpc.h":
const int GRPC_WRITE_NO_COMPRESS const int GRPC_WRITE_NO_COMPRESS
const int GRPC_WRITE_USED_MASK const int GRPC_WRITE_USED_MASK
const int GRPC_INITIAL_METADATA_WAIT_FOR_READY
const int GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
const int GRPC_INITIAL_METADATA_USED_MASK
const int GRPC_MAX_COMPLETION_QUEUE_PLUCKERS const int GRPC_MAX_COMPLETION_QUEUE_PLUCKERS
ctypedef struct grpc_completion_queue: ctypedef struct grpc_completion_queue:

@ -15,6 +15,12 @@
import collections import collections
class InitialMetadataFlags:
used_mask = GRPC_INITIAL_METADATA_USED_MASK
wait_for_ready = GRPC_INITIAL_METADATA_WAIT_FOR_READY
wait_for_ready_explicitly_set = GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
_Metadatum = collections.namedtuple('_Metadatum', ('key', 'value',)) _Metadatum = collections.namedtuple('_Metadatum', ('key', 'value',))

@ -46,7 +46,7 @@ def service_pipeline(interceptors):
class _ClientCallDetails( class _ClientCallDetails(
collections.namedtuple( collections.namedtuple(
'_ClientCallDetails', '_ClientCallDetails',
('method', 'timeout', 'metadata', 'credentials')), ('method', 'timeout', 'metadata', 'credentials', 'wait_for_ready')),
grpc.ClientCallDetails): grpc.ClientCallDetails):
pass pass
@ -72,7 +72,12 @@ def _unwrap_client_call_details(call_details, default_details):
except AttributeError: except AttributeError:
credentials = default_details.credentials credentials = default_details.credentials
return method, timeout, metadata, credentials try:
wait_for_ready = call_details.wait_for_ready
except AttributeError:
wait_for_ready = default_details.wait_for_ready
return method, timeout, metadata, credentials, wait_for_ready
class _FailureOutcome(grpc.RpcError, grpc.Future, grpc.Call): class _FailureOutcome(grpc.RpcError, grpc.Future, grpc.Call):
@ -193,28 +198,39 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
self._method = method self._method = method
self._interceptor = interceptor self._interceptor = interceptor
def __call__(self, request, timeout=None, metadata=None, credentials=None): def __call__(self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None):
response, ignored_call = self._with_call( response, ignored_call = self._with_call(
request, request,
timeout=timeout, timeout=timeout,
metadata=metadata, metadata=metadata,
credentials=credentials) credentials=credentials,
wait_for_ready=wait_for_ready)
return response return response
def _with_call(self, request, timeout=None, metadata=None, def _with_call(self,
credentials=None): request,
client_call_details = _ClientCallDetails(self._method, timeout, timeout=None,
metadata, credentials) metadata=None,
credentials=None,
wait_for_ready=None):
client_call_details = _ClientCallDetails(
self._method, timeout, metadata, credentials, wait_for_ready)
def continuation(new_details, request): def continuation(new_details, request):
new_method, new_timeout, new_metadata, new_credentials = ( new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = (
_unwrap_client_call_details(new_details, client_call_details)) _unwrap_client_call_details(new_details, client_call_details))
try: try:
response, call = self._thunk(new_method).with_call( response, call = self._thunk(new_method).with_call(
request, request,
timeout=new_timeout, timeout=new_timeout,
metadata=new_metadata, metadata=new_metadata,
credentials=new_credentials) credentials=new_credentials,
wait_for_ready=new_wait_for_ready)
return _UnaryOutcome(response, call) return _UnaryOutcome(response, call)
except grpc.RpcError: except grpc.RpcError:
raise raise
@ -225,25 +241,37 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
continuation, client_call_details, request) continuation, client_call_details, request)
return call.result(), call return call.result(), call
def with_call(self, request, timeout=None, metadata=None, credentials=None): def with_call(self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None):
return self._with_call( return self._with_call(
request, request,
timeout=timeout, timeout=timeout,
metadata=metadata, metadata=metadata,
credentials=credentials) credentials=credentials,
wait_for_ready=wait_for_ready)
def future(self, request, timeout=None, metadata=None, credentials=None): def future(self,
client_call_details = _ClientCallDetails(self._method, timeout, request,
metadata, credentials) timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None):
client_call_details = _ClientCallDetails(
self._method, timeout, metadata, credentials, wait_for_ready)
def continuation(new_details, request): def continuation(new_details, request):
new_method, new_timeout, new_metadata, new_credentials = ( new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = (
_unwrap_client_call_details(new_details, client_call_details)) _unwrap_client_call_details(new_details, client_call_details))
return self._thunk(new_method).future( return self._thunk(new_method).future(
request, request,
timeout=new_timeout, timeout=new_timeout,
metadata=new_metadata, metadata=new_metadata,
credentials=new_credentials) credentials=new_credentials,
wait_for_ready=new_wait_for_ready)
try: try:
return self._interceptor.intercept_unary_unary( return self._interceptor.intercept_unary_unary(
@ -259,18 +287,24 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
self._method = method self._method = method
self._interceptor = interceptor self._interceptor = interceptor
def __call__(self, request, timeout=None, metadata=None, credentials=None): def __call__(self,
client_call_details = _ClientCallDetails(self._method, timeout, request,
metadata, credentials) timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None):
client_call_details = _ClientCallDetails(
self._method, timeout, metadata, credentials, wait_for_ready)
def continuation(new_details, request): def continuation(new_details, request):
new_method, new_timeout, new_metadata, new_credentials = ( new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = (
_unwrap_client_call_details(new_details, client_call_details)) _unwrap_client_call_details(new_details, client_call_details))
return self._thunk(new_method)( return self._thunk(new_method)(
request, request,
timeout=new_timeout, timeout=new_timeout,
metadata=new_metadata, metadata=new_metadata,
credentials=new_credentials) credentials=new_credentials,
wait_for_ready=new_wait_for_ready)
try: try:
return self._interceptor.intercept_unary_stream( return self._interceptor.intercept_unary_stream(
@ -290,31 +324,35 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
request_iterator, request_iterator,
timeout=None, timeout=None,
metadata=None, metadata=None,
credentials=None): credentials=None,
wait_for_ready=None):
response, ignored_call = self._with_call( response, ignored_call = self._with_call(
request_iterator, request_iterator,
timeout=timeout, timeout=timeout,
metadata=metadata, metadata=metadata,
credentials=credentials) credentials=credentials,
wait_for_ready=wait_for_ready)
return response return response
def _with_call(self, def _with_call(self,
request_iterator, request_iterator,
timeout=None, timeout=None,
metadata=None, metadata=None,
credentials=None): credentials=None,
client_call_details = _ClientCallDetails(self._method, timeout, wait_for_ready=None):
metadata, credentials) client_call_details = _ClientCallDetails(
self._method, timeout, metadata, credentials, wait_for_ready)
def continuation(new_details, request_iterator): def continuation(new_details, request_iterator):
new_method, new_timeout, new_metadata, new_credentials = ( new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = (
_unwrap_client_call_details(new_details, client_call_details)) _unwrap_client_call_details(new_details, client_call_details))
try: try:
response, call = self._thunk(new_method).with_call( response, call = self._thunk(new_method).with_call(
request_iterator, request_iterator,
timeout=new_timeout, timeout=new_timeout,
metadata=new_metadata, metadata=new_metadata,
credentials=new_credentials) credentials=new_credentials,
wait_for_ready=new_wait_for_ready)
return _UnaryOutcome(response, call) return _UnaryOutcome(response, call)
except grpc.RpcError: except grpc.RpcError:
raise raise
@ -329,29 +367,33 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
request_iterator, request_iterator,
timeout=None, timeout=None,
metadata=None, metadata=None,
credentials=None): credentials=None,
wait_for_ready=None):
return self._with_call( return self._with_call(
request_iterator, request_iterator,
timeout=timeout, timeout=timeout,
metadata=metadata, metadata=metadata,
credentials=credentials) credentials=credentials,
wait_for_ready=wait_for_ready)
def future(self, def future(self,
request_iterator, request_iterator,
timeout=None, timeout=None,
metadata=None, metadata=None,
credentials=None): credentials=None,
client_call_details = _ClientCallDetails(self._method, timeout, wait_for_ready=None):
metadata, credentials) client_call_details = _ClientCallDetails(
self._method, timeout, metadata, credentials, wait_for_ready)
def continuation(new_details, request_iterator): def continuation(new_details, request_iterator):
new_method, new_timeout, new_metadata, new_credentials = ( new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = (
_unwrap_client_call_details(new_details, client_call_details)) _unwrap_client_call_details(new_details, client_call_details))
return self._thunk(new_method).future( return self._thunk(new_method).future(
request_iterator, request_iterator,
timeout=new_timeout, timeout=new_timeout,
metadata=new_metadata, metadata=new_metadata,
credentials=new_credentials) credentials=new_credentials,
wait_for_ready=new_wait_for_ready)
try: try:
return self._interceptor.intercept_stream_unary( return self._interceptor.intercept_stream_unary(
@ -371,18 +413,20 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
request_iterator, request_iterator,
timeout=None, timeout=None,
metadata=None, metadata=None,
credentials=None): credentials=None,
client_call_details = _ClientCallDetails(self._method, timeout, wait_for_ready=None):
metadata, credentials) client_call_details = _ClientCallDetails(
self._method, timeout, metadata, credentials, wait_for_ready)
def continuation(new_details, request_iterator): def continuation(new_details, request_iterator):
new_method, new_timeout, new_metadata, new_credentials = ( new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = (
_unwrap_client_call_details(new_details, client_call_details)) _unwrap_client_call_details(new_details, client_call_details))
return self._thunk(new_method)( return self._thunk(new_method)(
request_iterator, request_iterator,
timeout=new_timeout, timeout=new_timeout,
metadata=new_metadata, metadata=new_metadata,
credentials=new_credentials) credentials=new_credentials,
wait_for_ready=new_wait_for_ready)
try: try:
return self._interceptor.intercept_stream_stream( return self._interceptor.intercept_stream_stream(

@ -49,6 +49,7 @@
"unit._invocation_defects_test.InvocationDefectsTest", "unit._invocation_defects_test.InvocationDefectsTest",
"unit._logging_test.LoggingTest", "unit._logging_test.LoggingTest",
"unit._metadata_code_details_test.MetadataCodeDetailsTest", "unit._metadata_code_details_test.MetadataCodeDetailsTest",
"unit._metadata_flags_test.MetadataFlagsTest",
"unit._metadata_test.MetadataTest", "unit._metadata_test.MetadataTest",
"unit._reconnect_test.ReconnectTest", "unit._reconnect_test.ReconnectTest",
"unit._resource_exhausted_test.ResourceExhaustedTest", "unit._resource_exhausted_test.ResourceExhaustedTest",

@ -0,0 +1,251 @@
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests metadata flags feature by testing wait-for-ready semantics"""
import time
import weakref
import unittest
import threading
import socket
from six.moves import queue
import grpc
from tests.unit import test_common
from tests.unit.framework.common import test_constants
_UNARY_UNARY = '/test/UnaryUnary'
_UNARY_STREAM = '/test/UnaryStream'
_STREAM_UNARY = '/test/StreamUnary'
_STREAM_STREAM = '/test/StreamStream'
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x00\x00\x00'
def handle_unary_unary(test, request, servicer_context):
return _RESPONSE
def handle_unary_stream(test, request, servicer_context):
for _ in range(test_constants.STREAM_LENGTH):
yield _RESPONSE
def handle_stream_unary(test, request_iterator, servicer_context):
for _ in request_iterator:
pass
return _RESPONSE
def handle_stream_stream(test, request_iterator, servicer_context):
for _ in request_iterator:
yield _RESPONSE
class _MethodHandler(grpc.RpcMethodHandler):
def __init__(self, test, request_streaming, response_streaming):
self.request_streaming = request_streaming
self.response_streaming = response_streaming
self.request_deserializer = None
self.response_serializer = None
self.unary_unary = None
self.unary_stream = None
self.stream_unary = None
self.stream_stream = None
if self.request_streaming and self.response_streaming:
self.stream_stream = lambda req, ctx: handle_stream_stream(test, req, ctx)
elif self.request_streaming:
self.stream_unary = lambda req, ctx: handle_stream_unary(test, req, ctx)
elif self.response_streaming:
self.unary_stream = lambda req, ctx: handle_unary_stream(test, req, ctx)
else:
self.unary_unary = lambda req, ctx: handle_unary_unary(test, req, ctx)
class _GenericHandler(grpc.GenericRpcHandler):
def __init__(self, test):
self._test = test
def service(self, handler_call_details):
if handler_call_details.method == _UNARY_UNARY:
return _MethodHandler(self._test, False, False)
elif handler_call_details.method == _UNARY_STREAM:
return _MethodHandler(self._test, False, True)
elif handler_call_details.method == _STREAM_UNARY:
return _MethodHandler(self._test, True, False)
elif handler_call_details.method == _STREAM_STREAM:
return _MethodHandler(self._test, True, True)
else:
return None
def get_free_loopback_tcp_port():
tcp = socket.socket(socket.AF_INET6)
tcp.bind(('', 0))
address_tuple = tcp.getsockname()
return tcp, "[::1]:%s" % (address_tuple[1])
def create_dummy_channel():
"""Creating dummy channels is a workaround for retries"""
_, addr = get_free_loopback_tcp_port()
return grpc.insecure_channel(addr)
def perform_unary_unary_call(channel, wait_for_ready=None):
channel.unary_unary(_UNARY_UNARY).__call__(
_REQUEST,
timeout=test_constants.LONG_TIMEOUT,
wait_for_ready=wait_for_ready)
def perform_unary_unary_with_call(channel, wait_for_ready=None):
channel.unary_unary(_UNARY_UNARY).with_call(
_REQUEST,
timeout=test_constants.LONG_TIMEOUT,
wait_for_ready=wait_for_ready)
def perform_unary_unary_future(channel, wait_for_ready=None):
channel.unary_unary(_UNARY_UNARY).future(
_REQUEST,
timeout=test_constants.LONG_TIMEOUT,
wait_for_ready=wait_for_ready).result(
timeout=test_constants.LONG_TIMEOUT)
def perform_unary_stream_call(channel, wait_for_ready=None):
response_iterator = channel.unary_stream(_UNARY_STREAM).__call__(
_REQUEST,
timeout=test_constants.LONG_TIMEOUT,
wait_for_ready=wait_for_ready)
for _ in response_iterator:
pass
def perform_stream_unary_call(channel, wait_for_ready=None):
channel.stream_unary(_STREAM_UNARY).__call__(
iter([_REQUEST] * test_constants.STREAM_LENGTH),
timeout=test_constants.LONG_TIMEOUT,
wait_for_ready=wait_for_ready)
def perform_stream_unary_with_call(channel, wait_for_ready=None):
channel.stream_unary(_STREAM_UNARY).with_call(
iter([_REQUEST] * test_constants.STREAM_LENGTH),
timeout=test_constants.LONG_TIMEOUT,
wait_for_ready=wait_for_ready)
def perform_stream_unary_future(channel, wait_for_ready=None):
channel.stream_unary(_STREAM_UNARY).future(
iter([_REQUEST] * test_constants.STREAM_LENGTH),
timeout=test_constants.LONG_TIMEOUT,
wait_for_ready=wait_for_ready).result(
timeout=test_constants.LONG_TIMEOUT)
def perform_stream_stream_call(channel, wait_for_ready=None):
response_iterator = channel.stream_stream(_STREAM_STREAM).__call__(
iter([_REQUEST] * test_constants.STREAM_LENGTH),
timeout=test_constants.LONG_TIMEOUT,
wait_for_ready=wait_for_ready)
for _ in response_iterator:
pass
_ALL_CALL_CASES = [
perform_unary_unary_call, perform_unary_unary_with_call,
perform_unary_unary_future, perform_unary_stream_call,
perform_stream_unary_call, perform_stream_unary_with_call,
perform_stream_unary_future, perform_stream_stream_call
]
class MetadataFlagsTest(unittest.TestCase):
def check_connection_does_failfast(self, fn, channel, wait_for_ready=None):
try:
fn(channel, wait_for_ready)
self.fail("The Call should fail")
except BaseException as e: # pylint: disable=broad-except
self.assertIn('StatusCode.UNAVAILABLE', str(e))
def test_call_wait_for_ready_default(self):
for perform_call in _ALL_CALL_CASES:
self.check_connection_does_failfast(perform_call,
create_dummy_channel())
def test_call_wait_for_ready_disabled(self):
for perform_call in _ALL_CALL_CASES:
self.check_connection_does_failfast(
perform_call, create_dummy_channel(), wait_for_ready=False)
def test_call_wait_for_ready_enabled(self):
# To test the wait mechanism, Python thread is required to make
# client set up first without handling them case by case.
# Also, Python thread don't pass the unhandled exceptions to
# main thread. So, it need another method to store the
# exceptions and raise them again in main thread.
unhandled_exceptions = queue.Queue()
tcp, addr = get_free_loopback_tcp_port()
wg = test_common.WaitGroup(len(_ALL_CALL_CASES))
def wait_for_transient_failure(channel_connectivity):
if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE:
wg.done()
def test_call(perform_call):
try:
channel = grpc.insecure_channel(addr)
channel.subscribe(wait_for_transient_failure)
perform_call(channel, wait_for_ready=True)
except BaseException as e: # pylint: disable=broad-except
# If the call failed, the thread would be destroyed. The channel
# object can be collected before calling the callback, which
# will result in a deadlock.
wg.done()
unhandled_exceptions.put(e, True)
test_threads = []
for perform_call in _ALL_CALL_CASES:
test_thread = threading.Thread(
target=test_call, args=(perform_call,))
test_thread.exception = None
test_thread.start()
test_threads.append(test_thread)
# Start the server after the connections are waiting
wg.wait()
tcp.close()
server = test_common.test_server()
server.add_generic_rpc_handlers((_GenericHandler(weakref.proxy(self)),))
server.add_insecure_port(addr)
server.start()
for test_thread in test_threads:
test_thread.join()
# Stop the server to make test end properly
server.stop(0)
if not unhandled_exceptions.empty():
raise unhandled_exceptions.get(True)
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -14,6 +14,7 @@
"""Common code used throughout tests of gRPC.""" """Common code used throughout tests of gRPC."""
import collections import collections
import threading
from concurrent import futures from concurrent import futures
import grpc import grpc
@ -107,3 +108,28 @@ def test_server(max_workers=10):
return grpc.server( return grpc.server(
futures.ThreadPoolExecutor(max_workers=max_workers), futures.ThreadPoolExecutor(max_workers=max_workers),
options=(('grpc.so_reuseport', 0),)) options=(('grpc.so_reuseport', 0),))
class WaitGroup(object):
def __init__(self, n=0):
self.count = n
self.cv = threading.Condition()
def add(self, n):
self.cv.acquire()
self.count += n
self.cv.release()
def done(self):
self.cv.acquire()
self.count -= 1
if self.count == 0:
self.cv.notify_all()
self.cv.release()
def wait(self):
self.cv.acquire()
while self.count > 0:
self.cv.wait()
self.cv.release()

Loading…
Cancel
Save