diff --git a/src/python/grpcio/grpc/experimental/aio/_metadata.py b/src/python/grpcio/grpc/experimental/aio/_metadata.py new file mode 100644 index 00000000000..03c39ba9b7a --- /dev/null +++ b/src/python/grpcio/grpc/experimental/aio/_metadata.py @@ -0,0 +1,72 @@ +# Copyright 2020 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Implementation of the metadata abstraction for gRPC Asyncio Python.""" +from typing import List, Tuple, AnyStr, Iterator, Any +from collections import abc, OrderedDict + + +class Metadata(abc.Mapping): + """Metadata abstraction for the asynchronous calls and interceptors. + + The metadata is a mapping from str -> List[str] + + Traits + * Multiple entries are allowed for the same key + * The order of the values by key is preserved + * Getting by an element by key, retrieves the first mapped value + * Supports an immutable view of the data + """ + + def __init__(self, *args) -> None: + self._metadata = OrderedDict() + for md_key, md_value in args: + self.add(md_key, md_value) + + def add(self, key: str, value: str) -> None: + key = key.lower() + self._metadata.setdefault(key, []) + self._metadata[key].append(value) + + def __len__(self) -> int: + return len(self._metadata) + + def __getitem__(self, key: str) -> str: + try: + first, *_ = self._metadata[key.lower()] + return first + except ValueError as e: + raise KeyError("{0!r}".format(key)) from e + + def __iter__(self) -> Iterator[Tuple[AnyStr, AnyStr]]: + for key, values in self._metadata.items(): + for value in values: + yield (key, value) + + def view(self) -> Tuple[AnyStr, AnyStr]: + return tuple(self) + + def get_all(self, key: str) -> List[str]: + """For compatibility with other Metadata abstraction objects (like in Java), + this would return all items under the desired . + """ + return self._metadata.get(key.lower(), []) + + def __contains__(self, key: str) -> bool: + return key.lower() in self._metadata + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, self.__class__): + return NotImplemented + + return self._metadata == other._metadata diff --git a/src/python/grpcio_tests/tests/unit/_metadata_test.py b/src/python/grpcio_tests/tests/unit/_metadata_test.py index 3e7717b04c7..e19da5f3867 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_test.py @@ -19,6 +19,7 @@ import logging import grpc from grpc import _channel +from grpc.experimental.aio._metadata import Metadata from tests.unit import test_common from tests.unit.framework.common import test_constants @@ -237,6 +238,72 @@ class MetadataTest(unittest.TestCase): call.trailing_metadata())) +class MetadataTypeTest(unittest.TestCase): + """Tests for the metadata type""" + + _DEFAULT_DATA = (("key1", "value1"), ("key2", "value2")) + _MULTI_ENTRY_DATA = (("key1", "value1"), ("key1", "other value 1"), + ("key2", "value2")) + + def test_init_metadata(self): + test_cases = { + "emtpy": (), + "with-data": self._DEFAULT_DATA, + } + for case, args in test_cases.items(): + with self.subTest(case=case): + metadata = Metadata(*args) + self.assertEqual(len(metadata), len(args)) + + def test_get_item(self): + metadata = Metadata(("key", "value1"), ("key", "value2"), + ("key2", "other value")) + self.assertEqual(metadata["key"], "value1") + self.assertEqual(metadata["key2"], "other value") + self.assertEqual(metadata.get("key"), "value1") + self.assertEqual(metadata.get("key2"), "other value") + + with self.assertRaises(KeyError): + metadata["key not found"] + self.assertIsNone(metadata.get("key not found")) + + def test_view(self): + self.assertEqual( + Metadata(*self._DEFAULT_DATA).view(), self._DEFAULT_DATA) + + def test_add_value(self): + metadata = Metadata() + metadata.add("key", "value") + metadata.add("key", "second value") + metadata.add("key2", "value2") + + self.assertEqual(metadata["key"], "value") + self.assertEqual(metadata["key2"], "value2") + self.assertEqual(metadata["KEY2"], "value2") + + def test_get_all_items(self): + metadata = Metadata(*self._MULTI_ENTRY_DATA) + self.assertEqual(metadata.get_all("key1"), ["value1", "other value 1"]) + self.assertEqual(metadata.get_all("KEY1"), ["value1", "other value 1"]) + self.assertEqual(metadata.get_all("key2"), ["value2"]) + self.assertEqual(metadata.get_all("non existing key"), []) + + def test_container(self): + metadata = Metadata(*self._MULTI_ENTRY_DATA) + for key in ("key1", "Key1", "KEY1"): + with self.subTest(case=key): + self.assertIn(key, metadata, "{0!r} not found".format(key)) + + def test_equals(self): + metadata = Metadata() + for key, value in self._DEFAULT_DATA: + metadata.add(key, value) + metadata2 = Metadata(*self._DEFAULT_DATA) + + self.assertEqual(metadata, metadata2) + self.assertNotEqual(metadata, "foo") + + if __name__ == '__main__': logging.basicConfig() unittest.main(verbosity=2)