| #! /usr/bin/env python |
| # |
| # Protocol Buffers - Google's data interchange format |
| # Copyright 2008 Google Inc. All rights reserved. |
| # https://developers.google.com/protocol-buffers/ |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # * Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # * Redistributions in binary form must reproduce the above |
| # copyright notice, this list of conditions and the following disclaimer |
| # in the documentation and/or other materials provided with the |
| # distribution. |
| # * Neither the name of Google Inc. nor the names of its |
| # contributors may be used to endorse or promote products derived from |
| # this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| """Test for google.protobuf.internal.well_known_types.""" |
| |
| __author__ = 'jieluo@google.com (Jie Luo)' |
| |
| import collections |
| from datetime import datetime |
| |
| try: |
| import unittest2 as unittest #PY26 |
| except ImportError: |
| import unittest |
| |
| from google.protobuf import any_pb2 |
| from google.protobuf import duration_pb2 |
| from google.protobuf import field_mask_pb2 |
| from google.protobuf import struct_pb2 |
| from google.protobuf import timestamp_pb2 |
| from google.protobuf import unittest_pb2 |
| from google.protobuf.internal import any_test_pb2 |
| from google.protobuf.internal import test_util |
| from google.protobuf.internal import well_known_types |
| from google.protobuf import descriptor |
| from google.protobuf import text_format |
| |
| |
| class TimeUtilTestBase(unittest.TestCase): |
| |
| def CheckTimestampConversion(self, message, text): |
| self.assertEqual(text, message.ToJsonString()) |
| parsed_message = timestamp_pb2.Timestamp() |
| parsed_message.FromJsonString(text) |
| self.assertEqual(message, parsed_message) |
| |
| def CheckDurationConversion(self, message, text): |
| self.assertEqual(text, message.ToJsonString()) |
| parsed_message = duration_pb2.Duration() |
| parsed_message.FromJsonString(text) |
| self.assertEqual(message, parsed_message) |
| |
| |
| class TimeUtilTest(TimeUtilTestBase): |
| |
| def testTimestampSerializeAndParse(self): |
| message = timestamp_pb2.Timestamp() |
| # Generated output should contain 3, 6, or 9 fractional digits. |
| message.seconds = 0 |
| message.nanos = 0 |
| self.CheckTimestampConversion(message, '1970-01-01T00:00:00Z') |
| message.nanos = 10000000 |
| self.CheckTimestampConversion(message, '1970-01-01T00:00:00.010Z') |
| message.nanos = 10000 |
| self.CheckTimestampConversion(message, '1970-01-01T00:00:00.000010Z') |
| message.nanos = 10 |
| self.CheckTimestampConversion(message, '1970-01-01T00:00:00.000000010Z') |
| # Test min timestamps. |
| message.seconds = -62135596800 |
| message.nanos = 0 |
| self.CheckTimestampConversion(message, '0001-01-01T00:00:00Z') |
| # Test max timestamps. |
| message.seconds = 253402300799 |
| message.nanos = 999999999 |
| self.CheckTimestampConversion(message, '9999-12-31T23:59:59.999999999Z') |
| # Test negative timestamps. |
| message.seconds = -1 |
| self.CheckTimestampConversion(message, '1969-12-31T23:59:59.999999999Z') |
| |
| # Parsing accepts an fractional digits as long as they fit into nano |
| # precision. |
| message.FromJsonString('1970-01-01T00:00:00.1Z') |
| self.assertEqual(0, message.seconds) |
| self.assertEqual(100000000, message.nanos) |
| # Parsing accepts offsets. |
| message.FromJsonString('1970-01-01T00:00:00-08:00') |
| self.assertEqual(8 * 3600, message.seconds) |
| self.assertEqual(0, message.nanos) |
| |
| # It is not easy to check with current time. For test coverage only. |
| message.GetCurrentTime() |
| self.assertNotEqual(8 * 3600, message.seconds) |
| |
| def testDurationSerializeAndParse(self): |
| message = duration_pb2.Duration() |
| # Generated output should contain 3, 6, or 9 fractional digits. |
| message.seconds = 0 |
| message.nanos = 0 |
| self.CheckDurationConversion(message, '0s') |
| message.nanos = 10000000 |
| self.CheckDurationConversion(message, '0.010s') |
| message.nanos = 10000 |
| self.CheckDurationConversion(message, '0.000010s') |
| message.nanos = 10 |
| self.CheckDurationConversion(message, '0.000000010s') |
| |
| # Test min and max |
| message.seconds = 315576000000 |
| message.nanos = 999999999 |
| self.CheckDurationConversion(message, '315576000000.999999999s') |
| message.seconds = -315576000000 |
| message.nanos = -999999999 |
| self.CheckDurationConversion(message, '-315576000000.999999999s') |
| |
| # Parsing accepts an fractional digits as long as they fit into nano |
| # precision. |
| message.FromJsonString('0.1s') |
| self.assertEqual(100000000, message.nanos) |
| message.FromJsonString('0.0000001s') |
| self.assertEqual(100, message.nanos) |
| |
| def testTimestampIntegerConversion(self): |
| message = timestamp_pb2.Timestamp() |
| message.FromNanoseconds(1) |
| self.assertEqual('1970-01-01T00:00:00.000000001Z', |
| message.ToJsonString()) |
| self.assertEqual(1, message.ToNanoseconds()) |
| |
| message.FromNanoseconds(-1) |
| self.assertEqual('1969-12-31T23:59:59.999999999Z', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToNanoseconds()) |
| |
| message.FromMicroseconds(1) |
| self.assertEqual('1970-01-01T00:00:00.000001Z', |
| message.ToJsonString()) |
| self.assertEqual(1, message.ToMicroseconds()) |
| |
| message.FromMicroseconds(-1) |
| self.assertEqual('1969-12-31T23:59:59.999999Z', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToMicroseconds()) |
| |
| message.FromMilliseconds(1) |
| self.assertEqual('1970-01-01T00:00:00.001Z', |
| message.ToJsonString()) |
| self.assertEqual(1, message.ToMilliseconds()) |
| |
| message.FromMilliseconds(-1) |
| self.assertEqual('1969-12-31T23:59:59.999Z', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToMilliseconds()) |
| |
| message.FromSeconds(1) |
| self.assertEqual('1970-01-01T00:00:01Z', |
| message.ToJsonString()) |
| self.assertEqual(1, message.ToSeconds()) |
| |
| message.FromSeconds(-1) |
| self.assertEqual('1969-12-31T23:59:59Z', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToSeconds()) |
| |
| message.FromNanoseconds(1999) |
| self.assertEqual(1, message.ToMicroseconds()) |
| # For negative values, Timestamp will be rounded down. |
| # For example, "1969-12-31T23:59:59.5Z" (i.e., -0.5s) rounded to seconds |
| # will be "1969-12-31T23:59:59Z" (i.e., -1s) rather than |
| # "1970-01-01T00:00:00Z" (i.e., 0s). |
| message.FromNanoseconds(-1999) |
| self.assertEqual(-2, message.ToMicroseconds()) |
| |
| def testDurationIntegerConversion(self): |
| message = duration_pb2.Duration() |
| message.FromNanoseconds(1) |
| self.assertEqual('0.000000001s', |
| message.ToJsonString()) |
| self.assertEqual(1, message.ToNanoseconds()) |
| |
| message.FromNanoseconds(-1) |
| self.assertEqual('-0.000000001s', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToNanoseconds()) |
| |
| message.FromMicroseconds(1) |
| self.assertEqual('0.000001s', |
| message.ToJsonString()) |
| self.assertEqual(1, message.ToMicroseconds()) |
| |
| message.FromMicroseconds(-1) |
| self.assertEqual('-0.000001s', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToMicroseconds()) |
| |
| message.FromMilliseconds(1) |
| self.assertEqual('0.001s', |
| message.ToJsonString()) |
| self.assertEqual(1, message.ToMilliseconds()) |
| |
| message.FromMilliseconds(-1) |
| self.assertEqual('-0.001s', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToMilliseconds()) |
| |
| message.FromSeconds(1) |
| self.assertEqual('1s', message.ToJsonString()) |
| self.assertEqual(1, message.ToSeconds()) |
| |
| message.FromSeconds(-1) |
| self.assertEqual('-1s', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToSeconds()) |
| |
| # Test truncation behavior. |
| message.FromNanoseconds(1999) |
| self.assertEqual(1, message.ToMicroseconds()) |
| |
| # For negative values, Duration will be rounded towards 0. |
| message.FromNanoseconds(-1999) |
| self.assertEqual(-1, message.ToMicroseconds()) |
| |
| def testDatetimeConverison(self): |
| message = timestamp_pb2.Timestamp() |
| dt = datetime(1970, 1, 1) |
| message.FromDatetime(dt) |
| self.assertEqual(dt, message.ToDatetime()) |
| |
| message.FromMilliseconds(1999) |
| self.assertEqual(datetime(1970, 1, 1, 0, 0, 1, 999000), |
| message.ToDatetime()) |
| |
| def testTimedeltaConversion(self): |
| message = duration_pb2.Duration() |
| message.FromNanoseconds(1999999999) |
| td = message.ToTimedelta() |
| self.assertEqual(1, td.seconds) |
| self.assertEqual(999999, td.microseconds) |
| |
| message.FromNanoseconds(-1999999999) |
| td = message.ToTimedelta() |
| self.assertEqual(-1, td.days) |
| self.assertEqual(86398, td.seconds) |
| self.assertEqual(1, td.microseconds) |
| |
| message.FromMicroseconds(-1) |
| td = message.ToTimedelta() |
| self.assertEqual(-1, td.days) |
| self.assertEqual(86399, td.seconds) |
| self.assertEqual(999999, td.microseconds) |
| converted_message = duration_pb2.Duration() |
| converted_message.FromTimedelta(td) |
| self.assertEqual(message, converted_message) |
| |
| def testInvalidTimestamp(self): |
| message = timestamp_pb2.Timestamp() |
| self.assertRaisesRegexp( |
| well_known_types.ParseError, |
| 'Failed to parse timestamp: missing valid timezone offset.', |
| message.FromJsonString, |
| '') |
| self.assertRaisesRegexp( |
| well_known_types.ParseError, |
| 'Failed to parse timestamp: invalid trailing data ' |
| '1970-01-01T00:00:01Ztrail.', |
| message.FromJsonString, |
| '1970-01-01T00:00:01Ztrail') |
| self.assertRaisesRegexp( |
| ValueError, |
| 'time data \'10000-01-01T00:00:00\' does not match' |
| ' format \'%Y-%m-%dT%H:%M:%S\'', |
| message.FromJsonString, '10000-01-01T00:00:00.00Z') |
| self.assertRaisesRegexp( |
| well_known_types.ParseError, |
| 'nanos 0123456789012 more than 9 fractional digits.', |
| message.FromJsonString, |
| '1970-01-01T00:00:00.0123456789012Z') |
| self.assertRaisesRegexp( |
| well_known_types.ParseError, |
| (r'Invalid timezone offset value: \+08.'), |
| message.FromJsonString, |
| '1972-01-01T01:00:00.01+08',) |
| self.assertRaisesRegexp( |
| ValueError, |
| 'year (0 )?is out of range', |
| message.FromJsonString, |
| '0000-01-01T00:00:00Z') |
| message.seconds = 253402300800 |
| self.assertRaisesRegexp( |
| OverflowError, |
| 'date value out of range', |
| message.ToJsonString) |
| |
| def testInvalidDuration(self): |
| message = duration_pb2.Duration() |
| self.assertRaisesRegexp( |
| well_known_types.ParseError, |
| 'Duration must end with letter "s": 1.', |
| message.FromJsonString, '1') |
| self.assertRaisesRegexp( |
| well_known_types.ParseError, |
| 'Couldn\'t parse duration: 1...2s.', |
| message.FromJsonString, '1...2s') |
| text = '-315576000001.000000000s' |
| self.assertRaisesRegexp( |
| well_known_types.Error, |
| r'Duration is not valid\: Seconds -315576000001 must be in range' |
| r' \[-315576000000\, 315576000000\].', |
| message.FromJsonString, text) |
| text = '315576000001.000000000s' |
| self.assertRaisesRegexp( |
| well_known_types.Error, |
| r'Duration is not valid\: Seconds 315576000001 must be in range' |
| r' \[-315576000000\, 315576000000\].', |
| message.FromJsonString, text) |
| message.seconds = -315576000001 |
| message.nanos = 0 |
| self.assertRaisesRegexp( |
| well_known_types.Error, |
| r'Duration is not valid\: Seconds -315576000001 must be in range' |
| r' \[-315576000000\, 315576000000\].', |
| message.ToJsonString) |
| message.seconds = 0 |
| message.nanos = 999999999 + 1 |
| self.assertRaisesRegexp( |
| well_known_types.Error, |
| r'Duration is not valid\: Nanos 1000000000 must be in range' |
| r' \[-999999999\, 999999999\].', |
| message.ToJsonString) |
| message.seconds = -1 |
| message.nanos = 1 |
| self.assertRaisesRegexp( |
| well_known_types.Error, |
| r'Duration is not valid\: Sign mismatch.', |
| message.ToJsonString) |
| |
| |
| class FieldMaskTest(unittest.TestCase): |
| |
| def testStringFormat(self): |
| mask = field_mask_pb2.FieldMask() |
| self.assertEqual('', mask.ToJsonString()) |
| mask.paths.append('foo') |
| self.assertEqual('foo', mask.ToJsonString()) |
| mask.paths.append('bar') |
| self.assertEqual('foo,bar', mask.ToJsonString()) |
| |
| mask.FromJsonString('') |
| self.assertEqual('', mask.ToJsonString()) |
| mask.FromJsonString('foo') |
| self.assertEqual(['foo'], mask.paths) |
| mask.FromJsonString('foo,bar') |
| self.assertEqual(['foo', 'bar'], mask.paths) |
| |
| # Test camel case |
| mask.Clear() |
| mask.paths.append('foo_bar') |
| self.assertEqual('fooBar', mask.ToJsonString()) |
| mask.paths.append('bar_quz') |
| self.assertEqual('fooBar,barQuz', mask.ToJsonString()) |
| |
| mask.FromJsonString('') |
| self.assertEqual('', mask.ToJsonString()) |
| mask.FromJsonString('fooBar') |
| self.assertEqual(['foo_bar'], mask.paths) |
| mask.FromJsonString('fooBar,barQuz') |
| self.assertEqual(['foo_bar', 'bar_quz'], mask.paths) |
| |
| def testDescriptorToFieldMask(self): |
| mask = field_mask_pb2.FieldMask() |
| msg_descriptor = unittest_pb2.TestAllTypes.DESCRIPTOR |
| mask.AllFieldsFromDescriptor(msg_descriptor) |
| self.assertEqual(75, len(mask.paths)) |
| self.assertTrue(mask.IsValidForDescriptor(msg_descriptor)) |
| for field in msg_descriptor.fields: |
| self.assertTrue(field.name in mask.paths) |
| |
| def testIsValidForDescriptor(self): |
| msg_descriptor = unittest_pb2.TestAllTypes.DESCRIPTOR |
| # Empty mask |
| mask = field_mask_pb2.FieldMask() |
| self.assertTrue(mask.IsValidForDescriptor(msg_descriptor)) |
| # All fields from descriptor |
| mask.AllFieldsFromDescriptor(msg_descriptor) |
| self.assertTrue(mask.IsValidForDescriptor(msg_descriptor)) |
| # Child under optional message |
| mask.paths.append('optional_nested_message.bb') |
| self.assertTrue(mask.IsValidForDescriptor(msg_descriptor)) |
| # Repeated field is only allowed in the last position of path |
| mask.paths.append('repeated_nested_message.bb') |
| self.assertFalse(mask.IsValidForDescriptor(msg_descriptor)) |
| # Invalid top level field |
| mask = field_mask_pb2.FieldMask() |
| mask.paths.append('xxx') |
| self.assertFalse(mask.IsValidForDescriptor(msg_descriptor)) |
| # Invalid field in root |
| mask = field_mask_pb2.FieldMask() |
| mask.paths.append('xxx.zzz') |
| self.assertFalse(mask.IsValidForDescriptor(msg_descriptor)) |
| # Invalid field in internal node |
| mask = field_mask_pb2.FieldMask() |
| mask.paths.append('optional_nested_message.xxx.zzz') |
| self.assertFalse(mask.IsValidForDescriptor(msg_descriptor)) |
| # Invalid field in leaf |
| mask = field_mask_pb2.FieldMask() |
| mask.paths.append('optional_nested_message.xxx') |
| self.assertFalse(mask.IsValidForDescriptor(msg_descriptor)) |
| |
| def testCanonicalFrom(self): |
| mask = field_mask_pb2.FieldMask() |
| out_mask = field_mask_pb2.FieldMask() |
| # Paths will be sorted. |
| mask.FromJsonString('baz.quz,bar,foo') |
| out_mask.CanonicalFormFromMask(mask) |
| self.assertEqual('bar,baz.quz,foo', out_mask.ToJsonString()) |
| # Duplicated paths will be removed. |
| mask.FromJsonString('foo,bar,foo') |
| out_mask.CanonicalFormFromMask(mask) |
| self.assertEqual('bar,foo', out_mask.ToJsonString()) |
| # Sub-paths of other paths will be removed. |
| mask.FromJsonString('foo.b1,bar.b1,foo.b2,bar') |
| out_mask.CanonicalFormFromMask(mask) |
| self.assertEqual('bar,foo.b1,foo.b2', out_mask.ToJsonString()) |
| |
| # Test more deeply nested cases. |
| mask.FromJsonString( |
| 'foo.bar.baz1,foo.bar.baz2.quz,foo.bar.baz2') |
| out_mask.CanonicalFormFromMask(mask) |
| self.assertEqual('foo.bar.baz1,foo.bar.baz2', |
| out_mask.ToJsonString()) |
| mask.FromJsonString( |
| 'foo.bar.baz1,foo.bar.baz2,foo.bar.baz2.quz') |
| out_mask.CanonicalFormFromMask(mask) |
| self.assertEqual('foo.bar.baz1,foo.bar.baz2', |
| out_mask.ToJsonString()) |
| mask.FromJsonString( |
| 'foo.bar.baz1,foo.bar.baz2,foo.bar.baz2.quz,foo.bar') |
| out_mask.CanonicalFormFromMask(mask) |
| self.assertEqual('foo.bar', out_mask.ToJsonString()) |
| mask.FromJsonString( |
| 'foo.bar.baz1,foo.bar.baz2,foo.bar.baz2.quz,foo') |
| out_mask.CanonicalFormFromMask(mask) |
| self.assertEqual('foo', out_mask.ToJsonString()) |
| |
| def testUnion(self): |
| mask1 = field_mask_pb2.FieldMask() |
| mask2 = field_mask_pb2.FieldMask() |
| out_mask = field_mask_pb2.FieldMask() |
| mask1.FromJsonString('foo,baz') |
| mask2.FromJsonString('bar,quz') |
| out_mask.Union(mask1, mask2) |
| self.assertEqual('bar,baz,foo,quz', out_mask.ToJsonString()) |
| # Overlap with duplicated paths. |
| mask1.FromJsonString('foo,baz.bb') |
| mask2.FromJsonString('baz.bb,quz') |
| out_mask.Union(mask1, mask2) |
| self.assertEqual('baz.bb,foo,quz', out_mask.ToJsonString()) |
| # Overlap with paths covering some other paths. |
| mask1.FromJsonString('foo.bar.baz,quz') |
| mask2.FromJsonString('foo.bar,bar') |
| out_mask.Union(mask1, mask2) |
| self.assertEqual('bar,foo.bar,quz', out_mask.ToJsonString()) |
| src = unittest_pb2.TestAllTypes() |
| with self.assertRaises(ValueError): |
| out_mask.Union(src, mask2) |
| |
| def testIntersect(self): |
| mask1 = field_mask_pb2.FieldMask() |
| mask2 = field_mask_pb2.FieldMask() |
| out_mask = field_mask_pb2.FieldMask() |
| # Test cases without overlapping. |
| mask1.FromJsonString('foo,baz') |
| mask2.FromJsonString('bar,quz') |
| out_mask.Intersect(mask1, mask2) |
| self.assertEqual('', out_mask.ToJsonString()) |
| # Overlap with duplicated paths. |
| mask1.FromJsonString('foo,baz.bb') |
| mask2.FromJsonString('baz.bb,quz') |
| out_mask.Intersect(mask1, mask2) |
| self.assertEqual('baz.bb', out_mask.ToJsonString()) |
| # Overlap with paths covering some other paths. |
| mask1.FromJsonString('foo.bar.baz,quz') |
| mask2.FromJsonString('foo.bar,bar') |
| out_mask.Intersect(mask1, mask2) |
| self.assertEqual('foo.bar.baz', out_mask.ToJsonString()) |
| mask1.FromJsonString('foo.bar,bar') |
| mask2.FromJsonString('foo.bar.baz,quz') |
| out_mask.Intersect(mask1, mask2) |
| self.assertEqual('foo.bar.baz', out_mask.ToJsonString()) |
| |
| def testMergeMessage(self): |
| # Test merge one field. |
| src = unittest_pb2.TestAllTypes() |
| test_util.SetAllFields(src) |
| for field in src.DESCRIPTOR.fields: |
| if field.containing_oneof: |
| continue |
| field_name = field.name |
| dst = unittest_pb2.TestAllTypes() |
| # Only set one path to mask. |
| mask = field_mask_pb2.FieldMask() |
| mask.paths.append(field_name) |
| mask.MergeMessage(src, dst) |
| # The expected result message. |
| msg = unittest_pb2.TestAllTypes() |
| if field.label == descriptor.FieldDescriptor.LABEL_REPEATED: |
| repeated_src = getattr(src, field_name) |
| repeated_msg = getattr(msg, field_name) |
| if field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE: |
| for item in repeated_src: |
| repeated_msg.add().CopyFrom(item) |
| else: |
| repeated_msg.extend(repeated_src) |
| elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE: |
| getattr(msg, field_name).CopyFrom(getattr(src, field_name)) |
| else: |
| setattr(msg, field_name, getattr(src, field_name)) |
| # Only field specified in mask is merged. |
| self.assertEqual(msg, dst) |
| |
| # Test merge nested fields. |
| nested_src = unittest_pb2.NestedTestAllTypes() |
| nested_dst = unittest_pb2.NestedTestAllTypes() |
| nested_src.child.payload.optional_int32 = 1234 |
| nested_src.child.child.payload.optional_int32 = 5678 |
| mask = field_mask_pb2.FieldMask() |
| mask.FromJsonString('child.payload') |
| mask.MergeMessage(nested_src, nested_dst) |
| self.assertEqual(1234, nested_dst.child.payload.optional_int32) |
| self.assertEqual(0, nested_dst.child.child.payload.optional_int32) |
| |
| mask.FromJsonString('child.child.payload') |
| mask.MergeMessage(nested_src, nested_dst) |
| self.assertEqual(1234, nested_dst.child.payload.optional_int32) |
| self.assertEqual(5678, nested_dst.child.child.payload.optional_int32) |
| |
| nested_dst.Clear() |
| mask.FromJsonString('child.child.payload') |
| mask.MergeMessage(nested_src, nested_dst) |
| self.assertEqual(0, nested_dst.child.payload.optional_int32) |
| self.assertEqual(5678, nested_dst.child.child.payload.optional_int32) |
| |
| nested_dst.Clear() |
| mask.FromJsonString('child') |
| mask.MergeMessage(nested_src, nested_dst) |
| self.assertEqual(1234, nested_dst.child.payload.optional_int32) |
| self.assertEqual(5678, nested_dst.child.child.payload.optional_int32) |
| |
| # Test MergeOptions. |
| nested_dst.Clear() |
| nested_dst.child.payload.optional_int64 = 4321 |
| # Message fields will be merged by default. |
| mask.FromJsonString('child.payload') |
| mask.MergeMessage(nested_src, nested_dst) |
| self.assertEqual(1234, nested_dst.child.payload.optional_int32) |
| self.assertEqual(4321, nested_dst.child.payload.optional_int64) |
| # Change the behavior to replace message fields. |
| mask.FromJsonString('child.payload') |
| mask.MergeMessage(nested_src, nested_dst, True, False) |
| self.assertEqual(1234, nested_dst.child.payload.optional_int32) |
| self.assertEqual(0, nested_dst.child.payload.optional_int64) |
| |
| # By default, fields missing in source are not cleared in destination. |
| nested_dst.payload.optional_int32 = 1234 |
| self.assertTrue(nested_dst.HasField('payload')) |
| mask.FromJsonString('payload') |
| mask.MergeMessage(nested_src, nested_dst) |
| self.assertTrue(nested_dst.HasField('payload')) |
| # But they are cleared when replacing message fields. |
| nested_dst.Clear() |
| nested_dst.payload.optional_int32 = 1234 |
| mask.FromJsonString('payload') |
| mask.MergeMessage(nested_src, nested_dst, True, False) |
| self.assertFalse(nested_dst.HasField('payload')) |
| |
| nested_src.payload.repeated_int32.append(1234) |
| nested_dst.payload.repeated_int32.append(5678) |
| # Repeated fields will be appended by default. |
| mask.FromJsonString('payload.repeatedInt32') |
| mask.MergeMessage(nested_src, nested_dst) |
| self.assertEqual(2, len(nested_dst.payload.repeated_int32)) |
| self.assertEqual(5678, nested_dst.payload.repeated_int32[0]) |
| self.assertEqual(1234, nested_dst.payload.repeated_int32[1]) |
| # Change the behavior to replace repeated fields. |
| mask.FromJsonString('payload.repeatedInt32') |
| mask.MergeMessage(nested_src, nested_dst, False, True) |
| self.assertEqual(1, len(nested_dst.payload.repeated_int32)) |
| self.assertEqual(1234, nested_dst.payload.repeated_int32[0]) |
| |
| # Test Merge oneof field. |
| new_msg = unittest_pb2.TestOneof2() |
| dst = unittest_pb2.TestOneof2() |
| dst.foo_message.qux_int = 1 |
| mask = field_mask_pb2.FieldMask() |
| mask.FromJsonString('fooMessage,fooLazyMessage.quxInt') |
| mask.MergeMessage(new_msg, dst) |
| self.assertTrue(dst.HasField('foo_message')) |
| self.assertFalse(dst.HasField('foo_lazy_message')) |
| |
| def testMergeErrors(self): |
| src = unittest_pb2.TestAllTypes() |
| dst = unittest_pb2.TestAllTypes() |
| mask = field_mask_pb2.FieldMask() |
| test_util.SetAllFields(src) |
| mask.FromJsonString('optionalInt32.field') |
| with self.assertRaises(ValueError) as e: |
| mask.MergeMessage(src, dst) |
| self.assertEqual('Error: Field optional_int32 in message ' |
| 'protobuf_unittest.TestAllTypes is not a singular ' |
| 'message field and cannot have sub-fields.', |
| str(e.exception)) |
| |
| def testSnakeCaseToCamelCase(self): |
| self.assertEqual('fooBar', |
| well_known_types._SnakeCaseToCamelCase('foo_bar')) |
| self.assertEqual('FooBar', |
| well_known_types._SnakeCaseToCamelCase('_foo_bar')) |
| self.assertEqual('foo3Bar', |
| well_known_types._SnakeCaseToCamelCase('foo3_bar')) |
| |
| # No uppercase letter is allowed. |
| self.assertRaisesRegexp( |
| well_known_types.Error, |
| 'Fail to print FieldMask to Json string: Path name Foo must ' |
| 'not contain uppercase letters.', |
| well_known_types._SnakeCaseToCamelCase, |
| 'Foo') |
| # Any character after a "_" must be a lowercase letter. |
| # 1. "_" cannot be followed by another "_". |
| # 2. "_" cannot be followed by a digit. |
| # 3. "_" cannot appear as the last character. |
| self.assertRaisesRegexp( |
| well_known_types.Error, |
| 'Fail to print FieldMask to Json string: The character after a ' |
| '"_" must be a lowercase letter in path name foo__bar.', |
| well_known_types._SnakeCaseToCamelCase, |
| 'foo__bar') |
| self.assertRaisesRegexp( |
| well_known_types.Error, |
| 'Fail to print FieldMask to Json string: The character after a ' |
| '"_" must be a lowercase letter in path name foo_3bar.', |
| well_known_types._SnakeCaseToCamelCase, |
| 'foo_3bar') |
| self.assertRaisesRegexp( |
| well_known_types.Error, |
| 'Fail to print FieldMask to Json string: Trailing "_" in path ' |
| 'name foo_bar_.', |
| well_known_types._SnakeCaseToCamelCase, |
| 'foo_bar_') |
| |
| def testCamelCaseToSnakeCase(self): |
| self.assertEqual('foo_bar', |
| well_known_types._CamelCaseToSnakeCase('fooBar')) |
| self.assertEqual('_foo_bar', |
| well_known_types._CamelCaseToSnakeCase('FooBar')) |
| self.assertEqual('foo3_bar', |
| well_known_types._CamelCaseToSnakeCase('foo3Bar')) |
| self.assertRaisesRegexp( |
| well_known_types.ParseError, |
| 'Fail to parse FieldMask: Path name foo_bar must not contain "_"s.', |
| well_known_types._CamelCaseToSnakeCase, |
| 'foo_bar') |
| |
| |
| class StructTest(unittest.TestCase): |
| |
| def testStruct(self): |
| struct = struct_pb2.Struct() |
| self.assertIsInstance(struct, collections.Mapping) |
| self.assertEqual(0, len(struct)) |
| struct_class = struct.__class__ |
| |
| struct['key1'] = 5 |
| struct['key2'] = 'abc' |
| struct['key3'] = True |
| struct.get_or_create_struct('key4')['subkey'] = 11.0 |
| struct_list = struct.get_or_create_list('key5') |
| self.assertIsInstance(struct_list, collections.Sequence) |
| struct_list.extend([6, 'seven', True, False, None]) |
| struct_list.add_struct()['subkey2'] = 9 |
| struct['key6'] = {'subkey': {}} |
| struct['key7'] = [2, False] |
| |
| self.assertEqual(7, len(struct)) |
| self.assertTrue(isinstance(struct, well_known_types.Struct)) |
| self.assertEqual(5, struct['key1']) |
| self.assertEqual('abc', struct['key2']) |
| self.assertIs(True, struct['key3']) |
| self.assertEqual(11, struct['key4']['subkey']) |
| inner_struct = struct_class() |
| inner_struct['subkey2'] = 9 |
| self.assertEqual([6, 'seven', True, False, None, inner_struct], |
| list(struct['key5'].items())) |
| self.assertEqual({}, dict(struct['key6']['subkey'].fields)) |
| self.assertEqual([2, False], list(struct['key7'].items())) |
| |
| serialized = struct.SerializeToString() |
| struct2 = struct_pb2.Struct() |
| struct2.ParseFromString(serialized) |
| |
| self.assertEqual(struct, struct2) |
| for key, value in struct.items(): |
| self.assertIn(key, struct) |
| self.assertIn(key, struct2) |
| self.assertEqual(value, struct2[key]) |
| |
| self.assertEqual(7, len(struct.keys())) |
| self.assertEqual(7, len(struct.values())) |
| for key in struct.keys(): |
| self.assertIn(key, struct) |
| self.assertIn(key, struct2) |
| self.assertEqual(struct[key], struct2[key]) |
| |
| item = (next(iter(struct.keys())), next(iter(struct.values()))) |
| self.assertEqual(item, next(iter(struct.items()))) |
| |
| self.assertTrue(isinstance(struct2, well_known_types.Struct)) |
| self.assertEqual(5, struct2['key1']) |
| self.assertEqual('abc', struct2['key2']) |
| self.assertIs(True, struct2['key3']) |
| self.assertEqual(11, struct2['key4']['subkey']) |
| self.assertEqual([6, 'seven', True, False, None, inner_struct], |
| list(struct2['key5'].items())) |
| |
| struct_list = struct2['key5'] |
| self.assertEqual(6, struct_list[0]) |
| self.assertEqual('seven', struct_list[1]) |
| self.assertEqual(True, struct_list[2]) |
| self.assertEqual(False, struct_list[3]) |
| self.assertEqual(None, struct_list[4]) |
| self.assertEqual(inner_struct, struct_list[5]) |
| |
| struct_list[1] = 7 |
| self.assertEqual(7, struct_list[1]) |
| |
| struct_list.add_list().extend([1, 'two', True, False, None]) |
| self.assertEqual([1, 'two', True, False, None], |
| list(struct_list[6].items())) |
| struct_list.extend([{'nested_struct': 30}, ['nested_list', 99], {}, []]) |
| self.assertEqual(11, len(struct_list.values)) |
| self.assertEqual(30, struct_list[7]['nested_struct']) |
| self.assertEqual('nested_list', struct_list[8][0]) |
| self.assertEqual(99, struct_list[8][1]) |
| self.assertEqual({}, dict(struct_list[9].fields)) |
| self.assertEqual([], list(struct_list[10].items())) |
| struct_list[0] = {'replace': 'set'} |
| struct_list[1] = ['replace', 'set'] |
| self.assertEqual('set', struct_list[0]['replace']) |
| self.assertEqual(['replace', 'set'], list(struct_list[1].items())) |
| |
| text_serialized = str(struct) |
| struct3 = struct_pb2.Struct() |
| text_format.Merge(text_serialized, struct3) |
| self.assertEqual(struct, struct3) |
| |
| struct.get_or_create_struct('key3')['replace'] = 12 |
| self.assertEqual(12, struct['key3']['replace']) |
| |
| # Tests empty list. |
| struct.get_or_create_list('empty_list') |
| empty_list = struct['empty_list'] |
| self.assertEqual([], list(empty_list.items())) |
| list2 = struct_pb2.ListValue() |
| list2.add_list() |
| empty_list = list2[0] |
| self.assertEqual([], list(empty_list.items())) |
| |
| # Tests empty struct. |
| struct.get_or_create_struct('empty_struct') |
| empty_struct = struct['empty_struct'] |
| self.assertEqual({}, dict(empty_struct.fields)) |
| list2.add_struct() |
| empty_struct = list2[1] |
| self.assertEqual({}, dict(empty_struct.fields)) |
| |
| self.assertEqual(9, len(struct)) |
| del struct['key3'] |
| del struct['key4'] |
| self.assertEqual(7, len(struct)) |
| self.assertEqual(6, len(struct['key5'])) |
| del struct['key5'][1] |
| self.assertEqual(5, len(struct['key5'])) |
| self.assertEqual([6, True, False, None, inner_struct], |
| list(struct['key5'].items())) |
| |
| def testMergeFrom(self): |
| struct = struct_pb2.Struct() |
| struct_class = struct.__class__ |
| |
| dictionary = { |
| 'key1': 5, |
| 'key2': 'abc', |
| 'key3': True, |
| 'key4': {'subkey': 11.0}, |
| 'key5': [6, 'seven', True, False, None, {'subkey2': 9}], |
| 'key6': [['nested_list', True]], |
| 'empty_struct': {}, |
| 'empty_list': [] |
| } |
| struct.update(dictionary) |
| self.assertEqual(5, struct['key1']) |
| self.assertEqual('abc', struct['key2']) |
| self.assertIs(True, struct['key3']) |
| self.assertEqual(11, struct['key4']['subkey']) |
| inner_struct = struct_class() |
| inner_struct['subkey2'] = 9 |
| self.assertEqual([6, 'seven', True, False, None, inner_struct], |
| list(struct['key5'].items())) |
| self.assertEqual(2, len(struct['key6'][0].values)) |
| self.assertEqual('nested_list', struct['key6'][0][0]) |
| self.assertEqual(True, struct['key6'][0][1]) |
| empty_list = struct['empty_list'] |
| self.assertEqual([], list(empty_list.items())) |
| empty_struct = struct['empty_struct'] |
| self.assertEqual({}, dict(empty_struct.fields)) |
| |
| # According to documentation: "When parsing from the wire or when merging, |
| # if there are duplicate map keys the last key seen is used". |
| duplicate = { |
| 'key4': {'replace': 20}, |
| 'key5': [[False, 5]] |
| } |
| struct.update(duplicate) |
| self.assertEqual(1, len(struct['key4'].fields)) |
| self.assertEqual(20, struct['key4']['replace']) |
| self.assertEqual(1, len(struct['key5'].values)) |
| self.assertEqual(False, struct['key5'][0][0]) |
| self.assertEqual(5, struct['key5'][0][1]) |
| |
| |
| class AnyTest(unittest.TestCase): |
| |
| def testAnyMessage(self): |
| # Creates and sets message. |
| msg = any_test_pb2.TestAny() |
| msg_descriptor = msg.DESCRIPTOR |
| all_types = unittest_pb2.TestAllTypes() |
| all_descriptor = all_types.DESCRIPTOR |
| all_types.repeated_string.append(u'\u00fc\ua71f') |
| # Packs to Any. |
| msg.value.Pack(all_types) |
| self.assertEqual(msg.value.type_url, |
| 'type.googleapis.com/%s' % all_descriptor.full_name) |
| self.assertEqual(msg.value.value, |
| all_types.SerializeToString()) |
| # Tests Is() method. |
| self.assertTrue(msg.value.Is(all_descriptor)) |
| self.assertFalse(msg.value.Is(msg_descriptor)) |
| # Unpacks Any. |
| unpacked_message = unittest_pb2.TestAllTypes() |
| self.assertTrue(msg.value.Unpack(unpacked_message)) |
| self.assertEqual(all_types, unpacked_message) |
| # Unpacks to different type. |
| self.assertFalse(msg.value.Unpack(msg)) |
| # Only Any messages have Pack method. |
| try: |
| msg.Pack(all_types) |
| except AttributeError: |
| pass |
| else: |
| raise AttributeError('%s should not have Pack method.' % |
| msg_descriptor.full_name) |
| |
| def testMessageName(self): |
| # Creates and sets message. |
| submessage = any_test_pb2.TestAny() |
| submessage.int_value = 12345 |
| msg = any_pb2.Any() |
| msg.Pack(submessage) |
| self.assertEqual(msg.TypeName(), 'google.protobuf.internal.TestAny') |
| |
| def testPackWithCustomTypeUrl(self): |
| submessage = any_test_pb2.TestAny() |
| submessage.int_value = 12345 |
| msg = any_pb2.Any() |
| # Pack with a custom type URL prefix. |
| msg.Pack(submessage, 'type.myservice.com') |
| self.assertEqual(msg.type_url, |
| 'type.myservice.com/%s' % submessage.DESCRIPTOR.full_name) |
| # Pack with a custom type URL prefix ending with '/'. |
| msg.Pack(submessage, 'type.myservice.com/') |
| self.assertEqual(msg.type_url, |
| 'type.myservice.com/%s' % submessage.DESCRIPTOR.full_name) |
| # Pack with an empty type URL prefix. |
| msg.Pack(submessage, '') |
| self.assertEqual(msg.type_url, |
| '/%s' % submessage.DESCRIPTOR.full_name) |
| # Test unpacking the type. |
| unpacked_message = any_test_pb2.TestAny() |
| self.assertTrue(msg.Unpack(unpacked_message)) |
| self.assertEqual(submessage, unpacked_message) |
| |
| def testPackDeterministic(self): |
| submessage = any_test_pb2.TestAny() |
| for i in range(10): |
| submessage.map_value[str(i)] = i * 2 |
| msg = any_pb2.Any() |
| msg.Pack(submessage, deterministic=True) |
| serialized = msg.SerializeToString(deterministic=True) |
| golden = (b'\n4type.googleapis.com/google.protobuf.internal.TestAny\x12F' |
| b'\x1a\x05\n\x010\x10\x00\x1a\x05\n\x011\x10\x02\x1a\x05\n\x01' |
| b'2\x10\x04\x1a\x05\n\x013\x10\x06\x1a\x05\n\x014\x10\x08\x1a' |
| b'\x05\n\x015\x10\n\x1a\x05\n\x016\x10\x0c\x1a\x05\n\x017\x10' |
| b'\x0e\x1a\x05\n\x018\x10\x10\x1a\x05\n\x019\x10\x12') |
| self.assertEqual(golden, serialized) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |