Support channel argument for both client-side and server-side

pull/21607/head
Lidi Zheng 5 years ago
parent da6a29dd6d
commit 91df9493eb
  1. 7
      src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi
  2. 19
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi
  3. 36
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
  4. 4
      src/python/grpcio/grpc/experimental/aio/__init__.py
  5. 17
      src/python/grpcio/grpc/experimental/aio/_channel.py
  6. 1
      src/python/grpcio/grpc/experimental/aio/_typing.py
  7. 156
      src/python/grpcio_tests/tests_aio/unit/channel_argument_test.py

@ -13,8 +13,11 @@
# limitations under the License.
cdef class AioChannel:
def __cinit__(self, bytes target):
self.channel = grpc_insecure_channel_create(<char *>target, NULL, NULL)
def __cinit__(self, bytes target, tuple options):
if options is None:
options = ()
cdef _ChannelArgs channel_args = _ChannelArgs(options)
self.channel = grpc_insecure_channel_create(<char *>target, channel_args.c_args(), NULL)
self.cq = CallbackCompletionQueue()
self._target = target

@ -26,6 +26,7 @@ cdef grpc_socket_vtable asyncio_socket_vtable
cdef grpc_custom_resolver_vtable asyncio_resolver_vtable
cdef grpc_custom_timer_vtable asyncio_timer_vtable
cdef grpc_custom_poller_vtable asyncio_pollset_vtable
cdef bint so_reuse_port
cdef grpc_error* asyncio_socket_init(
@ -121,10 +122,22 @@ cdef grpc_error* asyncio_socket_listen(grpc_custom_socket* grpc_socket) with gil
return grpc_error_none()
def _asyncio_apply_socket_options(object s, so_reuse_port=False):
# TODO(https://github.com/grpc/grpc/issues/20667)
# Connects the so_reuse_port option to channel arguments
cdef list _socket_options_list = []
cdef str _SOCKET_OPT_SO_REUSEPORT = 'grpc.so_reuseport'
cdef _apply_socket_options(tuple options):
if options is None:
options = ()
for key, value in options:
if key == _SOCKET_OPT_SO_REUSEPORT:
_socket_options_list.append(value)
def _asyncio_apply_socket_options(object s):
s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEADDR, 1)
if _socket_options_list.pop(0):
s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEPORT, 1)
s.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True)

@ -227,17 +227,21 @@ async def _handle_rpc(list generic_handlers, RPCState rpc_state, object loop):
# TODO(lidiz) return unimplemented error to client side
raise NotImplementedError()
# TODO(lidiz) extend to all 4 types of RPC
if not method_handler.request_streaming and method_handler.response_streaming:
await _handle_unary_stream_rpc(method_handler,
rpc_state,
loop)
elif not method_handler.request_streaming and not method_handler.response_streaming:
await _handle_unary_unary_rpc(method_handler,
rpc_state,
loop)
else:
raise NotImplementedError()
try:
# TODO(lidiz) extend to all 4 types of RPC
if not method_handler.request_streaming and method_handler.response_streaming:
await _handle_unary_stream_rpc(method_handler,
rpc_state,
loop)
elif not method_handler.request_streaming and not method_handler.response_streaming:
await _handle_unary_unary_rpc(method_handler,
rpc_state,
loop)
else:
raise NotImplementedError()
except Exception as e:
_LOGGER.exception(e)
raise
class _RequestCallError(Exception): pass
@ -256,6 +260,8 @@ cdef class AioServer:
def __init__(self, loop, thread_pool, generic_handlers, interceptors,
options, maximum_concurrent_rpcs, compression):
# Notify IO manager about the socket options
_apply_socket_options(options)
# NOTE(lidiz) Core objects won't be deallocated automatically.
# If AioServer.shutdown is not called, those objects will leak.
self._server = Server(options)
@ -469,5 +475,11 @@ cdef class AioServer:
If the Cython representation is deallocated without underlying objects
freed, raise an RuntimeError.
"""
# NOTE(lidiz) if users create server, and then dealloc it immediately.
# There is a potential memory leak of created Core server.
if self._status != AIO_SERVER_STATUS_STOPPED:
raise RuntimeError('__dealloc__ called on running server: %d', self._status)
_LOGGER.warn(
'__dealloc__ called on running server %s with status %d',
self,
self._status
)

@ -30,7 +30,7 @@ from ._channel import Channel
from ._channel import UnaryUnaryMultiCallable
from ._interceptor import ClientCallDetails, UnaryUnaryClientInterceptor
from ._interceptor import InterceptedUnaryUnaryCall
from ._server import server
from ._server import server, Server
def insecure_channel(
@ -64,4 +64,4 @@ __all__ = ('AioRpcError', 'RpcContext', 'Call', 'UnaryUnaryCall',
'UnaryStreamCall', 'init_grpc_aio', 'Channel',
'UnaryUnaryMultiCallable', 'ClientCallDetails',
'UnaryUnaryClientInterceptor', 'InterceptedUnaryUnaryCall',
'insecure_channel', 'server')
'insecure_channel', 'server', 'Server')

@ -20,9 +20,11 @@ from grpc import _common
from grpc._cython import cygrpc
from . import _base_call
from ._call import UnaryUnaryCall, UnaryStreamCall
from ._interceptor import UnaryUnaryClientInterceptor, InterceptedUnaryUnaryCall
from ._typing import (DeserializingFunction, MetadataType, SerializingFunction)
from ._call import UnaryStreamCall, UnaryUnaryCall
from ._interceptor import (InterceptedUnaryUnaryCall,
UnaryUnaryClientInterceptor)
from ._typing import (ChannelArgumentType, DeserializingFunction, MetadataType,
SerializingFunction)
from ._utils import _timeout_to_deadline
@ -186,8 +188,9 @@ class Channel:
_channel: cygrpc.AioChannel
_unary_unary_interceptors: Optional[Sequence[UnaryUnaryClientInterceptor]]
def __init__(self, target: Text,
options: Optional[Sequence[Tuple[Text, Any]]],
def __init__(self,
target: Text,
options: Optional[ChannelArgumentType],
credentials: Optional[grpc.ChannelCredentials],
compression: Optional[grpc.Compression],
interceptors: Optional[Sequence[UnaryUnaryClientInterceptor]]):
@ -202,8 +205,6 @@ class Channel:
interceptors: An optional list of interceptors that would be used for
intercepting any RPC executed with that channel.
"""
if options:
raise NotImplementedError("TODO: options not implemented yet")
if credentials:
raise NotImplementedError("TODO: credentials not implemented yet")
@ -229,7 +230,7 @@ class Channel:
"UnaryUnaryClientInterceptors, the following are invalid: {}"\
.format(invalid_interceptors))
self._channel = cygrpc.AioChannel(_common.encode(target))
self._channel = cygrpc.AioChannel(_common.encode(target), options)
def unary_unary(
self,

@ -20,3 +20,4 @@ ResponseType = TypeVar('ResponseType')
SerializingFunction = Callable[[Any], bytes]
DeserializingFunction = Callable[[bytes], Any]
MetadataType = Sequence[Tuple[Text, AnyStr]]
ChannelArgumentType = Sequence[Tuple[Text, Any]]

@ -0,0 +1,156 @@
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests behavior around the Core channel arguments."""
import asyncio
import logging
import unittest
import socket
import grpc
import random
from grpc.experimental import aio
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc
from tests.unit.framework.common import test_constants
from tests_aio.unit._test_server import start_test_server
from tests_aio.unit._test_base import AioTestBase
# 100 servers in sequence
_RANDOM_SEED = 42
_ENABLE_REUSE_PORT = 'SO_REUSEPORT enabled'
_DISABLE_REUSE_PORT = 'SO_REUSEPORT disabled'
_SOCKET_OPT_SO_REUSEPORT = 'grpc.so_reuseport'
_OPTIONS = (
(_ENABLE_REUSE_PORT, ((_SOCKET_OPT_SO_REUSEPORT, 1),)),
(_DISABLE_REUSE_PORT, ((_SOCKET_OPT_SO_REUSEPORT, 0),)),
)
_NUM_SERVER_CREATED = 100
_GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH = 'grpc.max_receive_message_length'
_MAX_MESSAGE_LENGTH = 1024
class _TestPointerWrapper(object):
def __int__(self):
return 123456
_TEST_CHANNEL_ARGS = (
('arg1', b'bytes_val'),
('arg2', 'str_val'),
('arg3', 1),
(b'arg4', 'str_val'),
('arg6', _TestPointerWrapper()),
)
_INVALID_TEST_CHANNEL_ARGS = [
{'foo': 'bar'},
(('key',),),
'str',
]
async def test_if_reuse_port_enabled(server: aio.Server):
port = server.add_insecure_port('127.0.0.1:0')
await server.start()
try:
another_socket = socket.socket(family=socket.AF_INET6)
another_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
another_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
another_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
another_socket.bind(('127.0.0.1', port))
except OSError as e:
assert 'Address already in use' in str(e)
return False
else:
return True
finally:
another_socket.close()
class TestChannelArgument(AioTestBase):
async def setUp(self):
random.seed(_RANDOM_SEED)
async def test_server_so_reuse_port_is_set_properly(self):
for _ in range(_NUM_SERVER_CREATED):
fact, options = random.choice(_OPTIONS)
server = aio.server(options=options)
try:
result = await test_if_reuse_port_enabled(server)
if fact == _ENABLE_REUSE_PORT and not result:
self.fail('Enabled reuse port in options, but not observed in socket')
elif fact == _DISABLE_REUSE_PORT and result:
self.fail('Disabled reuse port in options, but observed in socket')
finally:
await server.stop(None)
async def test_client(self):
aio.insecure_channel('[::]:0', options=_TEST_CHANNEL_ARGS)
async def test_server(self):
aio.server(options=_TEST_CHANNEL_ARGS)
async def test_invalid_client_args(self):
for invalid_arg in _INVALID_TEST_CHANNEL_ARGS:
self.assertRaises((ValueError, TypeError),
aio.insecure_channel,
'[::]:0',
options=invalid_arg)
async def test_max_message_length_applied(self):
address, server = await start_test_server()
async with aio.insecure_channel(address, options=(
(_GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, _MAX_MESSAGE_LENGTH),
)) as channel:
stub = test_pb2_grpc.TestServiceStub(channel)
request = messages_pb2.StreamingOutputCallRequest()
# First request will pass
request.response_parameters.append(
messages_pb2.ResponseParameters(size=_MAX_MESSAGE_LENGTH//2,)
)
# Second request should fail
request.response_parameters.append(
messages_pb2.ResponseParameters(size=_MAX_MESSAGE_LENGTH*2,)
)
call = stub.StreamingOutputCall(request)
response = await call.read()
self.assertEqual(_MAX_MESSAGE_LENGTH//2, len(response.payload.body))
with self.assertRaises(aio.AioRpcError) as exception_context:
await call.read()
rpc_error = exception_context.exception
self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED, rpc_error.code())
self.assertIn(str(_MAX_MESSAGE_LENGTH), rpc_error.details())
self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED, await call.code())
await server.stop(None)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
unittest.main(verbosity=2)
Loading…
Cancel
Save