diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 9661c5e5b38..ed4c871b684 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -488,6 +488,18 @@ def _stream_unary_invocation_operationses_and_tags(metadata, metadata, initial_metadata_flags)) +def _determine_deadline(user_deadline): + parent_deadline = cygrpc.get_deadline_from_context() + if parent_deadline is None and user_deadline is None: + return None + elif parent_deadline is not None and user_deadline is None: + return parent_deadline + elif user_deadline is not None and parent_deadline is None: + return user_deadline + else: + return min(parent_deadline, user_deadline) + + class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): # pylint: disable=too-many-arguments @@ -527,9 +539,10 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): if state is None: raise rendezvous # pylint: disable-msg=raising-bad-type else: + deadline_to_propagate = _determine_deadline(deadline) call = self._channel.segregated_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, - self._method, None, deadline, metadata, None + self._method, None, deadline_to_propagate, metadata, None if credentials is None else credentials._credentials, (( operations, None, @@ -619,8 +632,8 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): event_handler = _event_handler(state, self._response_deserializer) call = self._managed_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, - self._method, None, deadline, metadata, None - if credentials is None else credentials._credentials, + self._method, None, _determine_deadline(deadline), metadata, + None if credentials is None else credentials._credentials, operationses, event_handler, self._context) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -644,9 +657,10 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( wait_for_ready) + deadline_to_propagate = _determine_deadline(deadline) call = self._channel.segregated_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, - None, deadline, metadata, None + None, deadline_to_propagate, metadata, None if credentials is None else credentials._credentials, _stream_unary_invocation_operationses_and_tags( metadata, initial_metadata_flags), self._context) @@ -734,9 +748,10 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), ) event_handler = _event_handler(state, self._response_deserializer) + deadline_to_propagate = _determine_deadline(deadline) call = self._managed_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, - None, deadline, metadata, None + None, deadline_to_propagate, metadata, None if credentials is None else credentials._credentials, operationses, event_handler, self._context) _consume_request_iterator(request_iterator, state, call, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi index 6d1c36b2b35..de4d71b8196 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi @@ -16,7 +16,7 @@ cdef object _custom_op_on_c_call(int op, grpc_call *call): raise NotImplementedError("No custom hooks are implemented") -def install_context_from_call(Call call): +def install_context_from_request_call_event(RequestCallEvent event): pass def uninstall_context(): @@ -30,3 +30,6 @@ cdef class CensusContext: def set_census_context_on_call(_CallState call_state, CensusContext census_ctx): pass + +def get_deadline_from_context(): + return None diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 9224b2ac672..90136aef3c2 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -498,7 +498,7 @@ def _status(rpc_event, state, serialized_response): def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk, request_deserializer, response_serializer): - cygrpc.install_context_from_call(rpc_event.call) + cygrpc.install_context_from_request_call_event(rpc_event) try: argument = argument_thunk() if argument is not None: @@ -515,7 +515,7 @@ def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk, def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk, request_deserializer, response_serializer): - cygrpc.install_context_from_call(rpc_event.call) + cygrpc.install_context_from_request_call_event(rpc_event) def send_response(response): if response is None: