switch to concrete classes

pull/20326/head
Zhanghui Mao 5 years ago
parent a44e6d76b7
commit 3b6a5a6a01
  1. 102
      src/python/grpcio/grpc/experimental/aio/__init__.py
  2. 59
      src/python/grpcio/grpc/experimental/aio/_channel.py

@ -18,91 +18,8 @@ import six
from grpc._cython.cygrpc import init_grpc_aio
class Channel(six.with_metaclass(abc.ABCMeta)):
"""Asynchronous Channel implementation."""
@abc.abstractmethod
def unary_unary(self,
method,
request_serializer=None,
response_deserializer=None):
"""Creates a UnaryUnaryMultiCallable for a unary-unary method.
Args:
method: The name of the RPC method.
request_serializer: Optional behaviour for serializing the request
message. Request goes unserialized in case None is passed.
response_deserializer: Optional behaviour for deserializing the
response message. Response goes undeserialized in case None
is passed.
Returns:
A UnaryUnaryMultiCallable value for the named unary-unary method.
"""
raise NotImplementedError()
@abc.abstractmethod
async def close(self):
"""Closes this Channel and releases all resources held by it.
Closing the Channel will proactively terminate all RPCs active with the
Channel and it is not valid to invoke new RPCs with the Channel.
This method is idempotent.
"""
raise NotImplementedError()
@abc.abstractmethod
async def __aenter__(self):
"""Starts an asynchronous context manager.
Returns:
Channel the channel that was instantiated.
"""
raise NotImplementedError()
@abc.abstractmethod
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Finishes the asynchronous context manager by closing gracefully the channel."""
raise NotImplementedError()
class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
"""Affords invoking a unary-unary RPC from client-side in an asynchronous way."""
@abc.abstractmethod
async def __call__(self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None,
compression=None):
"""Asynchronously invokes the underlying RPC.
Args:
request: The request value for the RPC.
timeout: An optional duration of time in seconds to allow
for the RPC.
metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC. Only valid for
secure Channel.
wait_for_ready: This is an EXPERIMENTAL argument. An optional
flag to enable wait for ready mechanism
compression: An element of grpc.compression, e.g.
grpc.compression.Gzip. This is an EXPERIMENTAL option.
Returns:
The response value for the RPC.
Raises:
RpcError: Indicating that the RPC terminated with non-OK status. The
raised RpcError will also be a Call for the RPC affording the RPC's
metadata, status code, and details.
"""
raise NotImplementedError()
from ._channel import Channel
from ._channel import UnaryUnaryMultiCallable
def insecure_channel(target, options=None, compression=None):
@ -118,6 +35,15 @@ def insecure_channel(target, options=None, compression=None):
Returns:
A Channel.
"""
from grpc.experimental.aio import _channel # pylint: disable=cyclic-import
return _channel.Channel(target, ()
if options is None else options, None, compression)
return Channel(target, ()
if options is None else options, None, compression)
################################### __all__ #################################
__all__ = (
'init_grpc_aio',
'Channel',
'UnaryUnaryMultiCallable',
'insecure_channel',
)

@ -15,10 +15,10 @@
from grpc import _common
from grpc._cython import cygrpc
from grpc.experimental import aio
class UnaryUnaryMultiCallable(aio.UnaryUnaryMultiCallable):
class UnaryUnaryMultiCallable:
"""Affords invoking a unary-unary RPC from client-side in an asynchronous way."""
def __init__(self, channel, method, request_serializer,
response_deserializer):
@ -34,6 +34,29 @@ class UnaryUnaryMultiCallable(aio.UnaryUnaryMultiCallable):
credentials=None,
wait_for_ready=None,
compression=None):
"""Asynchronously invokes the underlying RPC.
Args:
request: The request value for the RPC.
timeout: An optional duration of time in seconds to allow
for the RPC.
metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC. Only valid for
secure Channel.
wait_for_ready: This is an EXPERIMENTAL argument. An optional
flag to enable wait for ready mechanism
compression: An element of grpc.compression, e.g.
grpc.compression.Gzip. This is an EXPERIMENTAL option.
Returns:
The response value for the RPC.
Raises:
RpcError: Indicating that the RPC terminated with non-OK status. The
raised RpcError will also be a Call for the RPC affording the RPC's
metadata, status code, and details.
"""
if timeout:
raise NotImplementedError("TODO: timeout not implemented yet")
@ -57,8 +80,11 @@ class UnaryUnaryMultiCallable(aio.UnaryUnaryMultiCallable):
return _common.deserialize(response, self._response_deserializer)
class Channel(aio.Channel):
"""A cygrpc.AioChannel-backed implementation of grpc.experimental.aio.Channel."""
class Channel:
"""Asynchronous Channel implementation.
A cygrpc.AioChannel-backed implementation.
"""
def __init__(self, target, options, credentials, compression):
"""Constructor.
@ -86,7 +112,19 @@ class Channel(aio.Channel):
method,
request_serializer=None,
response_deserializer=None):
"""Creates a UnaryUnaryMultiCallable for a unary-unary method.
Args:
method: The name of the RPC method.
request_serializer: Optional behaviour for serializing the request
message. Request goes unserialized in case None is passed.
response_deserializer: Optional behaviour for deserializing the
response message. Response goes undeserialized in case None
is passed.
Returns:
A UnaryUnaryMultiCallable value for the named unary-unary method.
"""
return UnaryUnaryMultiCallable(self._channel, _common.encode(method),
request_serializer,
response_deserializer)
@ -96,10 +134,23 @@ class Channel(aio.Channel):
self._channel.close()
async def __aenter__(self):
"""Starts an asynchronous context manager.
Returns:
Channel the channel that was instantiated.
"""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Finishes the asynchronous context manager by closing gracefully the channel."""
await self._close()
async def close(self):
"""Closes this Channel and releases all resources held by it.
Closing the Channel will proactively terminate all RPCs active with the
Channel and it is not valid to invoke new RPCs with the Channel.
This method is idempotent.
"""
await self._close()

Loading…
Cancel
Save