From d4abad8731f9b49ee13e3c2fae32b73a000591ce Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 30 Jun 2020 20:25:00 +1200 Subject: [PATCH 1/2] Optimize reading strings across segments --- .gitignore | 3 + .../Google.Protobuf.Benchmarks.csproj | 4 + .../ParseRawPrimitivesBenchmark.cs | 42 ++++- .../CodedInputStreamTest.cs | 84 ++++++++- .../ReadOnlySequenceFactory.cs | 17 +- .../Collections/RepeatedField.cs | 20 +- .../Google.Protobuf/ParserInternalState.cs | 12 +- .../src/Google.Protobuf/ParsingPrimitives.cs | 171 +++++++++++++----- 8 files changed, 274 insertions(+), 79 deletions(-) diff --git a/.gitignore b/.gitignore index 0cbcf50380..a7349c2aa8 100644 --- a/.gitignore +++ b/.gitignore @@ -209,3 +209,6 @@ cmake/cmake-build-debug/ # IntelliJ .idea *.iml + +# BenchmarkDotNet +BenchmarkDotNet.Artifacts/ diff --git a/csharp/src/Google.Protobuf.Benchmarks/Google.Protobuf.Benchmarks.csproj b/csharp/src/Google.Protobuf.Benchmarks/Google.Protobuf.Benchmarks.csproj index e2fe2556f1..37bbc3ef2f 100644 --- a/csharp/src/Google.Protobuf.Benchmarks/Google.Protobuf.Benchmarks.csproj +++ b/csharp/src/Google.Protobuf.Benchmarks/Google.Protobuf.Benchmarks.csproj @@ -11,6 +11,10 @@ true + + + + diff --git a/csharp/src/Google.Protobuf.Benchmarks/ParseRawPrimitivesBenchmark.cs b/csharp/src/Google.Protobuf.Benchmarks/ParseRawPrimitivesBenchmark.cs index 2f226a323c..6df1c872c3 100644 --- a/csharp/src/Google.Protobuf.Benchmarks/ParseRawPrimitivesBenchmark.cs +++ b/csharp/src/Google.Protobuf.Benchmarks/ParseRawPrimitivesBenchmark.cs @@ -54,10 +54,12 @@ namespace Google.Protobuf.Benchmarks // key is the encodedSize of string values Dictionary stringInputBuffers; + Dictionary> stringInputBuffersSegmented; Random random = new Random(417384220); // random but deterministic seed public IEnumerable StringEncodedSizes => new[] { 1, 4, 10, 105, 10080 }; + public IEnumerable StringSegmentedEncodedSizes => new[] { 105, 10080 }; [GlobalSetup] public void GlobalSetup() @@ -78,11 +80,18 @@ namespace Google.Protobuf.Benchmarks fixedIntInputBuffer = CreateBufferWithRandomData(random, BytesToParse / sizeof(long), sizeof(long), paddingValueCount); stringInputBuffers = new Dictionary(); - foreach(var encodedSize in StringEncodedSizes) + foreach (var encodedSize in StringEncodedSizes) { byte[] buffer = CreateBufferWithStrings(BytesToParse / encodedSize, encodedSize, encodedSize < 10 ? 10 : 1 ); stringInputBuffers.Add(encodedSize, buffer); } + + stringInputBuffersSegmented = new Dictionary>(); + foreach (var encodedSize in StringSegmentedEncodedSizes) + { + byte[] buffer = CreateBufferWithStrings(BytesToParse / encodedSize, encodedSize, encodedSize < 10 ? 10 : 1); + stringInputBuffersSegmented.Add(encodedSize, ReadOnlySequenceFactory.CreateWithContent(buffer, segmentSize: 128, addEmptySegmentDelimiters: false)); + } } // Total number of bytes that each benchmark will parse. @@ -300,6 +309,19 @@ namespace Google.Protobuf.Benchmarks return sum; } + [Benchmark] + [ArgumentsSource(nameof(StringSegmentedEncodedSizes))] + public int ParseString_ParseContext_MultipleSegments(int encodedSize) + { + InitializeParseContext(stringInputBuffersSegmented[encodedSize], out ParseContext ctx); + int sum = 0; + for (int i = 0; i < BytesToParse / encodedSize; i++) + { + sum += ctx.ReadString().Length; + } + return sum; + } + [Benchmark] [ArgumentsSource(nameof(StringEncodedSizes))] public int ParseBytes_CodedInputStream(int encodedSize) @@ -326,11 +348,29 @@ namespace Google.Protobuf.Benchmarks return sum; } + [Benchmark] + [ArgumentsSource(nameof(StringSegmentedEncodedSizes))] + public int ParseBytes_ParseContext_MultipleSegments(int encodedSize) + { + InitializeParseContext(stringInputBuffersSegmented[encodedSize], out ParseContext ctx); + int sum = 0; + for (int i = 0; i < BytesToParse / encodedSize; i++) + { + sum += ctx.ReadBytes().Length; + } + return sum; + } + private static void InitializeParseContext(byte[] buffer, out ParseContext ctx) { ParseContext.Initialize(new ReadOnlySequence(buffer), out ctx); } + private static void InitializeParseContext(ReadOnlySequence buffer, out ParseContext ctx) + { + ParseContext.Initialize(buffer, out ctx); + } + private static byte[] CreateBufferWithRandomVarints(Random random, int valueCount, int encodedSize, int paddingValueCount) { MemoryStream ms = new MemoryStream(); diff --git a/csharp/src/Google.Protobuf.Test/CodedInputStreamTest.cs b/csharp/src/Google.Protobuf.Test/CodedInputStreamTest.cs index bcc6ace289..1fb3bb5039 100644 --- a/csharp/src/Google.Protobuf.Test/CodedInputStreamTest.cs +++ b/csharp/src/Google.Protobuf.Test/CodedInputStreamTest.cs @@ -324,7 +324,25 @@ namespace Google.Protobuf Assert.AreEqual(message, message2); } } - + + [Test] + public void ReadWholeMessage_VaryingBlockSizes_FromSequence() + { + TestAllTypes message = SampleMessages.CreateFullTestAllTypes(); + + byte[] rawBytes = message.ToByteArray(); + Assert.AreEqual(rawBytes.Length, message.CalculateSize()); + TestAllTypes message2 = TestAllTypes.Parser.ParseFrom(rawBytes); + Assert.AreEqual(message, message2); + + // Try different block sizes. + for (int blockSize = 1; blockSize < 256; blockSize *= 2) + { + message2 = TestAllTypes.Parser.ParseFrom(ReadOnlySequenceFactory.CreateWithContent(rawBytes, blockSize)); + Assert.AreEqual(message, message2); + } + } + [Test] public void ReadHugeBlob() { @@ -365,6 +383,70 @@ namespace Google.Protobuf Assert.Throws(() => input.ReadBytes()); } + [Test] + public void ReadBlobGreaterThanCurrentLimit() + { + MemoryStream ms = new MemoryStream(); + CodedOutputStream output = new CodedOutputStream(ms); + uint tag = WireFormat.MakeTag(1, WireFormat.WireType.LengthDelimited); + output.WriteRawVarint32(tag); + output.WriteRawVarint32(4); + output.WriteRawBytes(new byte[4]); // Pad with a few random bytes. + output.Flush(); + ms.Position = 0; + + CodedInputStream input = new CodedInputStream(ms); + Assert.AreEqual(tag, input.ReadTag()); + + // Specify limit smaller than data length + input.PushLimit(3); + Assert.Throws(() => input.ReadBytes()); + + AssertReadFromParseContext(new ReadOnlySequence(ms.ToArray()), (ref ParseContext ctx) => + { + Assert.AreEqual(tag, ctx.ReadTag()); + SegmentedBufferHelper.PushLimit(ref ctx.state, 3); + try + { + ctx.ReadBytes(); + Assert.Fail(); + } + catch (InvalidProtocolBufferException) {} + }, true); + } + + [Test] + public void ReadStringGreaterThanCurrentLimit() + { + MemoryStream ms = new MemoryStream(); + CodedOutputStream output = new CodedOutputStream(ms); + uint tag = WireFormat.MakeTag(1, WireFormat.WireType.LengthDelimited); + output.WriteRawVarint32(tag); + output.WriteRawVarint32(4); + output.WriteRawBytes(new byte[4]); // Pad with a few random bytes. + output.Flush(); + ms.Position = 0; + + CodedInputStream input = new CodedInputStream(ms.ToArray()); + Assert.AreEqual(tag, input.ReadTag()); + + // Specify limit smaller than data length + input.PushLimit(3); + Assert.Throws(() => input.ReadString()); + + AssertReadFromParseContext(new ReadOnlySequence(ms.ToArray()), (ref ParseContext ctx) => + { + Assert.AreEqual(tag, ctx.ReadTag()); + SegmentedBufferHelper.PushLimit(ref ctx.state, 3); + try + { + ctx.ReadString(); + Assert.Fail(); + } + catch (InvalidProtocolBufferException) { } + }, true); + } + // Representations of a tag for field 0 with various wire types [Test] [TestCase(0)] diff --git a/csharp/src/Google.Protobuf.Test/ReadOnlySequenceFactory.cs b/csharp/src/Google.Protobuf.Test/ReadOnlySequenceFactory.cs index 588b559e9f..f0248ac3c3 100644 --- a/csharp/src/Google.Protobuf.Test/ReadOnlySequenceFactory.cs +++ b/csharp/src/Google.Protobuf.Test/ReadOnlySequenceFactory.cs @@ -41,11 +41,18 @@ namespace Google.Protobuf { internal static class ReadOnlySequenceFactory { - public static ReadOnlySequence CreateWithContent(byte[] data, int segmentSize = 1) + /// + /// Create a sequence from the specified data. The data will be divided up into segments in the sequence. + /// + public static ReadOnlySequence CreateWithContent(byte[] data, int segmentSize = 1, bool addEmptySegmentDelimiters = true) { var segments = new List(); - segments.Add(new byte[0]); + if (addEmptySegmentDelimiters) + { + segments.Add(new byte[0]); + } + var currentIndex = 0; while (currentIndex < data.Length) { @@ -55,7 +62,11 @@ namespace Google.Protobuf segment.Add(data[currentIndex++]); } segments.Add(segment.ToArray()); - segments.Add(new byte[0]); + + if (addEmptySegmentDelimiters) + { + segments.Add(new byte[0]); + } } return CreateSegments(segments.ToArray()); diff --git a/csharp/src/Google.Protobuf/Collections/RepeatedField.cs b/csharp/src/Google.Protobuf/Collections/RepeatedField.cs index 69adcdfafc..19114caa24 100644 --- a/csharp/src/Google.Protobuf/Collections/RepeatedField.cs +++ b/csharp/src/Google.Protobuf/Collections/RepeatedField.cs @@ -133,7 +133,7 @@ namespace Google.Protobuf.Collections // // Check that the supplied length doesn't exceed the underlying buffer. // That prevents a malicious length from initializing a very large collection. - if (codec.FixedSize > 0 && length % codec.FixedSize == 0 && IsDataAvailable(ref ctx, length)) + if (codec.FixedSize > 0 && length % codec.FixedSize == 0 && ParsingPrimitives.IsDataAvailable(ref ctx.state, length)) { EnsureSize(count + (length / codec.FixedSize)); @@ -167,24 +167,6 @@ namespace Google.Protobuf.Collections } } - private bool IsDataAvailable(ref ParseContext ctx, int size) - { - // Data fits in remaining buffer - if (size <= ctx.state.bufferSize - ctx.state.bufferPos) - { - return true; - } - - // Data fits in remaining source data. - // Note that this will never be true when reading from a stream as the total length is unknown. - if (size < ctx.state.segmentedBufferHelper.TotalLength - ctx.state.totalBytesRetired - ctx.state.bufferPos) - { - return true; - } - - return false; - } - /// /// Calculates the size of this collection based on the given codec. /// diff --git a/csharp/src/Google.Protobuf/ParserInternalState.cs b/csharp/src/Google.Protobuf/ParserInternalState.cs index 50d489dfc4..cb4f47143c 100644 --- a/csharp/src/Google.Protobuf/ParserInternalState.cs +++ b/csharp/src/Google.Protobuf/ParserInternalState.cs @@ -43,7 +43,7 @@ using Google.Protobuf.Collections; namespace Google.Protobuf { - + // warning: this is a mutable struct, so it needs to be only passed as a ref! internal struct ParserInternalState { @@ -54,12 +54,12 @@ namespace Google.Protobuf /// The position within the current buffer (i.e. the next byte to read) /// internal int bufferPos; - + /// /// Size of the current buffer /// internal int bufferSize; - + /// /// If we are currently inside a length-delimited block, this is the number of /// bytes in the buffer that are still available once we leave the delimited block. @@ -79,9 +79,9 @@ namespace Google.Protobuf internal int totalBytesRetired; internal int recursionDepth; // current recursion depth - + internal SegmentedBufferHelper segmentedBufferHelper; - + /// /// The last tag we read. 0 indicates we've read to the end of the stream /// (or haven't read anything yet). @@ -101,7 +101,7 @@ namespace Google.Protobuf // If non-null, the top level parse method was started with given coded input stream as an argument // which also means we can potentially fallback to calling MergeFrom(CodedInputStream cis) if needed. internal CodedInputStream CodedInputStream => segmentedBufferHelper.CodedInputStream; - + /// /// Internal-only property; when set to true, unknown fields will be discarded while parsing. /// diff --git a/csharp/src/Google.Protobuf/ParsingPrimitives.cs b/csharp/src/Google.Protobuf/ParsingPrimitives.cs index ebb40840aa..d52d7d6a77 100644 --- a/csharp/src/Google.Protobuf/ParsingPrimitives.cs +++ b/csharp/src/Google.Protobuf/ParsingPrimitives.cs @@ -34,6 +34,7 @@ using System; using System.Buffers; using System.Buffers.Binary; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; @@ -49,6 +50,7 @@ namespace Google.Protobuf [SecuritySafeCritical] internal static class ParsingPrimitives { + private const int StackallocThreshold = 256; /// /// Reads a length for length-delimited data. @@ -58,7 +60,6 @@ namespace Google.Protobuf /// to make the calling code clearer. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static int ParseLength(ref ReadOnlySpan buffer, ref ParserInternalState state) { return (int)ParseRawVarint32(ref buffer, ref state); @@ -437,13 +438,7 @@ namespace Google.Protobuf throw InvalidProtocolBufferException.NegativeSize(); } - if (state.totalBytesRetired + state.bufferPos + size > state.currentLimit) - { - // Read to the end of the stream (up to the current limit) anyway. - SkipRawBytes(ref buffer, ref state, state.currentLimit - state.totalBytesRetired - state.bufferPos); - // Then fail. - throw InvalidProtocolBufferException.TruncatedMessage(); - } + ValidateCurrentLimit(ref buffer, ref state, size); if (size <= state.bufferSize - state.bufferPos) { @@ -453,36 +448,13 @@ namespace Google.Protobuf state.bufferPos += size; return bytes; } - else if (size < buffer.Length || size < state.segmentedBufferHelper.TotalLength) + else if (IsDataAvailableInSource(ref state, size)) { // Reading more bytes than are in the buffer, but not an excessive number // of bytes. We can safely allocate the resulting array ahead of time. - // First copy what we have. byte[] bytes = new byte[size]; - var bytesSpan = new Span(bytes); - int pos = state.bufferSize - state.bufferPos; - buffer.Slice(state.bufferPos, pos).CopyTo(bytesSpan.Slice(0, pos)); - state.bufferPos = state.bufferSize; - - // We want to use RefillBuffer() and then copy from the buffer into our - // byte array rather than reading directly into our byte array because - // the input may be unbuffered. - state.segmentedBufferHelper.RefillBuffer(ref buffer, ref state, true); - - while (size - pos > state.bufferSize) - { - buffer.Slice(0, state.bufferSize) - .CopyTo(bytesSpan.Slice(pos, state.bufferSize)); - pos += state.bufferSize; - state.bufferPos = state.bufferSize; - state.segmentedBufferHelper.RefillBuffer(ref buffer, ref state, true); - } - - buffer.Slice(0, size - pos) - .CopyTo(bytesSpan.Slice(pos, size - pos)); - state.bufferPos = size - pos; - + ReadRawBytesIntoSpan(ref buffer, ref state, size, bytes); return bytes; } else @@ -543,13 +515,7 @@ namespace Google.Protobuf throw InvalidProtocolBufferException.NegativeSize(); } - if (state.totalBytesRetired + state.bufferPos + size > state.currentLimit) - { - // Read to the end of the stream anyway. - SkipRawBytes(ref buffer, ref state, state.currentLimit - state.totalBytesRetired - state.bufferPos); - // Then fail. - throw InvalidProtocolBufferException.TruncatedMessage(); - } + ValidateCurrentLimit(ref buffer, ref state, size); if (size <= state.bufferSize - state.bufferPos) { @@ -619,7 +585,7 @@ namespace Google.Protobuf } #if GOOGLE_PROTOBUF_SUPPORT_FAST_STRING - if (length <= state.bufferSize - state.bufferPos && length > 0) + if (length <= state.bufferSize - state.bufferPos) { // Fast path: all bytes to decode appear in the same span. ReadOnlySpan data = buffer.Slice(state.bufferPos, length); @@ -638,20 +604,76 @@ namespace Google.Protobuf } #endif - var decoder = WritingPrimitives.Utf8Encoding.GetDecoder(); + return ReadStringSlow(ref buffer, ref state, length); + } + + /// + /// Reads a string assuming that it is spread across multiple spans in a . + /// + private static string ReadStringSlow(ref ReadOnlySpan buffer, ref ParserInternalState state, int length) + { + ValidateCurrentLimit(ref buffer, ref state, length); - // TODO: even if GOOGLE_PROTOBUF_SUPPORT_FAST_STRING is not supported, - // we could still create a string efficiently by using Utf8Encoding.GetString(byte[] bytes, int index, int count) - // whenever the buffer is backed by a byte array (and avoid creating a new byte array), but the problem is - // there is no way to get the underlying byte array from a span. +#if GOOGLE_PROTOBUF_SUPPORT_FAST_STRING + if (IsDataAvailable(ref state, length)) + { + // Read string data into a temporary buffer, either stackalloc'ed or from ArrayPool + // Once all data is read then call Encoding.GetString on buffer and return to pool if needed. - // TODO: in case the string spans multiple buffer segments, creating a char[] and decoding into it and then - // creating a string from that array might be more efficient than creating a string from the copied bytes. + byte[] byteArray = null; + Span byteSpan = length <= StackallocThreshold ? + stackalloc byte[length] : + (byteArray = ArrayPool.Shared.Rent(length)); + + try + { + unsafe + { + fixed (byte* pByteSpan = &MemoryMarshal.GetReference(byteSpan)) + { + // Compiler doesn't like that a potentially stackalloc'd Span is being used + // in a method with a "ref Span buffer" argument. If the stackalloc'd span was assigned + // to the ref argument then bad things would happen. We'll never do that so it is ok. + // Make compiler happy by passing a new span created from pointer. + var tempSpan = new Span(pByteSpan, byteSpan.Length); + ReadRawBytesIntoSpan(ref buffer, ref state, length, tempSpan); + + return WritingPrimitives.Utf8Encoding.GetString(pByteSpan, length); + } + } + } + finally + { + if (byteArray != null) + { + ArrayPool.Shared.Return(byteArray); + } + } + } +#endif // Slow path: Build a byte array first then copy it. + // This will be called when reading from a Stream because we don't know the length of the stream, + // or there is not enough data in the sequence. If there is not enough data then ReadRawBytes will + // throw an exception. return WritingPrimitives.Utf8Encoding.GetString(ReadRawBytes(ref buffer, ref state, length), 0, length); } + /// + /// Validates that the specified size doesn't exceed the current limit. If it does then remaining bytes + /// are skipped and an error is thrown. + /// + private static void ValidateCurrentLimit(ref ReadOnlySpan buffer, ref ParserInternalState state, int size) + { + if (state.totalBytesRetired + state.bufferPos + size > state.currentLimit) + { + // Read to the end of the stream (up to the current limit) anyway. + SkipRawBytes(ref buffer, ref state, state.currentLimit - state.totalBytesRetired - state.bufferPos); + // Then fail. + throw InvalidProtocolBufferException.TruncatedMessage(); + } + } + [SecuritySafeCritical] private static byte ReadRawByte(ref ReadOnlySpan buffer, ref ParserInternalState state) { @@ -731,5 +753,56 @@ namespace Google.Protobuf { return (long)(n >> 1) ^ -(long)(n & 1); } + + /// + /// Checks whether there is known data available of the specified size remaining to parse. + /// When parsing from a Stream this can return false because we have no knowledge of the amount + /// of data remaining in the stream until it is read. + /// + public static bool IsDataAvailable(ref ParserInternalState state, int size) + { + // Data fits in remaining buffer + if (size <= state.bufferSize - state.bufferPos) + { + return true; + } + + return IsDataAvailableInSource(ref state, size); + } + + /// + /// Checks whether there is known data available of the specified size remaining to parse + /// in the underlying data source. + /// When parsing from a Stream this will return false because we have no knowledge of the amount + /// of data remaining in the stream until it is read. + /// + private static bool IsDataAvailableInSource(ref ParserInternalState state, int size) + { + // Data fits in remaining source data. + // Note that this will never be true when reading from a stream as the total length is unknown. + return size <= state.segmentedBufferHelper.TotalLength - state.totalBytesRetired - state.bufferPos; + } + + /// + /// Read raw bytes of the specified length into a span. The amount of data available and the current limit should + /// be checked before calling this method. + /// + private static void ReadRawBytesIntoSpan(ref ReadOnlySpan buffer, ref ParserInternalState state, int length, Span byteSpan) + { + int remainingByteLength = length; + while (remainingByteLength > 0) + { + if (state.bufferSize - state.bufferPos == 0) + { + state.segmentedBufferHelper.RefillBuffer(ref buffer, ref state, true); + } + + ReadOnlySpan unreadSpan = buffer.Slice(state.bufferPos, Math.Min(remainingByteLength, state.bufferSize - state.bufferPos)); + unreadSpan.CopyTo(byteSpan.Slice(length - remainingByteLength)); + + remainingByteLength -= unreadSpan.Length; + state.bufferPos += unreadSpan.Length; + } + } } -} \ No newline at end of file +} From e36163deed57683ac8bd507f76a0f5aaf2b7813b Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Wed, 1 Jul 2020 20:31:33 +1200 Subject: [PATCH 2/2] PR feedback --- csharp/src/Google.Protobuf/ParsingPrimitives.cs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/csharp/src/Google.Protobuf/ParsingPrimitives.cs b/csharp/src/Google.Protobuf/ParsingPrimitives.cs index d52d7d6a77..e270ed8aa1 100644 --- a/csharp/src/Google.Protobuf/ParsingPrimitives.cs +++ b/csharp/src/Google.Protobuf/ParsingPrimitives.cs @@ -438,8 +438,6 @@ namespace Google.Protobuf throw InvalidProtocolBufferException.NegativeSize(); } - ValidateCurrentLimit(ref buffer, ref state, size); - if (size <= state.bufferSize - state.bufferPos) { // We have all the bytes we need already. @@ -448,7 +446,16 @@ namespace Google.Protobuf state.bufferPos += size; return bytes; } - else if (IsDataAvailableInSource(ref state, size)) + + return ReadRawBytesSlow(ref buffer, ref state, size); + } + + private static byte[] ReadRawBytesSlow(ref ReadOnlySpan buffer, ref ParserInternalState state, int size) + { + ValidateCurrentLimit(ref buffer, ref state, size); + + if ((!state.segmentedBufferHelper.TotalLength.HasValue && size < buffer.Length) || + IsDataAvailableInSource(ref state, size)) { // Reading more bytes than are in the buffer, but not an excessive number // of bytes. We can safely allocate the resulting array ahead of time. @@ -490,7 +497,7 @@ namespace Google.Protobuf } // OK, got everything. Now concatenate it all into one buffer. - byte[] bytes = new byte[size]; + byte[] bytes = new byte[size]; int newPos = 0; foreach (byte[] chunk in chunks) {