Merge pull request #22662 from lidizheng/aio-reflection

[Aio] Add AsyncIO support to grpcio-reflection
pull/22694/head
Lidi Zheng 5 years ago committed by GitHub
commit b6a7d5505c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/python/grpcio_reflection/grpc_reflection/v1alpha/BUILD.bazel
  2. 57
      src/python/grpcio_reflection/grpc_reflection/v1alpha/_async.py
  3. 110
      src/python/grpcio_reflection/grpc_reflection/v1alpha/_base.py
  4. 137
      src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py
  5. 30
      src/python/grpcio_tests/tests_aio/reflection/BUILD.bazel
  6. 13
      src/python/grpcio_tests/tests_aio/reflection/__init__.py
  7. 193
      src/python/grpcio_tests/tests_aio/reflection/reflection_servicer_test.py
  8. 1
      src/python/grpcio_tests/tests_aio/tests.json

@ -17,7 +17,7 @@ py_grpc_library(
py_library(
name = "grpc_reflection",
srcs = ["reflection.py"],
srcs = glob(["*.py"]),
imports = ["../../"],
deps = [
":reflection_py_pb2",

@ -0,0 +1,57 @@
# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The AsyncIO version of the reflection servicer."""
from typing import AsyncIterable
import grpc
from grpc_reflection.v1alpha import reflection_pb2 as _reflection_pb2
from grpc_reflection.v1alpha._base import BaseReflectionServicer
class ReflectionServicer(BaseReflectionServicer):
"""Servicer handling RPCs for service statuses."""
async def ServerReflectionInfo(
self, request_iterator: AsyncIterable[
_reflection_pb2.ServerReflectionRequest], unused_context
) -> AsyncIterable[_reflection_pb2.ServerReflectionResponse]:
async for request in request_iterator:
if request.HasField('file_by_filename'):
yield self._file_by_filename(request.file_by_filename)
elif request.HasField('file_containing_symbol'):
yield self._file_containing_symbol(
request.file_containing_symbol)
elif request.HasField('file_containing_extension'):
yield self._file_containing_extension(
request.file_containing_extension.containing_type,
request.file_containing_extension.extension_number)
elif request.HasField('all_extension_numbers_of_type'):
yield self._all_extension_numbers_of_type(
request.all_extension_numbers_of_type)
elif request.HasField('list_services'):
yield self._list_services()
else:
yield _reflection_pb2.ServerReflectionResponse(
error_response=_reflection_pb2.ErrorResponse(
error_code=grpc.StatusCode.INVALID_ARGUMENT.value[0],
error_message=grpc.StatusCode.INVALID_ARGUMENT.value[1].
encode(),
))
__all__ = [
"ReflectionServicer",
]

@ -0,0 +1,110 @@
# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base implementation of reflection servicer."""
import grpc
from google.protobuf import descriptor_pb2
from google.protobuf import descriptor_pool
from grpc_reflection.v1alpha import reflection_pb2 as _reflection_pb2
from grpc_reflection.v1alpha import reflection_pb2_grpc as _reflection_pb2_grpc
_POOL = descriptor_pool.Default()
def _not_found_error():
return _reflection_pb2.ServerReflectionResponse(
error_response=_reflection_pb2.ErrorResponse(
error_code=grpc.StatusCode.NOT_FOUND.value[0],
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
))
def _file_descriptor_response(descriptor):
proto = descriptor_pb2.FileDescriptorProto()
descriptor.CopyToProto(proto)
serialized_proto = proto.SerializeToString()
return _reflection_pb2.ServerReflectionResponse(
file_descriptor_response=_reflection_pb2.FileDescriptorResponse(
file_descriptor_proto=(serialized_proto,)),)
class BaseReflectionServicer(_reflection_pb2_grpc.ServerReflectionServicer):
"""Base class for reflection servicer."""
def __init__(self, service_names, pool=None):
"""Constructor.
Args:
service_names: Iterable of fully-qualified service names available.
pool: An optional DescriptorPool instance.
"""
self._service_names = tuple(sorted(service_names))
self._pool = _POOL if pool is None else pool
def _file_by_filename(self, filename):
try:
descriptor = self._pool.FindFileByName(filename)
except KeyError:
return _not_found_error()
else:
return _file_descriptor_response(descriptor)
def _file_containing_symbol(self, fully_qualified_name):
try:
descriptor = self._pool.FindFileContainingSymbol(
fully_qualified_name)
except KeyError:
return _not_found_error()
else:
return _file_descriptor_response(descriptor)
def _file_containing_extension(self, containing_type, extension_number):
try:
message_descriptor = self._pool.FindMessageTypeByName(
containing_type)
extension_descriptor = self._pool.FindExtensionByNumber(
message_descriptor, extension_number)
descriptor = self._pool.FindFileContainingSymbol(
extension_descriptor.full_name)
except KeyError:
return _not_found_error()
else:
return _file_descriptor_response(descriptor)
def _all_extension_numbers_of_type(self, containing_type):
try:
message_descriptor = self._pool.FindMessageTypeByName(
containing_type)
extension_numbers = tuple(
sorted(extension.number for extension in
self._pool.FindAllExtensions(message_descriptor)))
except KeyError:
return _not_found_error()
else:
return _reflection_pb2.ServerReflectionResponse(
all_extension_numbers_response=_reflection_pb2.
ExtensionNumberResponse(
base_type_name=message_descriptor.full_name,
extension_number=extension_numbers))
def _list_services(self):
return _reflection_pb2.ServerReflectionResponse(
list_services_response=_reflection_pb2.ListServiceResponse(service=[
_reflection_pb2.ServiceResponse(name=service_name)
for service_name in self._service_names
]))
__all__ = ['BaseReflectionServicer']

@ -13,100 +13,21 @@
# limitations under the License.
"""Reference implementation for reflection in gRPC Python."""
import sys
import grpc
from google.protobuf import descriptor_pb2
from google.protobuf import descriptor_pool
from grpc_reflection.v1alpha import reflection_pb2 as _reflection_pb2
from grpc_reflection.v1alpha import reflection_pb2_grpc as _reflection_pb2_grpc
_POOL = descriptor_pool.Default()
from grpc_reflection.v1alpha._base import BaseReflectionServicer
SERVICE_NAME = _reflection_pb2.DESCRIPTOR.services_by_name[
'ServerReflection'].full_name
def _not_found_error():
return _reflection_pb2.ServerReflectionResponse(
error_response=_reflection_pb2.ErrorResponse(
error_code=grpc.StatusCode.NOT_FOUND.value[0],
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
))
def _file_descriptor_response(descriptor):
proto = descriptor_pb2.FileDescriptorProto()
descriptor.CopyToProto(proto)
serialized_proto = proto.SerializeToString()
return _reflection_pb2.ServerReflectionResponse(
file_descriptor_response=_reflection_pb2.FileDescriptorResponse(
file_descriptor_proto=(serialized_proto,)),)
class ReflectionServicer(_reflection_pb2_grpc.ServerReflectionServicer):
class ReflectionServicer(BaseReflectionServicer):
"""Servicer handling RPCs for service statuses."""
def __init__(self, service_names, pool=None):
"""Constructor.
Args:
service_names: Iterable of fully-qualified service names available.
"""
self._service_names = tuple(sorted(service_names))
self._pool = _POOL if pool is None else pool
def _file_by_filename(self, filename):
try:
descriptor = self._pool.FindFileByName(filename)
except KeyError:
return _not_found_error()
else:
return _file_descriptor_response(descriptor)
def _file_containing_symbol(self, fully_qualified_name):
try:
descriptor = self._pool.FindFileContainingSymbol(
fully_qualified_name)
except KeyError:
return _not_found_error()
else:
return _file_descriptor_response(descriptor)
def _file_containing_extension(self, containing_type, extension_number):
try:
message_descriptor = self._pool.FindMessageTypeByName(
containing_type)
extension_descriptor = self._pool.FindExtensionByNumber(
message_descriptor, extension_number)
descriptor = self._pool.FindFileContainingSymbol(
extension_descriptor.full_name)
except KeyError:
return _not_found_error()
else:
return _file_descriptor_response(descriptor)
def _all_extension_numbers_of_type(self, containing_type):
try:
message_descriptor = self._pool.FindMessageTypeByName(
containing_type)
extension_numbers = tuple(
sorted(extension.number for extension in
self._pool.FindAllExtensions(message_descriptor)))
except KeyError:
return _not_found_error()
else:
return _reflection_pb2.ServerReflectionResponse(
all_extension_numbers_response=_reflection_pb2.
ExtensionNumberResponse(
base_type_name=message_descriptor.full_name,
extension_number=extension_numbers))
def _list_services(self):
return _reflection_pb2.ServerReflectionResponse(
list_services_response=_reflection_pb2.ListServiceResponse(service=[
_reflection_pb2.ServiceResponse(name=service_name)
for service_name in self._service_names
]))
def ServerReflectionInfo(self, request_iterator, context):
# pylint: disable=unused-argument
for request in request_iterator:
@ -133,13 +54,45 @@ class ReflectionServicer(_reflection_pb2_grpc.ServerReflectionServicer):
))
def enable_server_reflection(service_names, server, pool=None):
"""Enables server reflection on a server.
_enable_server_reflection_doc = """Enables server reflection on a server.
Args:
service_names: Iterable of fully-qualified service names available.
server: grpc.Server to which reflection service will be added.
pool: DescriptorPool object to use (descriptor_pool.Default() if None).
"""
_reflection_pb2_grpc.add_ServerReflectionServicer_to_server(
ReflectionServicer(service_names, pool=pool), server)
Args:
service_names: Iterable of fully-qualified service names available.
server: grpc.Server to which reflection service will be added.
pool: DescriptorPool object to use (descriptor_pool.Default() if None).
"""
if sys.version_info[0] >= 3 and sys.version_info[1] >= 6:
# Exposes AsyncReflectionServicer as public API.
from . import _async as aio
from grpc.experimental import aio as grpc_aio # pylint: disable=ungrouped-imports
def enable_server_reflection(service_names, server, pool=None):
if isinstance(server, grpc_aio.Server):
_reflection_pb2_grpc.add_ServerReflectionServicer_to_server(
aio.ReflectionServicer(service_names, pool=pool), server)
else:
_reflection_pb2_grpc.add_ServerReflectionServicer_to_server(
ReflectionServicer(service_names, pool=pool), server)
enable_server_reflection.__doc__ = _enable_server_reflection_doc
__all__ = [
"SERVICE_NAME",
"ReflectionServicer",
"enable_server_reflection",
"aio",
]
else:
def enable_server_reflection(service_names, server, pool=None):
_reflection_pb2_grpc.add_ServerReflectionServicer_to_server(
ReflectionServicer(service_names, pool=pool), server)
enable_server_reflection.__doc__ = _enable_server_reflection_doc
__all__ = [
"SERVICE_NAME",
"ReflectionServicer",
"enable_server_reflection",
]

@ -0,0 +1,30 @@
# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
package(default_testonly = 1)
py_test(
name = "reflection_servicer_test",
srcs = ["reflection_servicer_test.py"],
imports = ["../../"],
python_version = "PY3",
deps = [
"//src/proto/grpc/testing:empty_py_pb2",
"//src/proto/grpc/testing/proto2:empty2_extensions_proto",
"//src/proto/grpc/testing/proto2:empty2_proto",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_reflection/grpc_reflection/v1alpha:grpc_reflection",
"//src/python/grpcio_tests/tests_aio/unit:_test_base",
],
)

@ -0,0 +1,13 @@
# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -0,0 +1,193 @@
# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests of grpc_reflection.v1alpha.reflection."""
import logging
import unittest
import grpc
from google.protobuf import descriptor_pb2, descriptor_pool
from grpc.experimental import aio
from grpc_reflection.v1alpha import (reflection, reflection_pb2,
reflection_pb2_grpc)
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing.proto2 import empty2_extensions_pb2
from tests_aio.unit._test_base import AioTestBase
_EMPTY_PROTO_FILE_NAME = 'src/proto/grpc/testing/empty.proto'
_EMPTY_PROTO_SYMBOL_NAME = 'grpc.testing.Empty'
_SERVICE_NAMES = ('Angstrom', 'Bohr', 'Curie', 'Dyson', 'Einstein', 'Feynman',
'Galilei')
_EMPTY_EXTENSIONS_SYMBOL_NAME = 'grpc.testing.proto2.EmptyWithExtensions'
_EMPTY_EXTENSIONS_NUMBERS = (
124,
125,
126,
127,
128,
)
def _file_descriptor_to_proto(descriptor):
proto = descriptor_pb2.FileDescriptorProto()
descriptor.CopyToProto(proto)
return proto.SerializeToString()
class ReflectionServicerTest(AioTestBase):
async def setUp(self):
self._server = aio.server()
reflection.enable_server_reflection(_SERVICE_NAMES, self._server)
port = self._server.add_insecure_port('[::]:0')
await self._server.start()
self._channel = aio.insecure_channel('localhost:%d' % port)
self._stub = reflection_pb2_grpc.ServerReflectionStub(self._channel)
async def tearDown(self):
await self._server.stop(None)
await self._channel.close()
async def test_file_by_name(self):
requests = (
reflection_pb2.ServerReflectionRequest(
file_by_filename=_EMPTY_PROTO_FILE_NAME),
reflection_pb2.ServerReflectionRequest(
file_by_filename='i-donut-exist'),
)
responses = []
async for response in self._stub.ServerReflectionInfo(iter(requests)):
responses.append(response)
expected_responses = (
reflection_pb2.ServerReflectionResponse(
valid_host='',
file_descriptor_response=reflection_pb2.FileDescriptorResponse(
file_descriptor_proto=(
_file_descriptor_to_proto(empty_pb2.DESCRIPTOR),))),
reflection_pb2.ServerReflectionResponse(
valid_host='',
error_response=reflection_pb2.ErrorResponse(
error_code=grpc.StatusCode.NOT_FOUND.value[0],
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
)),
)
self.assertSequenceEqual(expected_responses, responses)
async def test_file_by_symbol(self):
requests = (
reflection_pb2.ServerReflectionRequest(
file_containing_symbol=_EMPTY_PROTO_SYMBOL_NAME),
reflection_pb2.ServerReflectionRequest(
file_containing_symbol='i.donut.exist.co.uk.org.net.me.name.foo'
),
)
responses = []
async for response in self._stub.ServerReflectionInfo(iter(requests)):
responses.append(response)
expected_responses = (
reflection_pb2.ServerReflectionResponse(
valid_host='',
file_descriptor_response=reflection_pb2.FileDescriptorResponse(
file_descriptor_proto=(
_file_descriptor_to_proto(empty_pb2.DESCRIPTOR),))),
reflection_pb2.ServerReflectionResponse(
valid_host='',
error_response=reflection_pb2.ErrorResponse(
error_code=grpc.StatusCode.NOT_FOUND.value[0],
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
)),
)
self.assertSequenceEqual(expected_responses, responses)
async def test_file_containing_extension(self):
requests = (
reflection_pb2.ServerReflectionRequest(
file_containing_extension=reflection_pb2.ExtensionRequest(
containing_type=_EMPTY_EXTENSIONS_SYMBOL_NAME,
extension_number=125,
),),
reflection_pb2.ServerReflectionRequest(
file_containing_extension=reflection_pb2.ExtensionRequest(
containing_type='i.donut.exist.co.uk.org.net.me.name.foo',
extension_number=55,
),),
)
responses = []
async for response in self._stub.ServerReflectionInfo(iter(requests)):
responses.append(response)
expected_responses = (
reflection_pb2.ServerReflectionResponse(
valid_host='',
file_descriptor_response=reflection_pb2.FileDescriptorResponse(
file_descriptor_proto=(_file_descriptor_to_proto(
empty2_extensions_pb2.DESCRIPTOR),))),
reflection_pb2.ServerReflectionResponse(
valid_host='',
error_response=reflection_pb2.ErrorResponse(
error_code=grpc.StatusCode.NOT_FOUND.value[0],
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
)),
)
self.assertSequenceEqual(expected_responses, responses)
async def test_extension_numbers_of_type(self):
requests = (
reflection_pb2.ServerReflectionRequest(
all_extension_numbers_of_type=_EMPTY_EXTENSIONS_SYMBOL_NAME),
reflection_pb2.ServerReflectionRequest(
all_extension_numbers_of_type='i.donut.exist.co.uk.net.name.foo'
),
)
responses = []
async for response in self._stub.ServerReflectionInfo(iter(requests)):
responses.append(response)
expected_responses = (
reflection_pb2.ServerReflectionResponse(
valid_host='',
all_extension_numbers_response=reflection_pb2.
ExtensionNumberResponse(
base_type_name=_EMPTY_EXTENSIONS_SYMBOL_NAME,
extension_number=_EMPTY_EXTENSIONS_NUMBERS)),
reflection_pb2.ServerReflectionResponse(
valid_host='',
error_response=reflection_pb2.ErrorResponse(
error_code=grpc.StatusCode.NOT_FOUND.value[0],
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
)),
)
self.assertSequenceEqual(expected_responses, responses)
async def test_list_services(self):
requests = (reflection_pb2.ServerReflectionRequest(list_services='',),)
responses = []
async for response in self._stub.ServerReflectionInfo(iter(requests)):
responses.append(response)
expected_responses = (reflection_pb2.ServerReflectionResponse(
valid_host='',
list_services_response=reflection_pb2.ListServiceResponse(
service=tuple(
reflection_pb2.ServiceResponse(name=name)
for name in _SERVICE_NAMES))),)
self.assertSequenceEqual(expected_responses, responses)
def test_reflection_service_name(self):
self.assertEqual(reflection.SERVICE_NAME,
'grpc.reflection.v1alpha.ServerReflection')
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
unittest.main(verbosity=2)

@ -3,6 +3,7 @@
"health_check.health_servicer_test.HealthServicerTest",
"interop.local_interop_test.InsecureLocalInteropTest",
"interop.local_interop_test.SecureLocalInteropTest",
"reflection.reflection_servicer_test.ReflectionServicerTest",
"unit._metadata_test.TestTypeMetadata",
"unit.abort_test.TestAbort",
"unit.aio_rpc_error_test.TestAioRpcError",

Loading…
Cancel
Save