From 5d8a5ef8c7c99f68022e7ad30fba30eeca333940 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 30 Apr 2020 11:07:58 -0700 Subject: [PATCH] Add AsyncIO support for grpcio-channelz --- .../grpc_channelz/v1/BUILD.bazel | 2 +- .../grpc_channelz/v1/_async.py | 66 +++ .../grpc_channelz/v1/_servicer.py | 120 +++++ .../grpc_channelz/v1/channelz.py | 177 +++---- .../tests_aio/channelz/BUILD.bazel | 29 ++ .../tests_aio/channelz/__init__.py | 13 + .../channelz/channelz_servicer_test.py | 446 ++++++++++++++++++ src/python/grpcio_tests/tests_aio/tests.json | 1 + 8 files changed, 730 insertions(+), 124 deletions(-) create mode 100644 src/python/grpcio_channelz/grpc_channelz/v1/_async.py create mode 100644 src/python/grpcio_channelz/grpc_channelz/v1/_servicer.py create mode 100644 src/python/grpcio_tests/tests_aio/channelz/BUILD.bazel create mode 100644 src/python/grpcio_tests/tests_aio/channelz/__init__.py create mode 100644 src/python/grpcio_tests/tests_aio/channelz/channelz_servicer_test.py diff --git a/src/python/grpcio_channelz/grpc_channelz/v1/BUILD.bazel b/src/python/grpcio_channelz/grpc_channelz/v1/BUILD.bazel index 942d861b158..d2393b06e0a 100644 --- a/src/python/grpcio_channelz/grpc_channelz/v1/BUILD.bazel +++ b/src/python/grpcio_channelz/grpc_channelz/v1/BUILD.bazel @@ -16,7 +16,7 @@ py_grpc_library( py_library( name = "grpc_channelz", - srcs = ["channelz.py"], + srcs = glob(["*.py"]), imports = ["../../"], deps = [ ":channelz_py_pb2", diff --git a/src/python/grpcio_channelz/grpc_channelz/v1/_async.py b/src/python/grpcio_channelz/grpc_channelz/v1/_async.py new file mode 100644 index 00000000000..50911b0fd95 --- /dev/null +++ b/src/python/grpcio_channelz/grpc_channelz/v1/_async.py @@ -0,0 +1,66 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""AsyncIO version of Channelz servicer.""" + +from grpc.experimental import aio + +import grpc_channelz.v1.channelz_pb2 as _channelz_pb2 +import grpc_channelz.v1.channelz_pb2_grpc as _channelz_pb2_grpc +from grpc_channelz.v1._servicer import ChannelzServicer as _SyncChannelzServicer + + +class ChannelzServicer(_channelz_pb2_grpc.ChannelzServicer): + """AsyncIO servicer for handling RPCs for service statuses.""" + + @staticmethod + async def GetTopChannels(request: _channelz_pb2.GetTopChannelsRequest, + context: aio.ServicerContext + ) -> _channelz_pb2.GetTopChannelsResponse: + return _SyncChannelzServicer.GetTopChannels(request, context) + + @staticmethod + async def GetServers(request: _channelz_pb2.GetServersRequest, + context: aio.ServicerContext + ) -> _channelz_pb2.GetServersResponse: + return _SyncChannelzServicer.GetServers(request, context) + + @staticmethod + async def GetServer(request: _channelz_pb2.GetServerRequest, + context: aio.ServicerContext + ) -> _channelz_pb2.GetServerResponse: + return _SyncChannelzServicer.GetServer(request, context) + + @staticmethod + async def GetServerSockets(request: _channelz_pb2.GetServerSocketsRequest, + context: aio.ServicerContext + ) -> _channelz_pb2.GetServerSocketsResponse: + return _SyncChannelzServicer.GetServerSockets(request, context) + + @staticmethod + async def GetChannel(request: _channelz_pb2.GetChannelRequest, + context: aio.ServicerContext + ) -> _channelz_pb2.GetChannelResponse: + return _SyncChannelzServicer.GetChannel(request, context) + + @staticmethod + async def GetSubchannel(request: _channelz_pb2.GetSubchannelRequest, + context: aio.ServicerContext + ) -> _channelz_pb2.GetSubchannelResponse: + return _SyncChannelzServicer.GetSubchannel(request, context) + + @staticmethod + async def GetSocket(request: _channelz_pb2.GetSocketRequest, + context: aio.ServicerContext + ) -> _channelz_pb2.GetSocketResponse: + return _SyncChannelzServicer.GetSocket(request, context) diff --git a/src/python/grpcio_channelz/grpc_channelz/v1/_servicer.py b/src/python/grpcio_channelz/grpc_channelz/v1/_servicer.py new file mode 100644 index 00000000000..6ba28f56a39 --- /dev/null +++ b/src/python/grpcio_channelz/grpc_channelz/v1/_servicer.py @@ -0,0 +1,120 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Channelz debug service implementation in gRPC Python.""" + +import grpc +from grpc._cython import cygrpc + +import grpc_channelz.v1.channelz_pb2 as _channelz_pb2 +import grpc_channelz.v1.channelz_pb2_grpc as _channelz_pb2_grpc + +from google.protobuf import json_format + + +class ChannelzServicer(_channelz_pb2_grpc.ChannelzServicer): + """Servicer handling RPCs for service statuses.""" + + @staticmethod + def GetTopChannels(request, context): + try: + return json_format.Parse( + cygrpc.channelz_get_top_channels(request.start_channel_id), + _channelz_pb2.GetTopChannelsResponse(), + ) + except (ValueError, json_format.ParseError) as e: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(e)) + + @staticmethod + def GetServers(request, context): + try: + return json_format.Parse( + cygrpc.channelz_get_servers(request.start_server_id), + _channelz_pb2.GetServersResponse(), + ) + except (ValueError, json_format.ParseError) as e: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(e)) + + @staticmethod + def GetServer(request, context): + try: + return json_format.Parse( + cygrpc.channelz_get_server(request.server_id), + _channelz_pb2.GetServerResponse(), + ) + except ValueError as e: + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details(str(e)) + except json_format.ParseError as e: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(e)) + + @staticmethod + def GetServerSockets(request, context): + try: + return json_format.Parse( + cygrpc.channelz_get_server_sockets(request.server_id, + request.start_socket_id, + request.max_results), + _channelz_pb2.GetServerSocketsResponse(), + ) + except ValueError as e: + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details(str(e)) + except json_format.ParseError as e: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(e)) + + @staticmethod + def GetChannel(request, context): + try: + return json_format.Parse( + cygrpc.channelz_get_channel(request.channel_id), + _channelz_pb2.GetChannelResponse(), + ) + except ValueError as e: + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details(str(e)) + except json_format.ParseError as e: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(e)) + + @staticmethod + def GetSubchannel(request, context): + try: + return json_format.Parse( + cygrpc.channelz_get_subchannel(request.subchannel_id), + _channelz_pb2.GetSubchannelResponse(), + ) + except ValueError as e: + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details(str(e)) + except json_format.ParseError as e: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(e)) + + @staticmethod + def GetSocket(request, context): + try: + return json_format.Parse( + cygrpc.channelz_get_socket(request.socket_id), + _channelz_pb2.GetSocketResponse(), + ) + except ValueError as e: + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details(str(e)) + except json_format.ParseError as e: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(e)) diff --git a/src/python/grpcio_channelz/grpc_channelz/v1/channelz.py b/src/python/grpcio_channelz/grpc_channelz/v1/channelz.py index 00eca311dc1..508fad53974 100644 --- a/src/python/grpcio_channelz/grpc_channelz/v1/channelz.py +++ b/src/python/grpcio_channelz/grpc_channelz/v1/channelz.py @@ -13,130 +13,61 @@ # limitations under the License. """Channelz debug service implementation in gRPC Python.""" +import sys import grpc -from grpc._cython import cygrpc -import grpc_channelz.v1.channelz_pb2 as _channelz_pb2 import grpc_channelz.v1.channelz_pb2_grpc as _channelz_pb2_grpc +from grpc_channelz.v1._servicer import ChannelzServicer -from google.protobuf import json_format - - -class ChannelzServicer(_channelz_pb2_grpc.ChannelzServicer): - """Servicer handling RPCs for service statuses.""" - - @staticmethod - def GetTopChannels(request, context): - try: - return json_format.Parse( - cygrpc.channelz_get_top_channels(request.start_channel_id), - _channelz_pb2.GetTopChannelsResponse(), - ) - except (ValueError, json_format.ParseError) as e: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(str(e)) - - @staticmethod - def GetServers(request, context): - try: - return json_format.Parse( - cygrpc.channelz_get_servers(request.start_server_id), - _channelz_pb2.GetServersResponse(), - ) - except (ValueError, json_format.ParseError) as e: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(str(e)) - - @staticmethod - def GetServer(request, context): - try: - return json_format.Parse( - cygrpc.channelz_get_server(request.server_id), - _channelz_pb2.GetServerResponse(), - ) - except ValueError as e: - context.set_code(grpc.StatusCode.NOT_FOUND) - context.set_details(str(e)) - except json_format.ParseError as e: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(str(e)) - - @staticmethod - def GetServerSockets(request, context): - try: - return json_format.Parse( - cygrpc.channelz_get_server_sockets(request.server_id, - request.start_socket_id, - request.max_results), - _channelz_pb2.GetServerSocketsResponse(), - ) - except ValueError as e: - context.set_code(grpc.StatusCode.NOT_FOUND) - context.set_details(str(e)) - except json_format.ParseError as e: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(str(e)) - - @staticmethod - def GetChannel(request, context): - try: - return json_format.Parse( - cygrpc.channelz_get_channel(request.channel_id), - _channelz_pb2.GetChannelResponse(), - ) - except ValueError as e: - context.set_code(grpc.StatusCode.NOT_FOUND) - context.set_details(str(e)) - except json_format.ParseError as e: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(str(e)) - - @staticmethod - def GetSubchannel(request, context): - try: - return json_format.Parse( - cygrpc.channelz_get_subchannel(request.subchannel_id), - _channelz_pb2.GetSubchannelResponse(), - ) - except ValueError as e: - context.set_code(grpc.StatusCode.NOT_FOUND) - context.set_details(str(e)) - except json_format.ParseError as e: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(str(e)) - - @staticmethod - def GetSocket(request, context): - try: - return json_format.Parse( - cygrpc.channelz_get_socket(request.socket_id), - _channelz_pb2.GetSocketResponse(), - ) - except ValueError as e: - context.set_code(grpc.StatusCode.NOT_FOUND) - context.set_details(str(e)) - except json_format.ParseError as e: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(str(e)) - - -def add_channelz_servicer(server): - """Add Channelz servicer to a server. Channelz servicer is in charge of - pulling information from C-Core for entire process. It will allow the - server to response to Channelz queries. - - The Channelz statistic is enabled by default inside C-Core. Whether the - statistic is enabled or not is isolated from adding Channelz servicer. - That means you can query Channelz info with a Channelz-disabled channel, - and you can add Channelz servicer to a Channelz-disabled server. - - The Channelz statistic can be enabled or disabled by channel option - 'grpc.enable_channelz'. Set to 1 to enable, set to 0 to disable. - - This is an EXPERIMENTAL API. - - Args: - server: grpc.Server to which Channelz service will be added. - """ - _channelz_pb2_grpc.add_ChannelzServicer_to_server(ChannelzServicer(), - server) +_add_channelz_servicer_doc = """Add Channelz servicer to a server. + +Channelz servicer is in charge of +pulling information from C-Core for entire process. It will allow the +server to response to Channelz queries. + +The Channelz statistic is enabled by default inside C-Core. Whether the +statistic is enabled or not is isolated from adding Channelz servicer. +That means you can query Channelz info with a Channelz-disabled channel, +and you can add Channelz servicer to a Channelz-disabled server. + +The Channelz statistic can be enabled or disabled by channel option +'grpc.enable_channelz'. Set to 1 to enable, set to 0 to disable. + +This is an EXPERIMENTAL API. + +Args: + server: A gRPC server to which Channelz service will be added. +""" + +if sys.version_info[0] >= 3 and sys.version_info[1] >= 6: + from grpc_channelz.v1 import _async as aio + + def add_channelz_servicer(server): + + if isinstance(server, grpc.experimental.aio.Server): + _channelz_pb2_grpc.add_ChannelzServicer_to_server( + aio.ChannelzServicer(), server) + else: + _channelz_pb2_grpc.add_ChannelzServicer_to_server( + ChannelzServicer(), server) + + add_channelz_servicer.__doc__ = _add_channelz_servicer_doc + + __all__ = [ + "aio", + "add_channelz_servicer", + "ChannelzServicer", + ] + +else: + + def add_channelz_servicer(server): + _channelz_pb2_grpc.add_ChannelzServicer_to_server( + ChannelzServicer(), server) + + add_channelz_servicer.__doc__ = _add_channelz_servicer_doc + + __all__ = [ + "add_channelz_servicer", + "ChannelzServicer", + ] diff --git a/src/python/grpcio_tests/tests_aio/channelz/BUILD.bazel b/src/python/grpcio_tests/tests_aio/channelz/BUILD.bazel new file mode 100644 index 00000000000..2ead714e446 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/channelz/BUILD.bazel @@ -0,0 +1,29 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +package(default_visibility = ["//visibility:public"]) + +py_test( + name = "channelz_servicer_test", + size = "small", + srcs = ["channelz_servicer_test.py"], + imports = ["../../"], + python_version = "PY3", + deps = [ + "//src/python/grpcio/grpc:grpcio", + "//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz", + "//src/python/grpcio_tests/tests/unit/framework/common", + "//src/python/grpcio_tests/tests_aio/unit:_test_base", + ], +) diff --git a/src/python/grpcio_tests/tests_aio/channelz/__init__.py b/src/python/grpcio_tests/tests_aio/channelz/__init__.py new file mode 100644 index 00000000000..1517f71d093 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/channelz/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_tests/tests_aio/channelz/channelz_servicer_test.py b/src/python/grpcio_tests/tests_aio/channelz/channelz_servicer_test.py new file mode 100644 index 00000000000..021340462e0 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/channelz/channelz_servicer_test.py @@ -0,0 +1,446 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests of grpc_channelz.v1.channelz.""" + +import unittest +import logging +import asyncio + +import grpc +from grpc.experimental import aio + +from grpc_channelz.v1 import channelz +from grpc_channelz.v1 import channelz_pb2 +from grpc_channelz.v1 import channelz_pb2_grpc + +from tests.unit.framework.common import test_constants +from tests_aio.unit._test_base import AioTestBase + +aio.shutdown_grpc_aio() + +_SUCCESSFUL_UNARY_UNARY = '/test/SuccessfulUnaryUnary' +_FAILED_UNARY_UNARY = '/test/FailedUnaryUnary' +_SUCCESSFUL_STREAM_STREAM = '/test/SuccessfulStreamStream' + +_REQUEST = b'\x00\x00\x00' +_RESPONSE = b'\x01\x01\x01' + +_DISABLE_REUSE_PORT = (('grpc.so_reuseport', 0),) +_ENABLE_CHANNELZ = (('grpc.enable_channelz', 1),) +_DISABLE_CHANNELZ = (('grpc.enable_channelz', 0),) + + +async def _successful_unary_unary(request, servicer_context): + return _RESPONSE + + +async def _failed_unary_unary(request, servicer_context): + servicer_context.set_code(grpc.StatusCode.INTERNAL) + servicer_context.set_details("Channelz Test Intended Failure") + + +async def _successful_stream_stream(request_iterator, servicer_context): + async for _ in request_iterator: + yield _RESPONSE + + +class _GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + if handler_call_details.method == _SUCCESSFUL_UNARY_UNARY: + return grpc.unary_unary_rpc_method_handler(_successful_unary_unary) + elif handler_call_details.method == _FAILED_UNARY_UNARY: + return grpc.unary_unary_rpc_method_handler(_failed_unary_unary) + elif handler_call_details.method == _SUCCESSFUL_STREAM_STREAM: + return grpc.stream_stream_rpc_method_handler( + _successful_stream_stream) + else: + return None + + +class _ChannelServerPair(object): + + async def start(self): + # Server will enable channelz service + self.server = aio.server(options=_DISABLE_REUSE_PORT + _ENABLE_CHANNELZ) + port = self.server.add_insecure_port('[::]:0') + self.server.add_generic_rpc_handlers((_GenericHandler(),)) + await self.server.start() + + # Channel will enable channelz service... + self.channel = aio.insecure_channel('localhost:%d' % port, + options=_ENABLE_CHANNELZ) + + +# Stores channel-server pairs globally, since the memory deallocation is +# non-deterministic in both Core and Python with multiple threads. The +# destroyed Channelz node might still present. So, as a work around, this +# test doesn't close channel-server-pairs between cases. +_pairs = [] + + +async def _generate_channel_server_pairs(n): + """Creates channel-server pairs globally, returns their indexes.""" + new_pairs = [_ChannelServerPair() for i in range(n)] + for pair in new_pairs: + await pair.start() + _pairs.extend(new_pairs) + return list(range(len(_pairs) - n, len(_pairs))) + + +class ChannelzServicerTest(AioTestBase): + + async def setUp(self): + self._pairs = [] + # This server is for Channelz info fetching only + # It self should not enable Channelz + self._server = aio.server(options=_DISABLE_REUSE_PORT + + _DISABLE_CHANNELZ) + port = self._server.add_insecure_port('[::]:0') + channelz.add_channelz_servicer(self._server) + await self._server.start() + + # This channel is used to fetch Channelz info only + # Channelz should not be enabled + self._channel = aio.insecure_channel('localhost:%d' % port, + options=_DISABLE_CHANNELZ) + self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel) + + async def tearDown(self): + await self._server.stop(None) + await self._channel.close() + + async def _send_successful_unary_unary(self, idx): + call = _pairs[idx].channel.unary_unary(_SUCCESSFUL_UNARY_UNARY)( + _REQUEST) + self.assertEqual(grpc.StatusCode.OK, await call.code()) + + async def _send_failed_unary_unary(self, idx): + try: + await _pairs[idx].channel.unary_unary(_FAILED_UNARY_UNARY)(_REQUEST) + except grpc.RpcError: + return + else: + self.fail("This call supposed to fail") + + async def _send_successful_stream_stream(self, idx): + call = _pairs[idx].channel.stream_stream(_SUCCESSFUL_STREAM_STREAM)( + iter([_REQUEST] * test_constants.STREAM_LENGTH)) + cnt = 0 + async for _ in call: + cnt += 1 + self.assertEqual(cnt, test_constants.STREAM_LENGTH) + + async def _get_channel_id(self, idx): + """Channel id may not be consecutive""" + resp = await self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) + self.assertGreater(len(resp.channel), idx) + return resp.channel[idx].ref.channel_id + + async def _get_server_by_id(self, idx): + """Server id may not be consecutive""" + resp = await self._channelz_stub.GetServers( + channelz_pb2.GetServersRequest(start_server_id=0)) + return resp.server[idx] + + async def test_get_top_channels_basic(self): + before = await self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) + await _generate_channel_server_pairs(1) + after = await self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) + self.assertEqual(len(after.channel) - len(before.channel), 1) + self.assertEqual(after.end, True) + + async def test_get_top_channels_high_start_id(self): + await _generate_channel_server_pairs(1) + resp = await self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=10000)) + self.assertEqual(len(resp.channel), 0) + self.assertEqual(resp.end, True) + + async def test_successful_request(self): + idx = await _generate_channel_server_pairs(1) + await self._send_successful_unary_unary(idx[0]) + resp = await self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest( + channel_id=await self._get_channel_id(idx[0]))) + self.assertEqual(resp.channel.data.calls_started, 1) + self.assertEqual(resp.channel.data.calls_succeeded, 1) + self.assertEqual(resp.channel.data.calls_failed, 0) + + async def test_failed_request(self): + idx = await _generate_channel_server_pairs(1) + await self._send_failed_unary_unary(idx[0]) + resp = await self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest( + channel_id=await self._get_channel_id(idx[0]))) + self.assertEqual(resp.channel.data.calls_started, 1) + self.assertEqual(resp.channel.data.calls_succeeded, 0) + self.assertEqual(resp.channel.data.calls_failed, 1) + + async def test_many_requests(self): + idx = await _generate_channel_server_pairs(1) + k_success = 7 + k_failed = 9 + for i in range(k_success): + await self._send_successful_unary_unary(idx[0]) + for i in range(k_failed): + await self._send_failed_unary_unary(idx[0]) + resp = await self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest( + channel_id=await self._get_channel_id(idx[0]))) + self.assertEqual(resp.channel.data.calls_started, k_success + k_failed) + self.assertEqual(resp.channel.data.calls_succeeded, k_success) + self.assertEqual(resp.channel.data.calls_failed, k_failed) + + async def test_many_requests_many_channel(self): + k_channels = 4 + idx = await _generate_channel_server_pairs(k_channels) + k_success = 11 + k_failed = 13 + for i in range(k_success): + await self._send_successful_unary_unary(idx[0]) + await self._send_successful_unary_unary(idx[2]) + for i in range(k_failed): + await self._send_failed_unary_unary(idx[1]) + await self._send_failed_unary_unary(idx[2]) + + # The first channel saw only successes + resp = await self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest( + channel_id=await self._get_channel_id(idx[0]))) + self.assertEqual(resp.channel.data.calls_started, k_success) + self.assertEqual(resp.channel.data.calls_succeeded, k_success) + self.assertEqual(resp.channel.data.calls_failed, 0) + + # The second channel saw only failures + resp = await self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest( + channel_id=await self._get_channel_id(idx[1]))) + self.assertEqual(resp.channel.data.calls_started, k_failed) + self.assertEqual(resp.channel.data.calls_succeeded, 0) + self.assertEqual(resp.channel.data.calls_failed, k_failed) + + # The third channel saw both successes and failures + resp = await self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest( + channel_id=await self._get_channel_id(idx[2]))) + self.assertEqual(resp.channel.data.calls_started, k_success + k_failed) + self.assertEqual(resp.channel.data.calls_succeeded, k_success) + self.assertEqual(resp.channel.data.calls_failed, k_failed) + + # The fourth channel saw nothing + resp = await self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest( + channel_id=await self._get_channel_id(idx[3]))) + self.assertEqual(resp.channel.data.calls_started, 0) + self.assertEqual(resp.channel.data.calls_succeeded, 0) + self.assertEqual(resp.channel.data.calls_failed, 0) + + async def test_many_subchannels(self): + k_channels = 4 + idx = await _generate_channel_server_pairs(k_channels) + k_success = 17 + k_failed = 19 + for i in range(k_success): + await self._send_successful_unary_unary(idx[0]) + await self._send_successful_unary_unary(idx[2]) + for i in range(k_failed): + await self._send_failed_unary_unary(idx[1]) + await self._send_failed_unary_unary(idx[2]) + + for i in range(k_channels): + gc_resp = await self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest( + channel_id=await self._get_channel_id(idx[i]))) + # If no call performed in the channel, there shouldn't be any subchannel + if gc_resp.channel.data.calls_started == 0: + self.assertEqual(len(gc_resp.channel.subchannel_ref), 0) + continue + + # Otherwise, the subchannel should exist + self.assertGreater(len(gc_resp.channel.subchannel_ref), 0) + gsc_resp = await self._channelz_stub.GetSubchannel( + channelz_pb2.GetSubchannelRequest( + subchannel_id=gc_resp.channel.subchannel_ref[0]. + subchannel_id)) + self.assertEqual(gc_resp.channel.data.calls_started, + gsc_resp.subchannel.data.calls_started) + self.assertEqual(gc_resp.channel.data.calls_succeeded, + gsc_resp.subchannel.data.calls_succeeded) + self.assertEqual(gc_resp.channel.data.calls_failed, + gsc_resp.subchannel.data.calls_failed) + + async def test_server_call(self): + idx = await _generate_channel_server_pairs(1) + k_success = 23 + k_failed = 29 + for i in range(k_success): + await self._send_successful_unary_unary(idx[0]) + for i in range(k_failed): + await self._send_failed_unary_unary(idx[0]) + + resp = await self._get_server_by_id(idx[0]) + self.assertEqual(resp.data.calls_started, k_success + k_failed) + self.assertEqual(resp.data.calls_succeeded, k_success) + self.assertEqual(resp.data.calls_failed, k_failed) + + async def test_many_subchannels_and_sockets(self): + k_channels = 4 + idx = await _generate_channel_server_pairs(k_channels) + k_success = 3 + k_failed = 5 + for i in range(k_success): + await self._send_successful_unary_unary(idx[0]) + await self._send_successful_unary_unary(idx[2]) + for i in range(k_failed): + await self._send_failed_unary_unary(idx[1]) + await self._send_failed_unary_unary(idx[2]) + + for i in range(k_channels): + gc_resp = await self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest( + channel_id=await self._get_channel_id(idx[i]))) + + # If no call performed in the channel, there shouldn't be any subchannel + if gc_resp.channel.data.calls_started == 0: + self.assertEqual(len(gc_resp.channel.subchannel_ref), 0) + continue + + # Otherwise, the subchannel should exist + self.assertGreater(len(gc_resp.channel.subchannel_ref), 0) + gsc_resp = await self._channelz_stub.GetSubchannel( + channelz_pb2.GetSubchannelRequest( + subchannel_id=gc_resp.channel.subchannel_ref[0]. + subchannel_id)) + self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1) + + gs_resp = await self._channelz_stub.GetSocket( + channelz_pb2.GetSocketRequest( + socket_id=gsc_resp.subchannel.socket_ref[0].socket_id)) + self.assertEqual(gsc_resp.subchannel.data.calls_started, + gs_resp.socket.data.streams_started) + self.assertEqual(gsc_resp.subchannel.data.calls_started, + gs_resp.socket.data.streams_succeeded) + # Calls started == messages sent, only valid for unary calls + self.assertEqual(gsc_resp.subchannel.data.calls_started, + gs_resp.socket.data.messages_sent) + + async def test_streaming_rpc(self): + idx = await _generate_channel_server_pairs(1) + # In C++, the argument for _send_successful_stream_stream is message length. + # Here the argument is still channel idx, to be consistent with the other two. + await self._send_successful_stream_stream(idx[0]) + + gc_resp = await self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest( + channel_id=await self._get_channel_id(idx[0]))) + self.assertEqual(gc_resp.channel.data.calls_started, 1) + self.assertEqual(gc_resp.channel.data.calls_succeeded, 1) + self.assertEqual(gc_resp.channel.data.calls_failed, 0) + # Subchannel exists + self.assertGreater(len(gc_resp.channel.subchannel_ref), 0) + + gsc_resp = await self._channelz_stub.GetSubchannel( + channelz_pb2.GetSubchannelRequest( + subchannel_id=gc_resp.channel.subchannel_ref[0].subchannel_id)) + self.assertEqual(gsc_resp.subchannel.data.calls_started, 1) + self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, 1) + self.assertEqual(gsc_resp.subchannel.data.calls_failed, 0) + # Socket exists + self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1) + + gs_resp = await self._channelz_stub.GetSocket( + channelz_pb2.GetSocketRequest( + socket_id=gsc_resp.subchannel.socket_ref[0].socket_id)) + self.assertEqual(gs_resp.socket.data.streams_started, 1) + self.assertEqual(gs_resp.socket.data.streams_succeeded, 1) + self.assertEqual(gs_resp.socket.data.streams_failed, 0) + self.assertEqual(gs_resp.socket.data.messages_sent, + test_constants.STREAM_LENGTH) + self.assertEqual(gs_resp.socket.data.messages_received, + test_constants.STREAM_LENGTH) + + async def test_server_sockets(self): + idx = await _generate_channel_server_pairs(1) + await self._send_successful_unary_unary(idx[0]) + await self._send_failed_unary_unary(idx[0]) + + resp = await self._get_server_by_id(idx[0]) + self.assertEqual(resp.data.calls_started, 2) + self.assertEqual(resp.data.calls_succeeded, 1) + self.assertEqual(resp.data.calls_failed, 1) + + gss_resp = await self._channelz_stub.GetServerSockets( + channelz_pb2.GetServerSocketsRequest(server_id=resp.ref.server_id, + start_socket_id=0)) + # If the RPC call failed, it will raise a grpc.RpcError + # So, if there is no exception raised, considered pass + + async def test_server_listen_sockets(self): + idx = await _generate_channel_server_pairs(1) + + resp = await self._get_server_by_id(idx[0]) + self.assertEqual(len(resp.listen_socket), 1) + + gs_resp = await self._channelz_stub.GetSocket( + channelz_pb2.GetSocketRequest( + socket_id=resp.listen_socket[0].socket_id)) + # If the RPC call failed, it will raise a grpc.RpcError + # So, if there is no exception raised, considered pass + + async def test_invalid_query_get_server(self): + with self.assertRaises(aio.AioRpcError) as exception_context: + await self._channelz_stub.GetServer( + channelz_pb2.GetServerRequest(server_id=10000)) + self.assertEqual(grpc.StatusCode.NOT_FOUND, + exception_context.exception.code()) + + async def test_invalid_query_get_channel(self): + with self.assertRaises(aio.AioRpcError) as exception_context: + await self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=10000)) + self.assertEqual(grpc.StatusCode.NOT_FOUND, + exception_context.exception.code()) + + async def test_invalid_query_get_subchannel(self): + with self.assertRaises(aio.AioRpcError) as exception_context: + await self._channelz_stub.GetSubchannel( + channelz_pb2.GetSubchannelRequest(subchannel_id=10000)) + self.assertEqual(grpc.StatusCode.NOT_FOUND, + exception_context.exception.code()) + + async def test_invalid_query_get_socket(self): + with self.assertRaises(aio.AioRpcError) as exception_context: + await self._channelz_stub.GetSocket( + channelz_pb2.GetSocketRequest(socket_id=10000)) + self.assertEqual(grpc.StatusCode.NOT_FOUND, + exception_context.exception.code()) + + async def test_invalid_query_get_server_sockets(self): + with self.assertRaises(aio.AioRpcError) as exception_context: + await self._channelz_stub.GetServerSockets( + channelz_pb2.GetServerSocketsRequest( + server_id=10000, + start_socket_id=0, + )) + self.assertEqual(grpc.StatusCode.NOT_FOUND, + exception_context.exception.code()) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests_aio/tests.json b/src/python/grpcio_tests/tests_aio/tests.json index e657cfa7e4d..1b5b17b7d8b 100644 --- a/src/python/grpcio_tests/tests_aio/tests.json +++ b/src/python/grpcio_tests/tests_aio/tests.json @@ -1,5 +1,6 @@ [ "_sanity._sanity_test.AioSanityTest", + "channelz.channelz_servicer_test.ChannelzServicerTest", "health_check.health_servicer_test.HealthServicerTest", "interop.local_interop_test.InsecureLocalInteropTest", "interop.local_interop_test.SecureLocalInteropTest",