From 3b6a5a6a01ea5297cb566dc6d396806a42100993 Mon Sep 17 00:00:00 2001 From: Zhanghui Mao Date: Sat, 21 Sep 2019 17:17:12 +0800 Subject: [PATCH] switch to concrete classes --- .../grpcio/grpc/experimental/aio/__init__.py | 102 +++--------------- .../grpcio/grpc/experimental/aio/_channel.py | 59 +++++++++- 2 files changed, 69 insertions(+), 92 deletions(-) diff --git a/src/python/grpcio/grpc/experimental/aio/__init__.py b/src/python/grpcio/grpc/experimental/aio/__init__.py index 6004126549b..8f064f90915 100644 --- a/src/python/grpcio/grpc/experimental/aio/__init__.py +++ b/src/python/grpcio/grpc/experimental/aio/__init__.py @@ -18,91 +18,8 @@ import six from grpc._cython.cygrpc import init_grpc_aio - -class Channel(six.with_metaclass(abc.ABCMeta)): - """Asynchronous Channel implementation.""" - - @abc.abstractmethod - def unary_unary(self, - method, - request_serializer=None, - response_deserializer=None): - """Creates a UnaryUnaryMultiCallable for a unary-unary method. - - Args: - method: The name of the RPC method. - request_serializer: Optional behaviour for serializing the request - message. Request goes unserialized in case None is passed. - response_deserializer: Optional behaviour for deserializing the - response message. Response goes undeserialized in case None - is passed. - - Returns: - A UnaryUnaryMultiCallable value for the named unary-unary method. - """ - raise NotImplementedError() - - @abc.abstractmethod - async def close(self): - """Closes this Channel and releases all resources held by it. - - Closing the Channel will proactively terminate all RPCs active with the - Channel and it is not valid to invoke new RPCs with the Channel. - - This method is idempotent. - """ - raise NotImplementedError() - - @abc.abstractmethod - async def __aenter__(self): - """Starts an asynchronous context manager. - - Returns: - Channel the channel that was instantiated. - """ - raise NotImplementedError() - - @abc.abstractmethod - async def __aexit__(self, exc_type, exc_val, exc_tb): - """Finishes the asynchronous context manager by closing gracefully the channel.""" - raise NotImplementedError() - - -class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): - """Affords invoking a unary-unary RPC from client-side in an asynchronous way.""" - - @abc.abstractmethod - async def __call__(self, - request, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None): - """Asynchronously invokes the underlying RPC. - - Args: - request: The request value for the RPC. - timeout: An optional duration of time in seconds to allow - for the RPC. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. Only valid for - secure Channel. - wait_for_ready: This is an EXPERIMENTAL argument. An optional - flag to enable wait for ready mechanism - compression: An element of grpc.compression, e.g. - grpc.compression.Gzip. This is an EXPERIMENTAL option. - - Returns: - The response value for the RPC. - - Raises: - RpcError: Indicating that the RPC terminated with non-OK status. The - raised RpcError will also be a Call for the RPC affording the RPC's - metadata, status code, and details. - """ - raise NotImplementedError() +from ._channel import Channel +from ._channel import UnaryUnaryMultiCallable def insecure_channel(target, options=None, compression=None): @@ -118,6 +35,15 @@ def insecure_channel(target, options=None, compression=None): Returns: A Channel. """ - from grpc.experimental.aio import _channel # pylint: disable=cyclic-import - return _channel.Channel(target, () - if options is None else options, None, compression) + return Channel(target, () + if options is None else options, None, compression) + + +################################### __all__ ################################# + +__all__ = ( + 'init_grpc_aio', + 'Channel', + 'UnaryUnaryMultiCallable', + 'insecure_channel', +) diff --git a/src/python/grpcio/grpc/experimental/aio/_channel.py b/src/python/grpcio/grpc/experimental/aio/_channel.py index e3c8fcdbf2f..434190187b6 100644 --- a/src/python/grpcio/grpc/experimental/aio/_channel.py +++ b/src/python/grpcio/grpc/experimental/aio/_channel.py @@ -15,10 +15,10 @@ from grpc import _common from grpc._cython import cygrpc -from grpc.experimental import aio -class UnaryUnaryMultiCallable(aio.UnaryUnaryMultiCallable): +class UnaryUnaryMultiCallable: + """Affords invoking a unary-unary RPC from client-side in an asynchronous way.""" def __init__(self, channel, method, request_serializer, response_deserializer): @@ -34,6 +34,29 @@ class UnaryUnaryMultiCallable(aio.UnaryUnaryMultiCallable): credentials=None, wait_for_ready=None, compression=None): + """Asynchronously invokes the underlying RPC. + + Args: + request: The request value for the RPC. + timeout: An optional duration of time in seconds to allow + for the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. Only valid for + secure Channel. + wait_for_ready: This is an EXPERIMENTAL argument. An optional + flag to enable wait for ready mechanism + compression: An element of grpc.compression, e.g. + grpc.compression.Gzip. This is an EXPERIMENTAL option. + + Returns: + The response value for the RPC. + + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ if timeout: raise NotImplementedError("TODO: timeout not implemented yet") @@ -57,8 +80,11 @@ class UnaryUnaryMultiCallable(aio.UnaryUnaryMultiCallable): return _common.deserialize(response, self._response_deserializer) -class Channel(aio.Channel): - """A cygrpc.AioChannel-backed implementation of grpc.experimental.aio.Channel.""" +class Channel: + """Asynchronous Channel implementation. + + A cygrpc.AioChannel-backed implementation. + """ def __init__(self, target, options, credentials, compression): """Constructor. @@ -86,7 +112,19 @@ class Channel(aio.Channel): method, request_serializer=None, response_deserializer=None): + """Creates a UnaryUnaryMultiCallable for a unary-unary method. + Args: + method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the + response message. Response goes undeserialized in case None + is passed. + + Returns: + A UnaryUnaryMultiCallable value for the named unary-unary method. + """ return UnaryUnaryMultiCallable(self._channel, _common.encode(method), request_serializer, response_deserializer) @@ -96,10 +134,23 @@ class Channel(aio.Channel): self._channel.close() async def __aenter__(self): + """Starts an asynchronous context manager. + + Returns: + Channel the channel that was instantiated. + """ return self async def __aexit__(self, exc_type, exc_val, exc_tb): + """Finishes the asynchronous context manager by closing gracefully the channel.""" await self._close() async def close(self): + """Closes this Channel and releases all resources held by it. + + Closing the Channel will proactively terminate all RPCs active with the + Channel and it is not valid to invoke new RPCs with the Channel. + + This method is idempotent. + """ await self._close()