|
|
|
# Protocol Buffers - Google's data interchange format
|
|
|
|
# Copyright 2023 Google LLC. All rights reserved.
|
|
|
|
# https://developers.google.com/protocol-buffers/
|
|
|
|
#
|
|
|
|
# Redistribution and use in source and binary forms, with or without
|
|
|
|
# modification, are permitted provided that the following conditions are
|
|
|
|
# met:
|
|
|
|
#
|
|
|
|
# * Redistributions of source code must retain the above copyright
|
|
|
|
# notice, this list of conditions and the following disclaimer.
|
|
|
|
# * Redistributions in binary form must reproduce the above
|
|
|
|
# copyright notice, this list of conditions and the following disclaimer
|
|
|
|
# in the documentation and/or other materials provided with the
|
|
|
|
# distribution.
|
|
|
|
# * Neither the name of Google LLC nor the names of its
|
|
|
|
# contributors may be used to endorse or promote products derived from
|
|
|
|
# this software without specific prior written permission.
|
|
|
|
#
|
|
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
|
|
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
|
|
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
|
|
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
|
|
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
|
|
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
|
|
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
|
|
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
|
|
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
|
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
|
|
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
|
|
|
|
"""A bare-bones unit test that doesn't load any generated code."""
|
|
|
|
|
|
|
|
|
|
|
|
import unittest
|
|
|
|
from google.protobuf.pyext import _message
|
|
|
|
from google3.net.proto2.python.internal import api_implementation
|
|
|
|
from google.protobuf import unittest_pb2
|
|
|
|
from google.protobuf import map_unittest_pb2
|
|
|
|
from google.protobuf import descriptor_pool
|
|
|
|
from google.protobuf import text_format
|
|
|
|
from google.protobuf import message_factory
|
|
|
|
from google.protobuf import message
|
|
|
|
from google3.net.proto2.python.internal import factory_test1_pb2
|
|
|
|
from google3.net.proto2.python.internal import factory_test2_pb2
|
|
|
|
from google3.net.proto2.python.internal import more_extensions_pb2
|
|
|
|
from google.protobuf import descriptor_pb2
|
|
|
|
|
|
|
|
class TestMessageExtension(unittest.TestCase):
|
|
|
|
|
|
|
|
def test_descriptor_pool(self):
|
|
|
|
serialized_desc = b'\n\ntest.proto\"\x0e\n\x02M1*\x08\x08\x01\x10\x80\x80\x80\x80\x02:\x15\n\x08test_ext\x12\x03.M1\x18\x01 \x01(\x05'
|
|
|
|
pool = _message.DescriptorPool()
|
|
|
|
file_desc = pool.AddSerializedFile(serialized_desc)
|
|
|
|
self.assertEqual("test.proto", file_desc.name)
|
|
|
|
ext_desc = pool.FindExtensionByName("test_ext")
|
|
|
|
self.assertEqual(1, ext_desc.number)
|
|
|
|
|
|
|
|
# Test object cache: repeatedly retrieving the same descriptor
|
|
|
|
# should result in the same object
|
|
|
|
self.assertIs(ext_desc, pool.FindExtensionByName("test_ext"))
|
|
|
|
|
|
|
|
|
|
|
|
def test_lib_is_upb(self):
|
|
|
|
# Ensure we are not pulling in a different protobuf library on the
|
|
|
|
# system.
|
|
|
|
print(_message._IS_UPB)
|
|
|
|
self.assertTrue(_message._IS_UPB)
|
|
|
|
self.assertEqual(api_implementation.Type(), "cpp")
|
|
|
|
|
|
|
|
def test_repeated_field_slice_delete(self):
|
|
|
|
def test_slice(start, end, step):
|
|
|
|
vals = list(range(20))
|
|
|
|
message = unittest_pb2.TestAllTypes(repeated_int32=vals)
|
|
|
|
del vals[start:end:step]
|
|
|
|
del message.repeated_int32[start:end:step]
|
|
|
|
self.assertEqual(vals, list(message.repeated_int32))
|
|
|
|
test_slice(3, 11, 1)
|
|
|
|
test_slice(3, 11, 2)
|
|
|
|
test_slice(3, 11, 3)
|
|
|
|
test_slice(11, 3, -1)
|
|
|
|
test_slice(11, 3, -2)
|
|
|
|
test_slice(11, 3, -3)
|
|
|
|
test_slice(10, 25, 4)
|
|
|
|
|
|
|
|
def testExtensionsErrors(self):
|
|
|
|
msg = unittest_pb2.TestAllTypes()
|
|
|
|
self.assertRaises(AttributeError, getattr, msg, 'Extensions')
|
|
|
|
|
|
|
|
def testClearStubMapField(self):
|
|
|
|
msg = map_unittest_pb2.TestMapSubmessage()
|
|
|
|
int32_map = msg.test_map.map_int32_int32
|
|
|
|
msg.test_map.ClearField("map_int32_int32")
|
|
|
|
int32_map[123] = 456
|
|
|
|
self.assertEqual(0, msg.test_map.ByteSize())
|
|
|
|
|
|
|
|
def testClearReifiedMapField(self):
|
|
|
|
msg = map_unittest_pb2.TestMap()
|
|
|
|
int32_map = msg.map_int32_int32
|
|
|
|
int32_map[123] = 456
|
|
|
|
msg.ClearField("map_int32_int32")
|
|
|
|
int32_map[111] = 222
|
|
|
|
self.assertEqual(0, msg.ByteSize())
|
|
|
|
|
|
|
|
def testClearStubRepeatedField(self):
|
|
|
|
msg = unittest_pb2.NestedTestAllTypes()
|
|
|
|
int32_array = msg.payload.repeated_int32
|
|
|
|
msg.payload.ClearField("repeated_int32")
|
|
|
|
int32_array.append(123)
|
|
|
|
self.assertEqual(0, msg.payload.ByteSize())
|
|
|
|
|
|
|
|
def testClearReifiedRepeatdField(self):
|
|
|
|
msg = unittest_pb2.TestAllTypes()
|
|
|
|
int32_array = msg.repeated_int32
|
|
|
|
int32_array.append(123)
|
|
|
|
self.assertNotEqual(0, msg.ByteSize())
|
|
|
|
msg.ClearField("repeated_int32")
|
|
|
|
int32_array.append(123)
|
|
|
|
self.assertEqual(0, msg.ByteSize())
|
|
|
|
|
|
|
|
def testFloatPrinting(self):
|
|
|
|
message = unittest_pb2.TestAllTypes()
|
|
|
|
message.optional_float = -0.0
|
|
|
|
self.assertEqual(str(message), 'optional_float: -0\n')
|
|
|
|
|
|
|
|
class OversizeProtosTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
|
|
msg = unittest_pb2.NestedTestAllTypes()
|
|
|
|
m = msg
|
|
|
|
for i in range(101):
|
|
|
|
m = m.child
|
|
|
|
m.Clear()
|
|
|
|
self.p_serialized = msg.SerializeToString()
|
|
|
|
|
|
|
|
def testAssertOversizeProto(self):
|
|
|
|
from google.protobuf.pyext._message import SetAllowOversizeProtos
|
|
|
|
SetAllowOversizeProtos(False)
|
|
|
|
q = unittest_pb2.NestedTestAllTypes()
|
|
|
|
with self.assertRaises(message.DecodeError):
|
|
|
|
q.ParseFromString(self.p_serialized)
|
|
|
|
print(q)
|
|
|
|
|
|
|
|
def testSucceedOversizeProto(self):
|
|
|
|
from google.protobuf.pyext._message import SetAllowOversizeProtos
|
|
|
|
SetAllowOversizeProtos(True)
|
|
|
|
q = unittest_pb2.NestedTestAllTypes()
|
|
|
|
q.ParseFromString(self.p_serialized)
|
|
|
|
|
|
|
|
def testExtensionIter(self):
|
|
|
|
extendee_proto = more_extensions_pb2.ExtendedMessage()
|
|
|
|
|
|
|
|
extension_int32 = more_extensions_pb2.optional_int_extension
|
|
|
|
extendee_proto.Extensions[extension_int32] = 23
|
|
|
|
|
|
|
|
extension_repeated = more_extensions_pb2.repeated_int_extension
|
|
|
|
extendee_proto.Extensions[extension_repeated].append(11)
|
|
|
|
|
|
|
|
extension_msg = more_extensions_pb2.optional_message_extension
|
|
|
|
extendee_proto.Extensions[extension_msg].foreign_message_int = 56
|
|
|
|
|
|
|
|
# Set some normal fields.
|
|
|
|
extendee_proto.optional_int32 = 1
|
|
|
|
extendee_proto.repeated_string.append('hi')
|
|
|
|
|
|
|
|
expected = {
|
|
|
|
extension_int32: True,
|
|
|
|
extension_msg: True,
|
|
|
|
extension_repeated: True
|
|
|
|
}
|
|
|
|
count = 0
|
|
|
|
for item in extendee_proto.Extensions:
|
|
|
|
del expected[item]
|
|
|
|
self.assertIn(item, extendee_proto.Extensions)
|
|
|
|
count += 1
|
|
|
|
self.assertEqual(count, 3)
|
|
|
|
self.assertEqual(len(expected), 0)
|
|
|
|
|
|
|
|
def testIsInitializedStub(self):
|
|
|
|
proto = unittest_pb2.TestRequiredForeign()
|
|
|
|
self.assertTrue(proto.IsInitialized())
|
|
|
|
self.assertFalse(proto.optional_message.IsInitialized())
|
|
|
|
errors = []
|
|
|
|
self.assertFalse(proto.optional_message.IsInitialized(errors))
|
|
|
|
self.assertEqual(['a', 'b', 'c'], errors)
|
|
|
|
self.assertRaises(message.EncodeError, proto.optional_message.SerializeToString)
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
unittest.main(verbosity=2)
|