diff --git a/src/python/grpcio/grpc/_links/invocation.py b/src/python/grpcio/grpc/_links/invocation.py index a74c77ebcc2..ee3d72fdbc8 100644 --- a/src/python/grpcio/grpc/_links/invocation.py +++ b/src/python/grpcio/grpc/_links/invocation.py @@ -141,6 +141,8 @@ class _Kernel(object): termination = links.Ticket.Termination.CANCELLATION elif event.status.code is _intermediary_low.Code.DEADLINE_EXCEEDED: termination = links.Ticket.Termination.EXPIRATION + elif event.status.code is _intermediary_low.Code.UNKNOWN: + termination = links.Ticket.Termination.LOCAL_FAILURE else: termination = links.Ticket.Termination.TRANSMISSION_FAILURE ticket = links.Ticket( @@ -349,7 +351,7 @@ def invocation_link(channel, host, request_serializers, response_deserializers): """Creates an InvocationLink. Args: - channel: A channel for use by the link. + channel: An _intermediary_low.Channel for use by the link. host: The host to specify when invoking RPCs. request_serializers: A dict from group-method pair to request object serialization behavior. diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py index 14361762461..43c4c0e80ca 100644 --- a/src/python/grpcio/grpc/_links/service.py +++ b/src/python/grpcio/grpc/_links/service.py @@ -40,6 +40,19 @@ from grpc.framework.foundation import logging_pool from grpc.framework.foundation import relay from grpc.framework.interfaces.links import links +_TERMINATION_KIND_TO_CODE = { + links.Ticket.Termination.COMPLETION: _intermediary_low.Code.OK, + links.Ticket.Termination.CANCELLATION: _intermediary_low.Code.CANCELLED, + links.Ticket.Termination.EXPIRATION: + _intermediary_low.Code.DEADLINE_EXCEEDED, + links.Ticket.Termination.SHUTDOWN: _intermediary_low.Code.UNAVAILABLE, + links.Ticket.Termination.RECEPTION_FAILURE: _intermediary_low.Code.INTERNAL, + links.Ticket.Termination.TRANSMISSION_FAILURE: + _intermediary_low.Code.INTERNAL, + links.Ticket.Termination.LOCAL_FAILURE: _intermediary_low.Code.UNKNOWN, + links.Ticket.Termination.REMOTE_FAILURE: _intermediary_low.Code.UNKNOWN, +} + @enum.unique class _Read(enum.Enum): @@ -93,6 +106,15 @@ def _metadatafy(call, metadata): call.add_metadata(metadata_key, metadata_value) +def _status(termination_kind, code, details): + effective_details = b'' if details is None else details + if code is None: + effective_code = _TERMINATION_KIND_TO_CODE[termination_kind] + else: + effective_code = code + return _intermediary_low.Status(effective_code, effective_details) + + class _Kernel(object): def __init__(self, request_deserializers, response_serializers, ticket_relay): @@ -170,8 +192,10 @@ class _Kernel(object): if rpc_state.high_write is _HighWrite.CLOSED: if rpc_state.terminal_metadata is not None: _metadatafy(call, rpc_state.terminal_metadata) - call.status( - _intermediary_low.Status(rpc_state.code, rpc_state.message), call) + status = _status( + links.Ticket.Termination.COMPLETION, rpc_state.code, + rpc_state.message) + call.status(status, call) rpc_state.low_write = _LowWrite.CLOSED else: ticket = links.Ticket( @@ -279,14 +303,17 @@ class _Kernel(object): if rpc_state.low_write is _LowWrite.OPEN: if rpc_state.terminal_metadata is not None: _metadatafy(call, rpc_state.terminal_metadata) - status = _intermediary_low.Status( - _intermediary_low.Code.OK - if rpc_state.code is None else rpc_state.code, - '' if rpc_state.message is None else rpc_state.message) + status = _status( + links.Ticket.Termination.COMPLETION, rpc_state.code, + rpc_state.message) call.status(status, call) rpc_state.low_write = _LowWrite.CLOSED elif ticket.termination is not None: - call.cancel() + if rpc_state.terminal_metadata is not None: + _metadatafy(call, rpc_state.terminal_metadata) + status = _status( + ticket.termination, rpc_state.code, rpc_state.message) + call.status(status, call) self._rpc_states.pop(call, None) def add_port(self, port, server_credentials):