From 9580c45c65e0fb2032a5ac1375a210fd05b01639 Mon Sep 17 00:00:00 2001 From: John Luo Date: Fri, 5 Apr 2019 17:31:27 -0700 Subject: [PATCH 01/36] Add VS integration for design time build --- .../build/_protobuf/Google.Protobuf.Tools.targets | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets b/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets index 1a862337c58..de2f66e5be1 100644 --- a/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets +++ b/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets @@ -38,6 +38,21 @@ + + + + + + + + + + + + MSBuild:Compile + + + - - - - MSBuild:Compile - From d116f58e8e0aa376ce1e8fc1147aa1a1780c09d0 Mon Sep 17 00:00:00 2001 From: John Luo Date: Mon, 8 Apr 2019 11:19:57 -0700 Subject: [PATCH 03/36] Remove fix for bug in project system --- .../build/_protobuf/Google.Protobuf.Tools.targets | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets b/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets index d1597872dab..80c7109e016 100644 --- a/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets +++ b/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets @@ -39,16 +39,6 @@ - - - - - - - - - - + + + + + + + + - From 76e3489216204951f0115f4f007a782272876286 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 12 Jul 2018 16:36:35 +0200 Subject: [PATCH 08/36] csharp: support slice-by-slice deserialization --- .../Grpc.Core.Api/DeserializationContext.cs | 19 +- src/csharp/Grpc.Core.Api/Grpc.Core.Api.csproj | 8 + .../Grpc.Core.Tests/Grpc.Core.Tests.csproj | 4 + .../Internal/AsyncCallServerTest.cs | 12 +- .../Grpc.Core.Tests/Internal/AsyncCallTest.cs | 49 ++-- .../DefaultDeserializationContextTest.cs | 240 ++++++++++++++++++ .../Internal/FakeBufferReaderManager.cs | 118 +++++++++ .../Internal/FakeBufferReaderManagerTest.cs | 121 +++++++++ .../Internal/ReusableSliceBufferTest.cs | 151 +++++++++++ .../Grpc.Core.Tests/Internal/SliceTest.cs | 83 ++++++ src/csharp/Grpc.Core/Grpc.Core.csproj | 9 + src/csharp/Grpc.Core/Internal/AsyncCall.cs | 10 +- .../Grpc.Core/Internal/AsyncCallBase.cs | 15 +- .../Internal/BatchContextSafeHandle.cs | 42 ++- .../Grpc.Core/Internal/CallSafeHandle.cs | 4 +- .../Internal/DefaultDeserializationContext.cs | 64 ++++- src/csharp/Grpc.Core/Internal/INativeCall.cs | 4 +- .../Internal/NativeMethods.Generated.cs | 11 + .../Grpc.Core/Internal/ReusableSliceBuffer.cs | 148 +++++++++++ src/csharp/Grpc.Core/Internal/Slice.cs | 68 +++++ src/csharp/ext/grpc_csharp_ext.c | 46 ++++ src/csharp/tests.json | 4 + .../runtimes/grpc_csharp_ext_dummy_stubs.c | 4 + .../Grpc.Core/Internal/native_methods.include | 1 + 24 files changed, 1181 insertions(+), 54 deletions(-) create mode 100644 src/csharp/Grpc.Core.Tests/Internal/DefaultDeserializationContextTest.cs create mode 100644 src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManager.cs create mode 100644 src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManagerTest.cs create mode 100644 src/csharp/Grpc.Core.Tests/Internal/ReusableSliceBufferTest.cs create mode 100644 src/csharp/Grpc.Core.Tests/Internal/SliceTest.cs create mode 100644 src/csharp/Grpc.Core/Internal/ReusableSliceBuffer.cs create mode 100644 src/csharp/Grpc.Core/Internal/Slice.cs diff --git a/src/csharp/Grpc.Core.Api/DeserializationContext.cs b/src/csharp/Grpc.Core.Api/DeserializationContext.cs index d69e0db5bdf..966bcfa8c8e 100644 --- a/src/csharp/Grpc.Core.Api/DeserializationContext.cs +++ b/src/csharp/Grpc.Core.Api/DeserializationContext.cs @@ -39,7 +39,7 @@ namespace Grpc.Core /// Also, allocating a new buffer each time can put excessive pressure on GC, especially if /// the payload is more than 86700 bytes large (which means the newly allocated buffer will be placed in LOH, /// and LOH object can only be garbage collected via a full ("stop the world") GC run). - /// NOTE: Deserializers are expected not to call this method more than once per received message + /// NOTE: Deserializers are expected not to call this method (or other payload accessor methods) more than once per received message /// (as there is no practical reason for doing so) and DeserializationContext implementations are free to assume so. /// /// byte array containing the entire payload. @@ -47,5 +47,22 @@ namespace Grpc.Core { throw new NotImplementedException(); } + +#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY + /// + /// Gets the entire payload as a ReadOnlySequence. + /// The ReadOnlySequence is only valid for the duration of the deserializer routine and the caller must not access it after the deserializer returns. + /// Using the read only sequence is the most efficient way to access the message payload. Where possible it allows directly + /// accessing the received payload without needing to perform any buffer copying or buffer allocations. + /// NOTE: This method is only available in the netstandard2.0 build of the library. + /// NOTE: Deserializers are expected not to call this method (or other payload accessor methods) more than once per received message + /// (as there is no practical reason for doing so) and DeserializationContext implementations are free to assume so. + /// + /// read only sequence containing the entire payload. + public virtual System.Buffers.ReadOnlySequence PayloadAsReadOnlySequence() + { + throw new NotImplementedException(); + } +#endif } } diff --git a/src/csharp/Grpc.Core.Api/Grpc.Core.Api.csproj b/src/csharp/Grpc.Core.Api/Grpc.Core.Api.csproj index 6c29530402c..8a32bc757df 100755 --- a/src/csharp/Grpc.Core.Api/Grpc.Core.Api.csproj +++ b/src/csharp/Grpc.Core.Api/Grpc.Core.Api.csproj @@ -19,12 +19,20 @@ true + + $(DefineConstants);GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY + + + + + + diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index 23e5d7f65ef..7fef2c77091 100755 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -8,6 +8,10 @@ true + + $(DefineConstants);GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY + + diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs index 5c7d48f786c..fd221613c04 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -35,6 +35,7 @@ namespace Grpc.Core.Internal.Tests Server server; FakeNativeCall fakeCall; AsyncCallServer asyncCallServer; + FakeBufferReaderManager fakeBufferReaderManager; [SetUp] public void Init() @@ -52,11 +53,13 @@ namespace Grpc.Core.Internal.Tests Marshallers.StringMarshaller.ContextualSerializer, Marshallers.StringMarshaller.ContextualDeserializer, server); asyncCallServer.InitializeForTesting(fakeCall); + fakeBufferReaderManager = new FakeBufferReaderManager(); } [TearDown] public void Cleanup() { + fakeBufferReaderManager.Dispose(); server.ShutdownAsync().Wait(); } @@ -77,7 +80,7 @@ namespace Grpc.Core.Internal.Tests var moveNextTask = requestStream.MoveNext(); fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true); - fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); Assert.IsFalse(moveNextTask.Result); AssertFinished(asyncCallServer, fakeCall, finishedTask); @@ -107,7 +110,7 @@ namespace Grpc.Core.Internal.Tests // if a read completion's success==false, the request stream will silently finish // and we rely on C core cancelling the call. var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, CreateNullResponse()); Assert.IsFalse(moveNextTask.Result); fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true); @@ -182,5 +185,10 @@ namespace Grpc.Core.Internal.Tests Assert.IsTrue(finishedTask.IsCompleted); Assert.DoesNotThrow(() => finishedTask.Wait()); } + + IBufferReader CreateNullResponse() + { + return fakeBufferReaderManager.CreateNullPayloadBufferReader(); + } } } diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index 775849d89b6..78c7f3ad5bb 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -21,6 +21,7 @@ using System.Collections.Generic; using System.Threading.Tasks; using Grpc.Core.Internal; +using Grpc.Core.Utils; using NUnit.Framework; namespace Grpc.Core.Internal.Tests @@ -33,6 +34,7 @@ namespace Grpc.Core.Internal.Tests Channel channel; FakeNativeCall fakeCall; AsyncCall asyncCall; + FakeBufferReaderManager fakeBufferReaderManager; [SetUp] public void Init() @@ -43,12 +45,14 @@ namespace Grpc.Core.Internal.Tests var callDetails = new CallInvocationDetails(channel, "someMethod", null, Marshallers.StringMarshaller, Marshallers.StringMarshaller, new CallOptions()); asyncCall = new AsyncCall(callDetails, fakeCall); + fakeBufferReaderManager = new FakeBufferReaderManager(); } [TearDown] public void Cleanup() { channel.ShutdownAsync().Wait(); + fakeBufferReaderManager.Dispose(); } [Test] @@ -87,7 +91,7 @@ namespace Grpc.Core.Internal.Tests var resultTask = asyncCall.UnaryCallAsync("request1"); fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.InvalidArgument), - null, + CreateNullResponse(), new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument); @@ -168,7 +172,7 @@ namespace Grpc.Core.Internal.Tests var resultTask = asyncCall.ClientStreamingCallAsync(); fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.InvalidArgument), - null, + CreateNullResponse(), new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument); @@ -214,7 +218,7 @@ namespace Grpc.Core.Internal.Tests fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.Internal), - null, + CreateNullResponse(), new Metadata()); var ex = Assert.ThrowsAsync(async () => await writeTask); @@ -233,7 +237,7 @@ namespace Grpc.Core.Internal.Tests fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.Internal), - null, + CreateNullResponse(), new Metadata()); fakeCall.SendCompletionCallback.OnSendCompletion(false); @@ -259,7 +263,7 @@ namespace Grpc.Core.Internal.Tests fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.Internal), - null, + CreateNullResponse(), new Metadata()); var ex = Assert.ThrowsAsync(async () => await writeTask); @@ -357,7 +361,7 @@ namespace Grpc.Core.Internal.Tests fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.Cancelled), - null, + CreateNullResponse(), new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled); @@ -390,7 +394,7 @@ namespace Grpc.Core.Internal.Tests fakeCall.ReceivedResponseHeadersCallback.OnReceivedResponseHeaders(true, new Metadata()); Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count); - fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); @@ -405,7 +409,7 @@ namespace Grpc.Core.Internal.Tests // try alternative order of completions fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); - fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); } @@ -417,7 +421,7 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream(asyncCall); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null); // after a failed read, we rely on C core to deliver appropriate status code. + fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, CreateNullResponse()); // after a failed read, we rely on C core to deliver appropriate status code. fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Internal)); AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal); @@ -441,7 +445,7 @@ namespace Grpc.Core.Internal.Tests var readTask3 = responseStream.MoveNext(); fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); - fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3); } @@ -479,7 +483,7 @@ namespace Grpc.Core.Internal.Tests Assert.DoesNotThrowAsync(async () => await writeTask1); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); @@ -493,7 +497,7 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream(asyncCall); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); @@ -511,7 +515,7 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream(asyncCall); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); @@ -533,7 +537,7 @@ namespace Grpc.Core.Internal.Tests Assert.IsFalse(writeTask.IsCompleted); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied)); var ex = Assert.ThrowsAsync(async () => await writeTask); @@ -552,7 +556,7 @@ namespace Grpc.Core.Internal.Tests var writeTask = requestStream.WriteAsync("request1"); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied)); fakeCall.SendCompletionCallback.OnSendCompletion(false); @@ -576,7 +580,7 @@ namespace Grpc.Core.Internal.Tests Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled)); AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled); @@ -597,7 +601,7 @@ namespace Grpc.Core.Internal.Tests Assert.AreEqual("response1", responseStream.Current); var readTask2 = responseStream.MoveNext(); - fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled)); AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled); @@ -618,7 +622,7 @@ namespace Grpc.Core.Internal.Tests Assert.AreEqual("response1", responseStream.Current); var readTask2 = responseStream.MoveNext(); - fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled)); AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled); @@ -638,9 +642,14 @@ namespace Grpc.Core.Internal.Tests return new ClientSideStatus(new Status(statusCode, ""), new Metadata()); } - byte[] CreateResponsePayload() + IBufferReader CreateResponsePayload() + { + return fakeBufferReaderManager.CreateSingleSegmentBufferReader(Marshallers.StringMarshaller.Serializer("response1")); + } + + IBufferReader CreateNullResponse() { - return Marshallers.StringMarshaller.Serializer("response1"); + return fakeBufferReaderManager.CreateNullPayloadBufferReader(); } static void AssertUnaryResponseSuccess(AsyncCall asyncCall, FakeNativeCall fakeCall, Task resultTask) diff --git a/src/csharp/Grpc.Core.Tests/Internal/DefaultDeserializationContextTest.cs b/src/csharp/Grpc.Core.Tests/Internal/DefaultDeserializationContextTest.cs new file mode 100644 index 00000000000..63baab31624 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/DefaultDeserializationContextTest.cs @@ -0,0 +1,240 @@ +#region Copyright notice and license + +// Copyright 2019 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Collections.Generic; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +using System.Runtime.InteropServices; + +#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY +using System.Buffers; +#endif + +namespace Grpc.Core.Internal.Tests +{ + public class DefaultDeserializationContextTest + { + FakeBufferReaderManager fakeBufferReaderManager; + + [SetUp] + public void Init() + { + fakeBufferReaderManager = new FakeBufferReaderManager(); + } + + [TearDown] + public void Cleanup() + { + fakeBufferReaderManager.Dispose(); + } + +#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY + [TestCase] + public void PayloadAsReadOnlySequence_ZeroSegmentPayload() + { + var context = new DefaultDeserializationContext(); + context.Initialize(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List {})); + + Assert.AreEqual(0, context.PayloadLength); + + var sequence = context.PayloadAsReadOnlySequence(); + + Assert.AreEqual(ReadOnlySequence.Empty, sequence); + Assert.IsTrue(sequence.IsEmpty); + Assert.IsTrue(sequence.IsSingleSegment); + } + + [TestCase(0)] + [TestCase(1)] + [TestCase(10)] + [TestCase(100)] + [TestCase(1000)] + public void PayloadAsReadOnlySequence_SingleSegmentPayload(int segmentLength) + { + var origBuffer = GetTestBuffer(segmentLength); + var context = new DefaultDeserializationContext(); + context.Initialize(fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer)); + + Assert.AreEqual(origBuffer.Length, context.PayloadLength); + + var sequence = context.PayloadAsReadOnlySequence(); + + Assert.AreEqual(origBuffer.Length, sequence.Length); + Assert.AreEqual(origBuffer.Length, sequence.First.Length); + Assert.IsTrue(sequence.IsSingleSegment); + CollectionAssert.AreEqual(origBuffer, sequence.First.ToArray()); + } + + [TestCase(0, 5, 10)] + [TestCase(1, 1, 1)] + [TestCase(10, 100, 1000)] + [TestCase(100, 100, 10)] + [TestCase(1000, 1000, 1000)] + public void PayloadAsReadOnlySequence_MultiSegmentPayload(int segmentLen1, int segmentLen2, int segmentLen3) + { + var origBuffer1 = GetTestBuffer(segmentLen1); + var origBuffer2 = GetTestBuffer(segmentLen2); + var origBuffer3 = GetTestBuffer(segmentLen3); + int totalLen = origBuffer1.Length + origBuffer2.Length + origBuffer3.Length; + + var context = new DefaultDeserializationContext(); + context.Initialize(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List { origBuffer1, origBuffer2, origBuffer3 })); + + Assert.AreEqual(totalLen, context.PayloadLength); + + var sequence = context.PayloadAsReadOnlySequence(); + + Assert.AreEqual(totalLen, sequence.Length); + + var segmentEnumerator = sequence.GetEnumerator(); + + Assert.IsTrue(segmentEnumerator.MoveNext()); + CollectionAssert.AreEqual(origBuffer1, segmentEnumerator.Current.ToArray()); + + Assert.IsTrue(segmentEnumerator.MoveNext()); + CollectionAssert.AreEqual(origBuffer2, segmentEnumerator.Current.ToArray()); + + Assert.IsTrue(segmentEnumerator.MoveNext()); + CollectionAssert.AreEqual(origBuffer3, segmentEnumerator.Current.ToArray()); + + Assert.IsFalse(segmentEnumerator.MoveNext()); + } +#endif + + [TestCase] + public void NullPayloadNotAllowed() + { + var context = new DefaultDeserializationContext(); + Assert.Throws(typeof(InvalidOperationException), () => context.Initialize(fakeBufferReaderManager.CreateNullPayloadBufferReader())); + } + + [TestCase] + public void PayloadAsNewByteBuffer_ZeroSegmentPayload() + { + var context = new DefaultDeserializationContext(); + context.Initialize(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List {})); + + Assert.AreEqual(0, context.PayloadLength); + + var payload = context.PayloadAsNewBuffer(); + Assert.AreEqual(0, payload.Length); + } + + [TestCase(0)] + [TestCase(1)] + [TestCase(10)] + [TestCase(100)] + [TestCase(1000)] + public void PayloadAsNewByteBuffer_SingleSegmentPayload(int segmentLength) + { + var origBuffer = GetTestBuffer(segmentLength); + var context = new DefaultDeserializationContext(); + context.Initialize(fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer)); + + Assert.AreEqual(origBuffer.Length, context.PayloadLength); + + var payload = context.PayloadAsNewBuffer(); + CollectionAssert.AreEqual(origBuffer, payload); + } + + [TestCase(0, 5, 10)] + [TestCase(1, 1, 1)] + [TestCase(10, 100, 1000)] + [TestCase(100, 100, 10)] + [TestCase(1000, 1000, 1000)] + public void PayloadAsNewByteBuffer_MultiSegmentPayload(int segmentLen1, int segmentLen2, int segmentLen3) + { + var origBuffer1 = GetTestBuffer(segmentLen1); + var origBuffer2 = GetTestBuffer(segmentLen2); + var origBuffer3 = GetTestBuffer(segmentLen3); + + var context = new DefaultDeserializationContext(); + context.Initialize(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List { origBuffer1, origBuffer2, origBuffer3 })); + + var payload = context.PayloadAsNewBuffer(); + + var concatenatedOrigBuffers = new List(); + concatenatedOrigBuffers.AddRange(origBuffer1); + concatenatedOrigBuffers.AddRange(origBuffer2); + concatenatedOrigBuffers.AddRange(origBuffer3); + + Assert.AreEqual(concatenatedOrigBuffers.Count, context.PayloadLength); + Assert.AreEqual(concatenatedOrigBuffers.Count, payload.Length); + CollectionAssert.AreEqual(concatenatedOrigBuffers, payload); + } + + [TestCase] + public void GetPayloadMultipleTimesIsIllegal() + { + var origBuffer = GetTestBuffer(100); + var context = new DefaultDeserializationContext(); + context.Initialize(fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer)); + + Assert.AreEqual(origBuffer.Length, context.PayloadLength); + + var payload = context.PayloadAsNewBuffer(); + CollectionAssert.AreEqual(origBuffer, payload); + + // Getting payload multiple times is illegal + Assert.Throws(typeof(InvalidOperationException), () => context.PayloadAsNewBuffer()); +#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY + Assert.Throws(typeof(InvalidOperationException), () => context.PayloadAsReadOnlySequence()); +#endif + } + + [TestCase] + public void ResetContextAndReinitialize() + { + var origBuffer = GetTestBuffer(100); + var context = new DefaultDeserializationContext(); + context.Initialize(fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer)); + + Assert.AreEqual(origBuffer.Length, context.PayloadLength); + + // Reset invalidates context + context.Reset(); + + Assert.AreEqual(0, context.PayloadLength); + Assert.Throws(typeof(NullReferenceException), () => context.PayloadAsNewBuffer()); +#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY + Assert.Throws(typeof(NullReferenceException), () => context.PayloadAsReadOnlySequence()); +#endif + + // Previously reset context can be initialized again + var origBuffer2 = GetTestBuffer(50); + context.Initialize(fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer2)); + + Assert.AreEqual(origBuffer2.Length, context.PayloadLength); + CollectionAssert.AreEqual(origBuffer2, context.PayloadAsNewBuffer()); + } + + private byte[] GetTestBuffer(int length) + { + var testBuffer = new byte[length]; + for (int i = 0; i < testBuffer.Length; i++) + { + testBuffer[i] = (byte) i; + } + return testBuffer; + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManager.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManager.cs new file mode 100644 index 00000000000..d8d0c0a6354 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManager.cs @@ -0,0 +1,118 @@ +#region Copyright notice and license + +// Copyright 2018 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using System.Threading.Tasks; + +using Grpc.Core.Internal; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal.Tests +{ + // Creates instances of fake IBufferReader. All created instances will become invalid once Dispose is called. + internal class FakeBufferReaderManager : IDisposable + { + List pinnedHandles = new List(); + bool disposed = false; + public IBufferReader CreateSingleSegmentBufferReader(byte[] data) + { + return CreateMultiSegmentBufferReader(new List { data }); + } + + public IBufferReader CreateMultiSegmentBufferReader(IEnumerable dataSegments) + { + GrpcPreconditions.CheckState(!disposed); + GrpcPreconditions.CheckNotNull(dataSegments); + var segments = new List(); + foreach (var data in dataSegments) + { + GrpcPreconditions.CheckNotNull(data); + segments.Add(GCHandle.Alloc(data, GCHandleType.Pinned)); + } + pinnedHandles.AddRange(segments); // all the allocated GCHandles will be freed on Dispose() + return new FakeBufferReader(segments); + } + + public IBufferReader CreateNullPayloadBufferReader() + { + GrpcPreconditions.CheckState(!disposed); + return new FakeBufferReader(null); + } + + public void Dispose() + { + if (!disposed) + { + disposed = true; + for (int i = 0; i < pinnedHandles.Count; i++) + { + pinnedHandles[i].Free(); + } + } + } + + private class FakeBufferReader : IBufferReader + { + readonly List bufferSegments; + readonly int? totalLength; + readonly IEnumerator segmentEnumerator; + + public FakeBufferReader(List bufferSegments) + { + this.bufferSegments = bufferSegments; + this.totalLength = ComputeTotalLength(bufferSegments); + this.segmentEnumerator = bufferSegments?.GetEnumerator(); + } + + public int? TotalLength => totalLength; + + public bool TryGetNextSlice(out Slice slice) + { + GrpcPreconditions.CheckNotNull(bufferSegments); + if (!segmentEnumerator.MoveNext()) + { + slice = default(Slice); + return false; + } + + var segment = segmentEnumerator.Current; + int sliceLen = ((byte[]) segment.Target).Length; + slice = new Slice(segment.AddrOfPinnedObject(), sliceLen); + return true; + } + + static int? ComputeTotalLength(List bufferSegments) + { + if (bufferSegments == null) + { + return null; + } + + int sum = 0; + foreach (var segment in bufferSegments) + { + var data = (byte[]) segment.Target; + sum += data.Length; + } + return sum; + } + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManagerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManagerTest.cs new file mode 100644 index 00000000000..7c4ff652bd3 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManagerTest.cs @@ -0,0 +1,121 @@ +#region Copyright notice and license + +// Copyright 2018 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Collections.Generic; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Internal.Tests +{ + public class FakeBufferReaderManagerTest + { + FakeBufferReaderManager fakeBufferReaderManager; + + [SetUp] + public void Init() + { + fakeBufferReaderManager = new FakeBufferReaderManager(); + } + + [TearDown] + public void Cleanup() + { + fakeBufferReaderManager.Dispose(); + } + + [TestCase] + public void NullPayload() + { + var fakeBufferReader = fakeBufferReaderManager.CreateNullPayloadBufferReader(); + Assert.IsFalse(fakeBufferReader.TotalLength.HasValue); + Assert.Throws(typeof(ArgumentNullException), () => fakeBufferReader.TryGetNextSlice(out Slice slice)); + } + [TestCase] + public void ZeroSegmentPayload() + { + var fakeBufferReader = fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List {}); + Assert.AreEqual(0, fakeBufferReader.TotalLength.Value); + Assert.IsFalse(fakeBufferReader.TryGetNextSlice(out Slice slice)); + } + + [TestCase(0)] + [TestCase(1)] + [TestCase(10)] + [TestCase(30)] + [TestCase(100)] + [TestCase(1000)] + public void SingleSegmentPayload(int bufferLen) + { + var origBuffer = GetTestBuffer(bufferLen); + var fakeBufferReader = fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer); + Assert.AreEqual(origBuffer.Length, fakeBufferReader.TotalLength.Value); + + Assert.IsTrue(fakeBufferReader.TryGetNextSlice(out Slice slice)); + AssertSliceDataEqual(origBuffer, slice); + + Assert.IsFalse(fakeBufferReader.TryGetNextSlice(out Slice slice2)); + } + + [TestCase(0, 5, 10)] + [TestCase(1, 1, 1)] + [TestCase(10, 100, 1000)] + [TestCase(100, 100, 10)] + [TestCase(1000, 1000, 1000)] + public void MultiSegmentPayload(int segmentLen1, int segmentLen2, int segmentLen3) + { + var origBuffer1 = GetTestBuffer(segmentLen1); + var origBuffer2 = GetTestBuffer(segmentLen2); + var origBuffer3 = GetTestBuffer(segmentLen3); + var fakeBufferReader = fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List { origBuffer1, origBuffer2, origBuffer3 }); + + Assert.AreEqual(origBuffer1.Length + origBuffer2.Length + origBuffer3.Length, fakeBufferReader.TotalLength.Value); + + Assert.IsTrue(fakeBufferReader.TryGetNextSlice(out Slice slice1)); + AssertSliceDataEqual(origBuffer1, slice1); + + Assert.IsTrue(fakeBufferReader.TryGetNextSlice(out Slice slice2)); + AssertSliceDataEqual(origBuffer2, slice2); + + Assert.IsTrue(fakeBufferReader.TryGetNextSlice(out Slice slice3)); + AssertSliceDataEqual(origBuffer3, slice3); + + Assert.IsFalse(fakeBufferReader.TryGetNextSlice(out Slice slice4)); + } + + private void AssertSliceDataEqual(byte[] expected, Slice actual) + { + var actualSliceData = new byte[actual.Length]; + actual.CopyTo(new ArraySegment(actualSliceData)); + CollectionAssert.AreEqual(expected, actualSliceData); + } + + // create a buffer of given size and fill it with some data + private byte[] GetTestBuffer(int length) + { + var testBuffer = new byte[length]; + for (int i = 0; i < testBuffer.Length; i++) + { + testBuffer[i] = (byte) i; + } + return testBuffer; + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/Internal/ReusableSliceBufferTest.cs b/src/csharp/Grpc.Core.Tests/Internal/ReusableSliceBufferTest.cs new file mode 100644 index 00000000000..7630785aef4 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/ReusableSliceBufferTest.cs @@ -0,0 +1,151 @@ +#region Copyright notice and license + +// Copyright 2018 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Collections.Generic; +using System.Linq; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +using System.Runtime.InteropServices; + +#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY +using System.Buffers; +#endif + +namespace Grpc.Core.Internal.Tests +{ + // Converts IBufferReader into instances of ReadOnlySequence + // Objects representing the sequence segments are cached to decrease GC load. + public class ReusableSliceBufferTest + { + FakeBufferReaderManager fakeBufferReaderManager; + + [SetUp] + public void Init() + { + fakeBufferReaderManager = new FakeBufferReaderManager(); + } + + [TearDown] + public void Cleanup() + { + fakeBufferReaderManager.Dispose(); + } + +#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY + [TestCase] + public void NullPayload() + { + var sliceBuffer = new ReusableSliceBuffer(); + Assert.Throws(typeof(ArgumentNullException), () => sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateNullPayloadBufferReader())); + } + + [TestCase] + public void ZeroSegmentPayload() + { + var sliceBuffer = new ReusableSliceBuffer(); + var sequence = sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List {})); + + Assert.AreEqual(ReadOnlySequence.Empty, sequence); + Assert.IsTrue(sequence.IsEmpty); + Assert.IsTrue(sequence.IsSingleSegment); + } + + [TestCase] + public void SegmentsAreCached() + { + var bufferSegments1 = Enumerable.Range(0, 100).Select((_) => GetTestBuffer(50)).ToList(); + var bufferSegments2 = Enumerable.Range(0, 100).Select((_) => GetTestBuffer(50)).ToList(); + + var sliceBuffer = new ReusableSliceBuffer(); + + var sequence1 = sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateMultiSegmentBufferReader(bufferSegments1)); + var memoryManagers1 = GetMemoryManagersForSequenceSegments(sequence1); + + sliceBuffer.Invalidate(); + + var sequence2 = sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateMultiSegmentBufferReader(bufferSegments2)); + var memoryManagers2 = GetMemoryManagersForSequenceSegments(sequence2); + + // check memory managers are identical objects (i.e. they've been reused) + CollectionAssert.AreEquivalent(memoryManagers1, memoryManagers2); + } + + [TestCase] + public void MultiSegmentPayload_LotsOfSegments() + { + var bufferSegments = Enumerable.Range(0, ReusableSliceBuffer.MaxCachedSegments + 100).Select((_) => GetTestBuffer(10)).ToList(); + + var sliceBuffer = new ReusableSliceBuffer(); + var sequence = sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateMultiSegmentBufferReader(bufferSegments)); + + int index = 0; + foreach (var memory in sequence) + { + CollectionAssert.AreEqual(bufferSegments[index], memory.ToArray()); + index ++; + } + } + + [TestCase] + public void InvalidateMakesSequenceUnusable() + { + var origBuffer = GetTestBuffer(100); + + var sliceBuffer = new ReusableSliceBuffer(); + var sequence = sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List { origBuffer })); + + Assert.AreEqual(origBuffer.Length, sequence.Length); + + sliceBuffer.Invalidate(); + + // Invalidate with make the returned sequence completely unusable and broken, users must not use it beyond the deserializer functions. + Assert.Throws(typeof(ArgumentOutOfRangeException), () => { var first = sequence.First; }); + } + + private List> GetMemoryManagersForSequenceSegments(ReadOnlySequence sequence) + { + var result = new List>(); + foreach (var memory in sequence) + { + Assert.IsTrue(MemoryMarshal.TryGetMemoryManager(memory, out MemoryManager memoryManager)); + result.Add(memoryManager); + } + return result; + } +#else + [TestCase] + public void OnlySupportedOnNetCore() + { + // Test case needs to exist to make C# sanity test happy. + } +#endif + private byte[] GetTestBuffer(int length) + { + var testBuffer = new byte[length]; + for (int i = 0; i < testBuffer.Length; i++) + { + testBuffer[i] = (byte) i; + } + return testBuffer; + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/Internal/SliceTest.cs b/src/csharp/Grpc.Core.Tests/Internal/SliceTest.cs new file mode 100644 index 00000000000..eb090bbfa50 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/SliceTest.cs @@ -0,0 +1,83 @@ +#region Copyright notice and license + +// Copyright 2018 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +using System.Runtime.InteropServices; + +namespace Grpc.Core.Internal.Tests +{ + public class SliceTest + { + [TestCase(0)] + [TestCase(1)] + [TestCase(10)] + [TestCase(100)] + [TestCase(1000)] + public void SliceFromNativePtr_CopyToArraySegment(int bufferLength) + { + var origBuffer = GetTestBuffer(bufferLength); + var gcHandle = GCHandle.Alloc(origBuffer, GCHandleType.Pinned); + try + { + var slice = new Slice(gcHandle.AddrOfPinnedObject(), origBuffer.Length); + Assert.AreEqual(bufferLength, slice.Length); + + var newBuffer = new byte[bufferLength]; + slice.CopyTo(new ArraySegment(newBuffer)); + CollectionAssert.AreEqual(origBuffer, newBuffer); + } + finally + { + gcHandle.Free(); + } + } + + [TestCase] + public void SliceFromNativePtr_CopyToArraySegmentTooSmall() + { + var origBuffer = GetTestBuffer(100); + var gcHandle = GCHandle.Alloc(origBuffer, GCHandleType.Pinned); + try + { + var slice = new Slice(gcHandle.AddrOfPinnedObject(), origBuffer.Length); + var tooSmall = new byte[origBuffer.Length - 1]; + Assert.Catch(typeof(ArgumentException), () => slice.CopyTo(new ArraySegment(tooSmall))); + } + finally + { + gcHandle.Free(); + } + } + + // create a buffer of given size and fill it with some data + private byte[] GetTestBuffer(int length) + { + var testBuffer = new byte[length]; + for (int i = 0; i < testBuffer.Length; i++) + { + testBuffer[i] = (byte) i; + } + return testBuffer; + } + } +} diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index b7c191ea6a9..afd60e73a21 100755 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -19,6 +19,15 @@ true + + true + + + + 7.2 + $(DefineConstants);GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY + + diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 785081c341a..a1c688140d1 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -111,7 +111,7 @@ namespace Grpc.Core.Internal { using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) { - HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); + HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessageReader(), ctx.GetReceivedInitialMetadata()); } } catch (Exception e) @@ -537,14 +537,14 @@ namespace Grpc.Core.Internal /// /// Handler for unary response completion. /// - private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders) + private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, IBufferReader receivedMessageReader, Metadata responseHeaders) { // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT, // success will be always set to true. TaskCompletionSource delayedStreamingWriteTcs = null; TResponse msg = default(TResponse); - var deserializeException = TryDeserialize(receivedMessage, out msg); + var deserializeException = TryDeserialize(receivedMessageReader, out msg); bool releasedResources; lock (myLock) @@ -634,9 +634,9 @@ namespace Grpc.Core.Internal IUnaryResponseClientCallback UnaryResponseClientCallback => this; - void IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders) + void IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, IBufferReader receivedMessageReader, Metadata responseHeaders) { - HandleUnaryResponse(success, receivedStatus, receivedMessage, responseHeaders); + HandleUnaryResponse(success, receivedStatus, receivedMessageReader, responseHeaders); } IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback => this; diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 39c9f7c6160..9497371cc1b 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -228,12 +228,12 @@ namespace Grpc.Core.Internal } } - protected Exception TryDeserialize(byte[] payload, out TRead msg) + protected Exception TryDeserialize(IBufferReader reader, out TRead msg) { DefaultDeserializationContext context = null; try { - context = DefaultDeserializationContext.GetInitializedThreadLocal(payload); + context = DefaultDeserializationContext.GetInitializedThreadLocal(reader); msg = deserializer(context); return null; } @@ -245,7 +245,6 @@ namespace Grpc.Core.Internal finally { context?.Reset(); - } } @@ -333,21 +332,21 @@ namespace Grpc.Core.Internal /// /// Handles streaming read completion. /// - protected void HandleReadFinished(bool success, byte[] receivedMessage) + protected void HandleReadFinished(bool success, IBufferReader receivedMessageReader) { // if success == false, received message will be null. It that case we will // treat this completion as the last read an rely on C core to handle the failed // read (e.g. deliver approriate statusCode on the clientside). TRead msg = default(TRead); - var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null; + var deserializeException = (success && receivedMessageReader.TotalLength.HasValue) ? TryDeserialize(receivedMessageReader, out msg) : null; TaskCompletionSource origTcs = null; bool releasedResources; lock (myLock) { origTcs = streamingReadTcs; - if (receivedMessage == null) + if (!receivedMessageReader.TotalLength.HasValue) { // This was the last read. readingDone = true; @@ -391,9 +390,9 @@ namespace Grpc.Core.Internal IReceivedMessageCallback ReceivedMessageCallback => this; - void IReceivedMessageCallback.OnReceivedMessage(bool success, byte[] receivedMessage) + void IReceivedMessageCallback.OnReceivedMessage(bool success, IBufferReader receivedMessageReader) { - HandleReadFinished(success, receivedMessage); + HandleReadFinished(success, receivedMessageReader); } } } diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index 085e7faf595..61af26e9f90 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -30,10 +30,17 @@ namespace Grpc.Core.Internal void OnComplete(bool success); } + internal interface IBufferReader + { + int? TotalLength { get; } + + bool TryGetNextSlice(out Slice slice); + } + /// /// grpcsharp_batch_context /// - internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback, IPooledObject + internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback, IPooledObject, IBufferReader { static readonly NativeMethods Native = NativeMethods.Get(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType(); @@ -106,6 +113,25 @@ namespace Grpc.Core.Internal return data; } + public bool GetReceivedMessageNextSlicePeek(out Slice slice) + { + UIntPtr sliceLen; + IntPtr sliceDataPtr; + + if (0 == Native.grpcsharp_batch_context_recv_message_next_slice_peek(this, out sliceLen, out sliceDataPtr)) + { + slice = default(Slice); + return false; + } + slice = new Slice(sliceDataPtr, (int) sliceLen); + return true; + } + + public IBufferReader GetReceivedMessageReader() + { + return this; + } + // Gets data of receive_close_on_server completion. public bool GetReceivedCloseOnServerCancelled() { @@ -153,6 +179,20 @@ namespace Grpc.Core.Internal } } + int? IBufferReader.TotalLength + { + get + { + var len = Native.grpcsharp_batch_context_recv_message_length(this); + return len != new IntPtr(-1) ? (int?) len : null; + } + } + + bool IBufferReader.TryGetNextSlice(out Slice slice) + { + return GetReceivedMessageNextSlicePeek(out slice); + } + struct CompletionCallbackData { public CompletionCallbackData(BatchCompletionDelegate callback, object state) diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index a3ef3e61ee1..858d2a69605 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -35,11 +35,11 @@ namespace Grpc.Core.Internal // Completion handlers are pre-allocated to avoid unneccessary delegate allocations. // The "state" field is used to store the actual callback to invoke. static readonly BatchCompletionDelegate CompletionHandler_IUnaryResponseClientCallback = - (success, context, state) => ((IUnaryResponseClientCallback)state).OnUnaryResponseClient(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()); + (success, context, state) => ((IUnaryResponseClientCallback)state).OnUnaryResponseClient(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessageReader(), context.GetReceivedInitialMetadata()); static readonly BatchCompletionDelegate CompletionHandler_IReceivedStatusOnClientCallback = (success, context, state) => ((IReceivedStatusOnClientCallback)state).OnReceivedStatusOnClient(success, context.GetReceivedStatusOnClient()); static readonly BatchCompletionDelegate CompletionHandler_IReceivedMessageCallback = - (success, context, state) => ((IReceivedMessageCallback)state).OnReceivedMessage(success, context.GetReceivedMessage()); + (success, context, state) => ((IReceivedMessageCallback)state).OnReceivedMessage(success, context.GetReceivedMessageReader()); static readonly BatchCompletionDelegate CompletionHandler_IReceivedResponseHeadersCallback = (success, context, state) => ((IReceivedResponseHeadersCallback)state).OnReceivedResponseHeaders(success, context.GetReceivedInitialMetadata()); static readonly BatchCompletionDelegate CompletionHandler_ISendCompletionCallback = diff --git a/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs b/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs index 7ace80e8d53..bac7bbe4c59 100644 --- a/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs +++ b/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs @@ -20,6 +20,10 @@ using Grpc.Core.Utils; using System; using System.Threading; +#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY +using System.Buffers; +#endif + namespace Grpc.Core.Internal { internal class DefaultDeserializationContext : DeserializationContext @@ -27,40 +31,74 @@ namespace Grpc.Core.Internal static readonly ThreadLocal threadLocalInstance = new ThreadLocal(() => new DefaultDeserializationContext(), false); - byte[] payload; - bool alreadyCalledPayloadAsNewBuffer; + IBufferReader bufferReader; + int payloadLength; +#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY + ReusableSliceBuffer cachedSliceBuffer = new ReusableSliceBuffer(); +#endif public DefaultDeserializationContext() { Reset(); } - public override int PayloadLength => payload.Length; + public override int PayloadLength => payloadLength; public override byte[] PayloadAsNewBuffer() { - GrpcPreconditions.CheckState(!alreadyCalledPayloadAsNewBuffer); - alreadyCalledPayloadAsNewBuffer = true; - return payload; + var buffer = new byte[payloadLength]; + FillContinguousBuffer(bufferReader, buffer); + return buffer; } - public void Initialize(byte[] payload) +#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY + public override ReadOnlySequence PayloadAsReadOnlySequence() { - this.payload = GrpcPreconditions.CheckNotNull(payload); - this.alreadyCalledPayloadAsNewBuffer = false; + var sequence = cachedSliceBuffer.PopulateFrom(bufferReader); + GrpcPreconditions.CheckState(sequence.Length == payloadLength); + return sequence; + } +#endif + + public void Initialize(IBufferReader bufferReader) + { + this.bufferReader = GrpcPreconditions.CheckNotNull(bufferReader); + this.payloadLength = bufferReader.TotalLength.Value; // payload must not be null } public void Reset() { - this.payload = null; - this.alreadyCalledPayloadAsNewBuffer = true; // mark payload as read + this.bufferReader = null; + this.payloadLength = 0; +#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY + this.cachedSliceBuffer.Invalidate(); +#endif } - public static DefaultDeserializationContext GetInitializedThreadLocal(byte[] payload) + public static DefaultDeserializationContext GetInitializedThreadLocal(IBufferReader bufferReader) { var instance = threadLocalInstance.Value; - instance.Initialize(payload); + instance.Initialize(bufferReader); return instance; } + +#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY + private void FillContinguousBuffer(IBufferReader reader, byte[] destination) + { + PayloadAsReadOnlySequence().CopyTo(new Span(destination)); + } +#else + private void FillContinguousBuffer(IBufferReader reader, byte[] destination) + { + int offset = 0; + while (reader.TryGetNextSlice(out Slice slice)) + { + slice.CopyTo(new ArraySegment(destination, offset, (int)slice.Length)); + offset += (int)slice.Length; + } + // check that we filled the entire destination + GrpcPreconditions.CheckState(offset == payloadLength); + } +#endif } } diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs index 5c35b2ba461..98117c6988a 100644 --- a/src/csharp/Grpc.Core/Internal/INativeCall.cs +++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs @@ -22,7 +22,7 @@ namespace Grpc.Core.Internal { internal interface IUnaryResponseClientCallback { - void OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders); + void OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, IBufferReader receivedMessageReader, Metadata responseHeaders); } // Received status for streaming response calls. @@ -33,7 +33,7 @@ namespace Grpc.Core.Internal internal interface IReceivedMessageCallback { - void OnReceivedMessage(bool success, byte[] receivedMessage); + void OnReceivedMessage(bool success, IBufferReader receivedMessageReader); } internal interface IReceivedResponseHeadersCallback diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs index a1387aff562..9752a8e5a05 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs @@ -41,6 +41,7 @@ namespace Grpc.Core.Internal public readonly Delegates.grpcsharp_batch_context_recv_initial_metadata_delegate grpcsharp_batch_context_recv_initial_metadata; public readonly Delegates.grpcsharp_batch_context_recv_message_length_delegate grpcsharp_batch_context_recv_message_length; public readonly Delegates.grpcsharp_batch_context_recv_message_to_buffer_delegate grpcsharp_batch_context_recv_message_to_buffer; + public readonly Delegates.grpcsharp_batch_context_recv_message_next_slice_peek_delegate grpcsharp_batch_context_recv_message_next_slice_peek; public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_status_delegate grpcsharp_batch_context_recv_status_on_client_status; public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate grpcsharp_batch_context_recv_status_on_client_details; public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate grpcsharp_batch_context_recv_status_on_client_trailing_metadata; @@ -142,6 +143,7 @@ namespace Grpc.Core.Internal this.grpcsharp_batch_context_recv_initial_metadata = GetMethodDelegate(library); this.grpcsharp_batch_context_recv_message_length = GetMethodDelegate(library); this.grpcsharp_batch_context_recv_message_to_buffer = GetMethodDelegate(library); + this.grpcsharp_batch_context_recv_message_next_slice_peek = GetMethodDelegate(library); this.grpcsharp_batch_context_recv_status_on_client_status = GetMethodDelegate(library); this.grpcsharp_batch_context_recv_status_on_client_details = GetMethodDelegate(library); this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = GetMethodDelegate(library); @@ -242,6 +244,7 @@ namespace Grpc.Core.Internal this.grpcsharp_batch_context_recv_initial_metadata = DllImportsFromStaticLib.grpcsharp_batch_context_recv_initial_metadata; this.grpcsharp_batch_context_recv_message_length = DllImportsFromStaticLib.grpcsharp_batch_context_recv_message_length; this.grpcsharp_batch_context_recv_message_to_buffer = DllImportsFromStaticLib.grpcsharp_batch_context_recv_message_to_buffer; + this.grpcsharp_batch_context_recv_message_next_slice_peek = DllImportsFromStaticLib.grpcsharp_batch_context_recv_message_next_slice_peek; this.grpcsharp_batch_context_recv_status_on_client_status = DllImportsFromStaticLib.grpcsharp_batch_context_recv_status_on_client_status; this.grpcsharp_batch_context_recv_status_on_client_details = DllImportsFromStaticLib.grpcsharp_batch_context_recv_status_on_client_details; this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = DllImportsFromStaticLib.grpcsharp_batch_context_recv_status_on_client_trailing_metadata; @@ -342,6 +345,7 @@ namespace Grpc.Core.Internal this.grpcsharp_batch_context_recv_initial_metadata = DllImportsFromSharedLib.grpcsharp_batch_context_recv_initial_metadata; this.grpcsharp_batch_context_recv_message_length = DllImportsFromSharedLib.grpcsharp_batch_context_recv_message_length; this.grpcsharp_batch_context_recv_message_to_buffer = DllImportsFromSharedLib.grpcsharp_batch_context_recv_message_to_buffer; + this.grpcsharp_batch_context_recv_message_next_slice_peek = DllImportsFromSharedLib.grpcsharp_batch_context_recv_message_next_slice_peek; this.grpcsharp_batch_context_recv_status_on_client_status = DllImportsFromSharedLib.grpcsharp_batch_context_recv_status_on_client_status; this.grpcsharp_batch_context_recv_status_on_client_details = DllImportsFromSharedLib.grpcsharp_batch_context_recv_status_on_client_details; this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = DllImportsFromSharedLib.grpcsharp_batch_context_recv_status_on_client_trailing_metadata; @@ -445,6 +449,7 @@ namespace Grpc.Core.Internal public delegate IntPtr grpcsharp_batch_context_recv_initial_metadata_delegate(BatchContextSafeHandle ctx); public delegate IntPtr grpcsharp_batch_context_recv_message_length_delegate(BatchContextSafeHandle ctx); public delegate void grpcsharp_batch_context_recv_message_to_buffer_delegate(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen); + public delegate int grpcsharp_batch_context_recv_message_next_slice_peek_delegate(BatchContextSafeHandle ctx, out UIntPtr sliceLen, out IntPtr sliceDataPtr); public delegate StatusCode grpcsharp_batch_context_recv_status_on_client_status_delegate(BatchContextSafeHandle ctx); public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_details_delegate(BatchContextSafeHandle ctx, out UIntPtr detailsLength); public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate(BatchContextSafeHandle ctx); @@ -564,6 +569,9 @@ namespace Grpc.Core.Internal [DllImport(ImportName)] public static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen); + [DllImport(ImportName)] + public static extern int grpcsharp_batch_context_recv_message_next_slice_peek(BatchContextSafeHandle ctx, out UIntPtr sliceLen, out IntPtr sliceDataPtr); + [DllImport(ImportName)] public static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandle ctx); @@ -860,6 +868,9 @@ namespace Grpc.Core.Internal [DllImport(ImportName)] public static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen); + [DllImport(ImportName)] + public static extern int grpcsharp_batch_context_recv_message_next_slice_peek(BatchContextSafeHandle ctx, out UIntPtr sliceLen, out IntPtr sliceDataPtr); + [DllImport(ImportName)] public static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandle ctx); diff --git a/src/csharp/Grpc.Core/Internal/ReusableSliceBuffer.cs b/src/csharp/Grpc.Core/Internal/ReusableSliceBuffer.cs new file mode 100644 index 00000000000..fb8d2e720de --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/ReusableSliceBuffer.cs @@ -0,0 +1,148 @@ +#region Copyright notice and license + +// Copyright 2019 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY + +using Grpc.Core.Utils; +using System; +using System.Threading; + +using System.Buffers; + +namespace Grpc.Core.Internal +{ + internal class ReusableSliceBuffer + { + public const int MaxCachedSegments = 1024; // ~4MB payload for 4K slices + + readonly SliceSegment[] cachedSegments = new SliceSegment[MaxCachedSegments]; + int populatedSegmentCount = 0; + + public ReadOnlySequence PopulateFrom(IBufferReader bufferReader) + { + long offset = 0; + int index = 0; + SliceSegment prevSegment = null; + while (bufferReader.TryGetNextSlice(out Slice slice)) + { + // Initialize cached segment if still null or just allocate a new segment if we already reached MaxCachedSegments + var current = index < cachedSegments.Length ? cachedSegments[index] : new SliceSegment(); + if (current == null) + { + current = cachedSegments[index] = new SliceSegment(); + } + + current.Reset(slice, offset); + prevSegment?.SetNext(current); + + index ++; + offset += slice.Length; + prevSegment = current; + } + populatedSegmentCount = index; + + // Not necessary for ending the ReadOnlySequence, but for making sure we + // don't keep more than MaxCachedSegments alive. + prevSegment?.SetNext(null); + + if (index == 0) + { + return ReadOnlySequence.Empty; + } + + var firstSegment = cachedSegments[0]; + var lastSegment = prevSegment; + return new ReadOnlySequence(firstSegment, 0, lastSegment, lastSegment.Memory.Length); + } + + public void Invalidate() + { + if (populatedSegmentCount == 0) + { + return; + } + var segment = cachedSegments[0]; + while (segment != null) + { + segment.Reset(new Slice(IntPtr.Zero, 0), 0); + segment.SetNext(null); + segment = (SliceSegment) segment.Next; + } + populatedSegmentCount = 0; + } + + // Represents a segment in ReadOnlySequence + // Segment is backed by Slice and the instances are reusable. + private class SliceSegment : ReadOnlySequenceSegment + { + readonly SliceMemoryManager pointerMemoryManager = new SliceMemoryManager(); + + public void Reset(Slice slice, long runningIndex) + { + pointerMemoryManager.Reset(slice); + Memory = pointerMemoryManager.Memory; // maybe not always necessary + RunningIndex = runningIndex; + } + + public void SetNext(ReadOnlySequenceSegment next) + { + Next = next; + } + } + + // Allow creating instances of Memory from Slice. + // Represents a chunk of native memory, but doesn't manage its lifetime. + // Instances of this class are reuseable - they can be reset to point to a different memory chunk. + // That is important to make the instances cacheable (rather then creating new instances + // the old ones will be reused to reduce GC pressure). + private class SliceMemoryManager : MemoryManager + { + private Slice slice; + + public void Reset(Slice slice) + { + this.slice = slice; + } + + public void Reset() + { + Reset(new Slice(IntPtr.Zero, 0)); + } + + public override Span GetSpan() + { + return slice.ToSpanUnsafe(); + } + + public override MemoryHandle Pin(int elementIndex = 0) + { + throw new NotSupportedException(); + } + + public override void Unpin() + { + } + + protected override void Dispose(bool disposing) + { + // NOP + } + } + } +} +#endif diff --git a/src/csharp/Grpc.Core/Internal/Slice.cs b/src/csharp/Grpc.Core/Internal/Slice.cs new file mode 100644 index 00000000000..22eb9537951 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/Slice.cs @@ -0,0 +1,68 @@ +#region Copyright notice and license + +// Copyright 2019 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Runtime.InteropServices; +using System.Threading; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + /// + /// Slice of native memory. + /// Rough equivalent of grpc_slice (but doesn't support inlined slices, just a pointer to data and length) + /// + internal struct Slice + { + private readonly IntPtr dataPtr; + private readonly int length; + + public Slice(IntPtr dataPtr, int length) + { + this.dataPtr = dataPtr; + this.length = length; + } + + public int Length => length; + + // copies data of the slice to given span. + // there needs to be enough space in the destination buffer + public void CopyTo(ArraySegment destination) + { + Marshal.Copy(dataPtr, destination.Array, destination.Offset, length); + } + +#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY + public Span ToSpanUnsafe() + { + unsafe + { + return new Span((byte*) dataPtr, length); + } + } +#endif + + /// + /// Returns a that represents the current . + /// + public override string ToString() + { + return string.Format("[Slice: dataPtr={0}, length={1}]", dataPtr, length); + } + } +} diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 91d3957dbf0..b42e1e875d2 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -59,12 +59,16 @@ typedef struct grpcsharp_batch_context { } send_status_from_server; grpc_metadata_array recv_initial_metadata; grpc_byte_buffer* recv_message; + grpc_byte_buffer_reader* recv_message_reader; struct { grpc_metadata_array trailing_metadata; grpc_status_code status; grpc_slice status_details; } recv_status_on_client; int recv_close_on_server_cancelled; + + /* reserve space for byte_buffer_reader */ + grpc_byte_buffer_reader reserved_recv_message_reader; } grpcsharp_batch_context; GPR_EXPORT grpcsharp_batch_context* GPR_CALLTYPE @@ -206,6 +210,9 @@ grpcsharp_batch_context_reset(grpcsharp_batch_context* ctx) { grpcsharp_metadata_array_destroy_metadata_only(&(ctx->recv_initial_metadata)); + if (ctx->recv_message_reader) { + grpc_byte_buffer_reader_destroy(ctx->recv_message_reader); + } grpc_byte_buffer_destroy(ctx->recv_message); grpcsharp_metadata_array_destroy_metadata_only( @@ -287,6 +294,45 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_recv_message_to_buffer( grpc_byte_buffer_reader_destroy(&reader); } +/* + * Gets the next slice from recv_message byte buffer. + * Returns 1 if a slice was get successfully, 0 if there are no more slices to + * read. Set slice_len to the length of the slice and the slice_data_ptr to + * point to slice's data. Caller must ensure that the byte buffer being read + * from stays alive as long as the data of the slice are being accessed + * (grpc_byte_buffer_reader_peek method is used internally) + * + * Remarks: + * Slices can only be iterated once. + * Initializes recv_message_buffer_reader if it was not initialized yet. + */ +GPR_EXPORT int GPR_CALLTYPE +grpcsharp_batch_context_recv_message_next_slice_peek( + grpcsharp_batch_context* ctx, size_t* slice_len, uint8_t** slice_data_ptr) { + *slice_len = 0; + *slice_data_ptr = NULL; + + if (!ctx->recv_message) { + return 0; + } + + if (!ctx->recv_message_reader) { + ctx->recv_message_reader = &ctx->reserved_recv_message_reader; + GPR_ASSERT(grpc_byte_buffer_reader_init(ctx->recv_message_reader, + ctx->recv_message)); + } + + grpc_slice* slice_ptr; + if (!grpc_byte_buffer_reader_peek(ctx->recv_message_reader, &slice_ptr)) { + return 0; + } + + /* recv_message buffer must not be deleted before all the data is read */ + *slice_len = GRPC_SLICE_LENGTH(*slice_ptr); + *slice_data_ptr = GRPC_SLICE_START_PTR(*slice_ptr); + return 1; +} + GPR_EXPORT grpc_status_code GPR_CALLTYPE grpcsharp_batch_context_recv_status_on_client_status( const grpcsharp_batch_context* ctx) { diff --git a/src/csharp/tests.json b/src/csharp/tests.json index c1e7fc1a6bf..cacdb305d2e 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -7,8 +7,12 @@ "Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest", "Grpc.Core.Internal.Tests.CompletionQueueEventTest", "Grpc.Core.Internal.Tests.CompletionQueueSafeHandleTest", + "Grpc.Core.Internal.Tests.DefaultDeserializationContextTest", "Grpc.Core.Internal.Tests.DefaultObjectPoolTest", + "Grpc.Core.Internal.Tests.FakeBufferReaderManagerTest", "Grpc.Core.Internal.Tests.MetadataArraySafeHandleTest", + "Grpc.Core.Internal.Tests.ReusableSliceBufferTest", + "Grpc.Core.Internal.Tests.SliceTest", "Grpc.Core.Internal.Tests.TimespecTest", "Grpc.Core.Tests.AppDomainUnloadTest", "Grpc.Core.Tests.AuthContextTest", diff --git a/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c b/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c index 0e9d56f5bdf..1da79e2bcad 100644 --- a/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c +++ b/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c @@ -50,6 +50,10 @@ void grpcsharp_batch_context_recv_message_to_buffer() { fprintf(stderr, "Should never reach here"); abort(); } +void grpcsharp_batch_context_recv_message_next_slice_peek() { + fprintf(stderr, "Should never reach here"); + abort(); +} void grpcsharp_batch_context_recv_status_on_client_status() { fprintf(stderr, "Should never reach here"); abort(); diff --git a/templates/src/csharp/Grpc.Core/Internal/native_methods.include b/templates/src/csharp/Grpc.Core/Internal/native_methods.include index e8ec4c87b06..dab95991bca 100644 --- a/templates/src/csharp/Grpc.Core/Internal/native_methods.include +++ b/templates/src/csharp/Grpc.Core/Internal/native_methods.include @@ -7,6 +7,7 @@ native_method_signatures = [ 'IntPtr grpcsharp_batch_context_recv_initial_metadata(BatchContextSafeHandle ctx)', 'IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandle ctx)', 'void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen)', + 'int grpcsharp_batch_context_recv_message_next_slice_peek(BatchContextSafeHandle ctx, out UIntPtr sliceLen, out IntPtr sliceDataPtr)', 'StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandle ctx)', 'IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandle ctx, out UIntPtr detailsLength)', 'IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata(BatchContextSafeHandle ctx)', From d74f04680f94b22982103cdddd9f6e7e76783d47 Mon Sep 17 00:00:00 2001 From: John Luo Date: Tue, 30 Apr 2019 15:04:11 -0700 Subject: [PATCH 09/36] Restrict workaround to MSBuild 15.0 and above --- .../build/_protobuf/Google.Protobuf.Tools.targets | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets b/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets index 784f789528e..b3fd02d4faa 100644 --- a/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets +++ b/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets @@ -40,10 +40,11 @@ - - - - + + + + + From 5538bb81434ccb01216c82d09ade04fdf9ec65b7 Mon Sep 17 00:00:00 2001 From: Soheil Hassas Yeganeh Date: Tue, 30 Apr 2019 17:56:43 -0400 Subject: [PATCH 10/36] Use compress and decompress slice_buffers only when they are needed. Compression is not used for performance senstiive RPCs, yet chttp2 always moves buffers in/out of the compressed/decmpressed buffers. Even initilizing them is costly. Use the (de)compression buffer and state only when we are using non-identity compression. This is part of a larger performance change, and this one buys us 1%-2% depending on the benchmark. --- .../chttp2/transport/chttp2_transport.cc | 79 +++++++++++++------ .../chttp2/transport/hpack_parser.cc | 6 ++ .../ext/transport/chttp2/transport/internal.h | 35 ++++---- .../ext/transport/chttp2/transport/writing.cc | 63 ++++++++++++--- 4 files changed, 131 insertions(+), 52 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index d4188775722..6eec4359563 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -679,8 +679,6 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_slice_buffer_init(&frame_storage); grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer); grpc_slice_buffer_init(&flow_controlled_buffer); - grpc_slice_buffer_init(&compressed_data_buffer); - grpc_slice_buffer_init(&decompressed_data_buffer); GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this, grpc_combiner_scheduler(t->combiner)); @@ -704,8 +702,13 @@ grpc_chttp2_stream::~grpc_chttp2_stream() { grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer); grpc_slice_buffer_destroy_internal(&frame_storage); - grpc_slice_buffer_destroy_internal(&compressed_data_buffer); - grpc_slice_buffer_destroy_internal(&decompressed_data_buffer); + if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) { + grpc_slice_buffer_destroy_internal(&compressed_data_buffer); + } + if (stream_decompression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { + grpc_slice_buffer_destroy_internal(&decompressed_data_buffer); + } grpc_chttp2_list_remove_stalled_by_transport(t, this); grpc_chttp2_list_remove_stalled_by_stream(t, this); @@ -759,12 +762,15 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs, GPR_TIMER_SCOPE("destroy_stream", 0); grpc_chttp2_transport* t = reinterpret_cast(gt); grpc_chttp2_stream* s = reinterpret_cast(gs); - - if (s->stream_compression_ctx != nullptr) { + if (s->stream_compression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS && + s->stream_compression_ctx != nullptr) { grpc_stream_compression_context_destroy(s->stream_compression_ctx); s->stream_compression_ctx = nullptr; } - if (s->stream_decompression_ctx != nullptr) { + if (s->stream_decompression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS && + s->stream_decompression_ctx != nullptr) { grpc_stream_compression_context_destroy(s->stream_decompression_ctx); s->stream_decompression_ctx = nullptr; } @@ -1443,6 +1449,13 @@ static void perform_stream_op_locked(void* stream_op, s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS; } + if (s->stream_compression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) { + s->uncompressed_data_size = 0; + s->stream_compression_ctx = nullptr; + grpc_slice_buffer_init(&s->compressed_data_buffer); + } + s->send_initial_metadata_finished = add_closure_barrier(on_complete); s->send_initial_metadata = op_payload->send_initial_metadata.send_initial_metadata; @@ -1998,27 +2011,39 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t, !s->seen_error && s->recv_trailing_metadata_finished != nullptr) { /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and * maybe decompress the next 5 bytes in the stream. */ - bool end_of_context; - if (!s->stream_decompression_ctx) { - s->stream_decompression_ctx = grpc_stream_compression_context_create( - s->stream_decompression_method); - } - if (!grpc_stream_decompress( - s->stream_decompression_ctx, &s->frame_storage, - &s->unprocessed_incoming_frames_buffer, nullptr, - GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) { - grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); - grpc_slice_buffer_reset_and_unref_internal( - &s->unprocessed_incoming_frames_buffer); - s->seen_error = true; - } else { + if (s->stream_decompression_method == + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { + grpc_slice_buffer_move_first(&s->frame_storage, + GRPC_HEADER_SIZE_IN_BYTES, + &s->unprocessed_incoming_frames_buffer); if (s->unprocessed_incoming_frames_buffer.length > 0) { s->unprocessed_incoming_frames_decompressed = true; pending_data = true; } - if (end_of_context) { - grpc_stream_compression_context_destroy(s->stream_decompression_ctx); - s->stream_decompression_ctx = nullptr; + } else { + bool end_of_context; + if (!s->stream_decompression_ctx) { + s->stream_decompression_ctx = grpc_stream_compression_context_create( + s->stream_decompression_method); + } + if (!grpc_stream_decompress( + s->stream_decompression_ctx, &s->frame_storage, + &s->unprocessed_incoming_frames_buffer, nullptr, + GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) { + grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal( + &s->unprocessed_incoming_frames_buffer); + s->seen_error = true; + } else { + if (s->unprocessed_incoming_frames_buffer.length > 0) { + s->unprocessed_incoming_frames_decompressed = true; + pending_data = true; + } + if (end_of_context) { + grpc_stream_compression_context_destroy( + s->stream_decompression_ctx); + s->stream_decompression_ctx = nullptr; + } } } } @@ -2941,6 +2966,8 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint, } void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() { + GPR_DEBUG_ASSERT(stream_->stream_decompression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS); if (!stream_->stream_decompression_ctx) { stream_->stream_decompression_ctx = grpc_stream_compression_context_create( stream_->stream_decompression_method); @@ -2951,7 +2978,9 @@ grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) { GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0); grpc_error* error; if (stream_->unprocessed_incoming_frames_buffer.length > 0) { - if (!stream_->unprocessed_incoming_frames_decompressed) { + if (!stream_->unprocessed_incoming_frames_decompressed && + stream_->stream_decompression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { bool end_of_context; MaybeCreateStreamDecompressionCtx(); if (!grpc_stream_decompress(stream_->stream_decompression_ctx, diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc index 5bcdb4e2326..7a37d37fd10 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc @@ -1616,6 +1616,12 @@ static void parse_stream_compression_md(grpc_chttp2_transport* t, s->stream_decompression_method = GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS; } + + if (s->stream_decompression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { + s->stream_decompression_ctx = nullptr; + grpc_slice_buffer_init(&s->decompressed_data_buffer); + } } grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 00b1fe18b28..0a55ee111ee 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -583,10 +583,6 @@ struct grpc_chttp2_stream { grpc_slice_buffer frame_storage; /* protected by t combiner */ - /* Accessed only by transport thread when stream->pending_byte_stream == false - * Accessed only by application thread when stream->pending_byte_stream == - * true */ - grpc_slice_buffer unprocessed_incoming_frames_buffer; grpc_closure* on_next = nullptr; /* protected by t combiner */ bool pending_byte_stream = false; /* protected by t combiner */ // cached length of buffer to be used by the transport thread in cases where @@ -594,6 +590,10 @@ struct grpc_chttp2_stream { // application threads are allowed to modify // unprocessed_incoming_frames_buffer size_t unprocessed_incoming_frames_buffer_cached_length = 0; + /* Accessed only by transport thread when stream->pending_byte_stream == false + * Accessed only by application thread when stream->pending_byte_stream == + * true */ + grpc_slice_buffer unprocessed_incoming_frames_buffer; grpc_closure reset_byte_stream; grpc_error* byte_stream_error = GRPC_ERROR_NONE; /* protected by t combiner */ bool received_last_frame = false; /* protected by t combiner */ @@ -634,18 +634,7 @@ struct grpc_chttp2_stream { /* Stream decompression method to be used. */ grpc_stream_compression_method stream_decompression_method = GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS; - /** Stream compression decompress context */ - grpc_stream_compression_context* stream_decompression_ctx = nullptr; - /** Stream compression compress context */ - grpc_stream_compression_context* stream_compression_ctx = nullptr; - /** Buffer storing data that is compressed but not sent */ - grpc_slice_buffer compressed_data_buffer; - /** Amount of uncompressed bytes sent out when compressed_data_buffer is - * emptied */ - size_t uncompressed_data_size = 0; - /** Temporary buffer storing decompressed data */ - grpc_slice_buffer decompressed_data_buffer; /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed */ bool unprocessed_incoming_frames_decompressed = false; @@ -655,6 +644,22 @@ struct grpc_chttp2_stream { size_t decompressed_header_bytes = 0; /** Byte counter for number of bytes written */ size_t byte_counter = 0; + + /** Amount of uncompressed bytes sent out when compressed_data_buffer is + * emptied */ + size_t uncompressed_data_size; + /** Stream compression compress context */ + grpc_stream_compression_context* stream_compression_ctx; + /** Buffer storing data that is compressed but not sent */ + grpc_slice_buffer compressed_data_buffer; + + /** Stream compression decompress context */ + grpc_stream_compression_context* stream_decompression_ctx; + /** Temporary buffer storing decompressed data. + * Initialized, used, and destroyed only when stream uses (non-identity) + * compression. + */ + grpc_slice_buffer decompressed_data_buffer; }; /** Transport writing call flow: diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 3d1db0aa144..90015bd97ec 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -25,6 +25,7 @@ #include +#include "src/core/lib/compression/stream_compression.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" @@ -150,7 +151,11 @@ static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s, ":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64 ":s_win=%d:s_delta=%" PRId64 "]", t->peer_string, t, s->id, staller, s->flow_controlled_buffer.length, - s->compressed_data_buffer.length, s->flow_controlled_bytes_flowed, + s->stream_compression_method == + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS + ? 0 + : s->compressed_data_buffer.length, + s->flow_controlled_bytes_flowed, t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], t->flow_control->remote_window(), @@ -325,7 +330,23 @@ class DataSendContext { bool AnyOutgoing() const { return max_outgoing() > 0; } + void FlushUncompressedBytes() { + uint32_t send_bytes = static_cast GPR_MIN( + max_outgoing(), s_->flow_controlled_buffer.length); + is_last_frame_ = send_bytes == s_->flow_controlled_buffer.length && + s_->fetching_send_message == nullptr && + s_->send_trailing_metadata != nullptr && + grpc_metadata_batch_is_empty(s_->send_trailing_metadata); + grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes, + is_last_frame_, &s_->stats.outgoing, &t_->outbuf); + s_->flow_control->SentData(send_bytes); + s_->sending_bytes += send_bytes; + } + void FlushCompressedBytes() { + GPR_DEBUG_ASSERT(s_->stream_compression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS); + uint32_t send_bytes = static_cast GPR_MIN( max_outgoing(), s_->compressed_data_buffer.length); bool is_last_data_frame = @@ -360,6 +381,9 @@ class DataSendContext { } void CompressMoreBytes() { + GPR_DEBUG_ASSERT(s_->stream_compression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS); + if (s_->stream_compression_ctx == nullptr) { s_->stream_compression_ctx = grpc_stream_compression_context_create(s_->stream_compression_method); @@ -417,7 +441,7 @@ class StreamWriteContext { // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid if (!t_->is_client && s_->fetching_send_message == nullptr && s_->flow_controlled_buffer.length == 0 && - s_->compressed_data_buffer.length == 0 && + compressed_data_buffer_len() == 0 && s_->send_trailing_metadata != nullptr && is_default_initial_metadata(s_->send_initial_metadata)) { ConvertInitialMetadataToTrailingMetadata(); @@ -446,6 +470,13 @@ class StreamWriteContext { "send_initial_metadata_finished"); } + bool compressed_data_buffer_len() { + return s_->stream_compression_method == + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS + ? 0 + : s_->compressed_data_buffer.length; + } + void FlushWindowUpdates() { /* send any window updates */ const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate(); @@ -462,7 +493,7 @@ class StreamWriteContext { if (!s_->sent_initial_metadata) return; if (s_->flow_controlled_buffer.length == 0 && - s_->compressed_data_buffer.length == 0) { + compressed_data_buffer_len() == 0) { return; // early out: nothing to do } @@ -479,13 +510,21 @@ class StreamWriteContext { return; // early out: nothing to do } - while ((s_->flow_controlled_buffer.length > 0 || - s_->compressed_data_buffer.length > 0) && - data_send_context.max_outgoing() > 0) { - if (s_->compressed_data_buffer.length > 0) { - data_send_context.FlushCompressedBytes(); - } else { - data_send_context.CompressMoreBytes(); + if (s_->stream_compression_method == + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) { + while (s_->flow_controlled_buffer.length > 0 && + data_send_context.max_outgoing() > 0) { + data_send_context.FlushUncompressedBytes(); + } + } else { + while ((s_->flow_controlled_buffer.length > 0 || + s_->compressed_data_buffer.length > 0) && + data_send_context.max_outgoing() > 0) { + if (s_->compressed_data_buffer.length > 0) { + data_send_context.FlushCompressedBytes(); + } else { + data_send_context.CompressMoreBytes(); + } } } write_context_->ResetPingClock(); @@ -495,7 +534,7 @@ class StreamWriteContext { data_send_context.CallCallbacks(); stream_became_writable_ = true; if (s_->flow_controlled_buffer.length > 0 || - s_->compressed_data_buffer.length > 0) { + compressed_data_buffer_len() > 0) { GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t_, s_); } @@ -508,7 +547,7 @@ class StreamWriteContext { if (s_->send_trailing_metadata == nullptr) return; if (s_->fetching_send_message != nullptr) return; if (s_->flow_controlled_buffer.length != 0) return; - if (s_->compressed_data_buffer.length != 0) return; + if (compressed_data_buffer_len() != 0) return; GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata")); if (grpc_metadata_batch_is_empty(s_->send_trailing_metadata)) { From 0217450e2ca2f3b52fe18f62e7516326d4a355b2 Mon Sep 17 00:00:00 2001 From: Esun Kim Date: Fri, 3 May 2019 10:05:32 -0700 Subject: [PATCH 11/36] Sanitized some sources --- .../resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc | 2 +- .../resolver/dns/c_ares/grpc_ares_wrapper_libuv.cc | 2 +- .../resolver/dns/c_ares/grpc_ares_wrapper_libuv_windows.cc | 2 +- src/python/grpcio_tests/tests/interop/service.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc index e272e5a8800..04e36fbcee7 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc @@ -176,4 +176,4 @@ UniquePtr NewGrpcPolledFdFactory(grpc_combiner* combiner) { } // namespace grpc_core -#endif /* GRPC_ARES == 1 && defined(GRPC_UV) */ \ No newline at end of file +#endif /* GRPC_ARES == 1 && defined(GRPC_UV) */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv.cc index cab74f8ba64..fdbb8969a1f 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv.cc @@ -49,4 +49,4 @@ bool grpc_ares_maybe_resolve_localhost_manually_locked( return out; } -#endif /* GRPC_ARES == 1 && defined(GRPC_UV) */ \ No newline at end of file +#endif /* GRPC_ARES == 1 && defined(GRPC_UV) */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv_windows.cc index 07906f282aa..1232fc9d57c 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv_windows.cc @@ -80,4 +80,4 @@ bool inner_maybe_resolve_localhost_manually_locked( return false; } -#endif /* GRPC_ARES == 1 && (defined(GRPC_UV) || defined(GPR_WINDOWS)) */ \ No newline at end of file +#endif /* GRPC_ARES == 1 && (defined(GRPC_UV) || defined(GPR_WINDOWS)) */ diff --git a/src/python/grpcio_tests/tests/interop/service.py b/src/python/grpcio_tests/tests/interop/service.py index 20f76fceebf..37e4404c141 100644 --- a/src/python/grpcio_tests/tests/interop/service.py +++ b/src/python/grpcio_tests/tests/interop/service.py @@ -94,4 +94,4 @@ class TestService(test_pb2_grpc.TestServiceServicer): # NOTE(nathaniel): Apparently this is the same as the full-duplex call? # NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)... def HalfDuplexCall(self, request_iterator, context): - return self.FullDuplexCall(request_iterator, context) \ No newline at end of file + return self.FullDuplexCall(request_iterator, context) From 93dc228a8a795a9b3820a0ce0f57277408694b6b Mon Sep 17 00:00:00 2001 From: Soheil Hassas Yeganeh Date: Fri, 3 May 2019 16:35:09 -0400 Subject: [PATCH 12/36] Avoid copy on slice_ref. This was accidentially added in PR #18407 --- src/core/lib/slice/slice_internal.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/lib/slice/slice_internal.h b/src/core/lib/slice/slice_internal.h index db8c8e5c7cc..a9f6087e11f 100644 --- a/src/core/lib/slice/slice_internal.h +++ b/src/core/lib/slice/slice_internal.h @@ -222,7 +222,7 @@ inline uint32_t grpc_slice_refcount::Hash(const grpc_slice& slice) { g_hash_seed); } -inline grpc_slice grpc_slice_ref_internal(const grpc_slice& slice) { +inline const grpc_slice& grpc_slice_ref_internal(const grpc_slice& slice) { if (slice.refcount) { slice.refcount->Ref(); } From 4269ce08f4e6c6fb7a91a662e04469b26ceb0c30 Mon Sep 17 00:00:00 2001 From: vam Date: Fri, 3 May 2019 16:43:54 -0700 Subject: [PATCH 13/36] Make cc_grpc_library compatible with native proto_library and cc_proto_library rules. This is needed to comply with bazel best practices (each proto file is first processed by proto_library: https://docs.bazel.build/versions/master/be/protocol-buffer.html#proto_library) and generally bring cc_grcp_library rule up to date with latest Bazel changes (the rule hasn't gotten much updates since 2016). Detailed description. Bazel has native `cc_proto_library` rule, but it does not have `cc_grpc_library`. The rule in cc_grpc_library in this repo seems like the best candidate. This change makes possible using `cc_grpc_library` to generate on grpc library, and consume protobuf protion as dependencies. The typical `BUILD.bazel` file configuraiton now should look like the following: ```python proto_library( name = "my_proto", srcs = ["my.proto"], ) cc_proto_library( name = "my_cc_proto", deps = [":my_proto"] ) cc_grpc_library( name = "my_cc_grpc", srcs = [":my_proto"], deps = [":my_cc_proto"] ) ``` This allows to decouple all thre phases: proto descriptors generation (`proto_library`), protobuf messages library creation (`cc_proto_library`), grpc library creatio (`cc_grpc_library`). Notice how `cc_grpc_library` depends on `proto_library` (as `src`) and on `cc_proto_library` (as `deps`). Currently cc_grpc_library is designed in a way that it encapsulates all of the above and directly accepts .proto files as srcs. The previous version (before this PR) of cc_proto_library was encapsulating all of the 3 phases inside single `cc_proto_library` and also was doing it manually (without relying on the native `cc_proto_library`). The `cc_proto_library` is kept backward-compatible with the old version. --- bazel/BUILD | 12 ---- bazel/cc_grpc_library.bzl | 146 ++++++++++++++++++++++---------------- examples/BUILD | 27 +++++-- 3 files changed, 107 insertions(+), 78 deletions(-) diff --git a/bazel/BUILD b/bazel/BUILD index 32402892cc3..c3c82c9c0c7 100644 --- a/bazel/BUILD +++ b/bazel/BUILD @@ -17,15 +17,3 @@ licenses(["notice"]) # Apache v2 package(default_visibility = ["//:__subpackages__"]) load(":cc_grpc_library.bzl", "cc_grpc_library") - -proto_library( - name = "well_known_protos_list", - srcs = ["@com_google_protobuf//:well_known_protos"], -) - -cc_grpc_library( - name = "well_known_protos", - srcs = "well_known_protos_list", - proto_only = True, - deps = [], -) diff --git a/bazel/cc_grpc_library.bzl b/bazel/cc_grpc_library.bzl index 6bfcd653f51..6bfd9e0c185 100644 --- a/bazel/cc_grpc_library.bzl +++ b/bazel/cc_grpc_library.bzl @@ -2,70 +2,94 @@ load("//bazel:generate_cc.bzl", "generate_cc") -def cc_grpc_library(name, srcs, deps, proto_only, well_known_protos, generate_mocks = False, use_external = False, **kwargs): - """Generates C++ grpc classes from a .proto file. +def cc_grpc_library( + name, + srcs, + deps, + proto_only = False, + well_known_protos = True, + generate_mocks = False, + use_external = False, + grpc_only = False, + **kwargs): + """Generates C++ grpc classes for services defined in a proto file. - Assumes the generated classes will be used in cc_api_version = 2. + If grpc_only is True, this rule is compatible with proto_library and + cc_proto_library native rules such that it expects proto_library target + as srcs argument and generates only grpc library classes, expecting + protobuf messages classes library (cc_proto_library target) to be passed in + deps argument. By default grpc_only is False which makes this rule to behave + in a backwards-compatible mode (trying to generate both proto and grpc + classes). - Arguments: - name: name of rule. - srcs: a single proto_library, which wraps the .proto files with services. - deps: a list of C++ proto_library (or cc_proto_library) which provides - the compiled code of any message that the services depend on. - well_known_protos: Should this library additionally depend on well known - protos - use_external: When True the grpc deps are prefixed with //external. This - allows grpc to be used as a dependency in other bazel projects. - generate_mocks: When True, Google Mock code for client stub is generated. - **kwargs: rest of arguments, e.g., compatible_with and visibility. - """ - if len(srcs) > 1: - fail("Only one srcs value supported", "srcs") + Assumes the generated classes will be used in cc_api_version = 2. - proto_target = "_" + name + "_only" - codegen_target = "_" + name + "_codegen" - codegen_grpc_target = "_" + name + "_grpc_codegen" - proto_deps = ["_" + dep + "_only" for dep in deps if dep.find(':') == -1] - proto_deps += [dep.split(':')[0] + ':' + "_" + dep.split(':')[1] + "_only" for dep in deps if dep.find(':') != -1] + Args: + name (str): Name of rule. + srcs (list): A single .proto file which contains services definitions, + or if grpc_only parameter is True, a single proto_library which + contains services descriptors. + deps (list): A list of C++ proto_library (or cc_proto_library) which + provides the compiled code of any message that the services depend on. + proto_only (bool): If True, create only C++ proto classes library, + avoid creating C++ grpc classes library (expect it in deps). + well_known_protos (bool): Should this library additionally depend on + well known protos. + generate_mocks: when True, Google Mock code for client stub is generated. + use_external: Not used. + grpc_only: if True, generate only grpc library, expecting protobuf + messages library (cc_proto_library target) to be passed as deps. + **kwargs: rest of arguments, e.g., compatible_with and visibility + """ + if len(srcs) > 1: + fail("Only one srcs value supported", "srcs") + if grpc_only and proto_only: + fail("A mutualy exclusive configuraiton is specified: grpc_only = True and proto_only = True") - native.proto_library( - name = proto_target, - srcs = srcs, - deps = proto_deps, - **kwargs - ) + extra_deps = [] - generate_cc( - name = codegen_target, - srcs = [proto_target], - well_known_protos = well_known_protos, - **kwargs - ) + if not grpc_only: + proto_target = "_" + name + "_only" + cc_proto_target = name if proto_only else "_" + name + "_cc_proto" - if not proto_only: - plugin = "@com_github_grpc_grpc//:grpc_cpp_plugin" - generate_cc( - name = codegen_grpc_target, - srcs = [proto_target], - plugin = plugin, - well_known_protos = well_known_protos, - generate_mocks = generate_mocks, - **kwargs - ) - grpc_deps = ["@com_github_grpc_grpc//:grpc++_codegen_proto", - "//external:protobuf"] - native.cc_library( - name = name, - srcs = [":" + codegen_grpc_target, ":" + codegen_target], - hdrs = [":" + codegen_grpc_target, ":" + codegen_target], - deps = deps + grpc_deps, - **kwargs - ) - else: - native.cc_library( - name = name, - srcs = [":" + codegen_target], - hdrs = [":" + codegen_target], - deps = deps + ["//external:protobuf"], - **kwargs - ) + proto_deps = ["_" + dep + "_only" for dep in deps if dep.find(":") == -1] + proto_deps += [dep.split(":")[0] + ":" + "_" + dep.split(":")[1] + "_only" for dep in deps if dep.find(":") != -1] + + native.proto_library( + name = proto_target, + srcs = srcs, + deps = proto_deps, + **kwargs + ) + + native.cc_proto_library( + name = cc_proto_target, + deps = [":" + proto_target], + **kwargs + ) + extra_deps.append(":" + cc_proto_target) + else: + if not srcs: + fail("srcs cannot be empty", "srcs") + proto_target = srcs[0] + + if not proto_only: + codegen_grpc_target = "_" + name + "_grpc_codegen" + generate_cc( + name = codegen_grpc_target, + srcs = [proto_target], + plugin = "@com_github_grpc_grpc//:grpc_cpp_plugin", + well_known_protos = well_known_protos, + generate_mocks = generate_mocks, + **kwargs + ) + + native.cc_library( + name = name, + srcs = [":" + codegen_grpc_target], + hdrs = [":" + codegen_grpc_target], + deps = deps + + extra_deps + + ["@com_github_grpc_grpc//:grpc++_codegen_proto"], + **kwargs + ) diff --git a/examples/BUILD b/examples/BUILD index d2b39b87f4d..80f2762ff43 100644 --- a/examples/BUILD +++ b/examples/BUILD @@ -18,6 +18,7 @@ package(default_visibility = ["//visibility:public"]) load("@grpc_python_dependencies//:requirements.bzl", "requirement") load("//bazel:grpc_build_system.bzl", "grpc_proto_library") +load("//bazel:cc_grpc_library.bzl", "cc_grpc_library") load("@org_pubref_rules_protobuf//python:rules.bzl", "py_proto_library") grpc_proto_library( @@ -30,11 +31,25 @@ grpc_proto_library( srcs = ["protos/hellostreamingworld.proto"], ) -grpc_proto_library( - name = "helloworld", +# The following three rules demonstrate the usage of the cc_grpc_library rule in +# in a mode compatible with the native proto_library and cc_proto_library rules. +proto_library( + name = "helloworld_proto", srcs = ["protos/helloworld.proto"], ) +cc_proto_library( + name = "helloworld_cc_proto", + deps = [":helloworld_proto"], +) + +cc_grpc_library( + name = "helloworld_cc_grpc", + srcs = [":helloworld_proto"], + grpc_only = True, + deps = [":helloworld_cc_proto"], +) + grpc_proto_library( name = "route_guide", srcs = ["protos/route_guide.proto"], @@ -49,7 +64,7 @@ py_proto_library( name = "py_helloworld", protos = ["protos/helloworld.proto"], with_grpc = True, - deps = [requirement('protobuf'),], + deps = [requirement("protobuf")], ) cc_binary( @@ -164,8 +179,10 @@ cc_binary( cc_binary( name = "keyvaluestore_client", - srcs = ["cpp/keyvaluestore/caching_interceptor.h", - "cpp/keyvaluestore/client.cc"], + srcs = [ + "cpp/keyvaluestore/caching_interceptor.h", + "cpp/keyvaluestore/client.cc", + ], defines = ["BAZEL_BUILD"], deps = [ ":keyvaluestore", From 1dab7cf91abe80b071227fde60ffefa8cc9a7644 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Sat, 4 May 2019 19:04:39 +0200 Subject: [PATCH 14/36] job split followup: increase timeout for macos and windows C/C++ jobs --- tools/internal_ci/macos/grpc_basictests_c_cpp.cfg | 2 +- tools/internal_ci/windows/grpc_basictests_c.cfg | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/internal_ci/macos/grpc_basictests_c_cpp.cfg b/tools/internal_ci/macos/grpc_basictests_c_cpp.cfg index 9783dd64673..f16e3e8ee68 100644 --- a/tools/internal_ci/macos/grpc_basictests_c_cpp.cfg +++ b/tools/internal_ci/macos/grpc_basictests_c_cpp.cfg @@ -17,7 +17,7 @@ # Location of the continuous shell script in repository. build_file: "grpc/tools/internal_ci/macos/grpc_run_tests_matrix.sh" gfile_resources: "/bigstore/grpc-testing-secrets/gcp_credentials/GrpcTesting-d0eeee2db331.json" -timeout_mins: 60 +timeout_mins: 120 action { define_artifacts { regex: "**/*sponge_log.*" diff --git a/tools/internal_ci/windows/grpc_basictests_c.cfg b/tools/internal_ci/windows/grpc_basictests_c.cfg index 150a28e3f89..223cf389d0e 100644 --- a/tools/internal_ci/windows/grpc_basictests_c.cfg +++ b/tools/internal_ci/windows/grpc_basictests_c.cfg @@ -16,7 +16,7 @@ # Location of the continuous shell script in repository. build_file: "grpc/tools/internal_ci/windows/grpc_run_tests_matrix.bat" -timeout_mins: 60 +timeout_mins: 120 action { define_artifacts { regex: "**/*sponge_log.*" From 57c4877352b00fd4c86e557d7ab3f3d523240774 Mon Sep 17 00:00:00 2001 From: John Luo Date: Sat, 4 May 2019 22:29:08 -0700 Subject: [PATCH 15/36] Remove non-compatible workaround Will add this to Grpc.AspNetCore.Server instead --- .../build/_protobuf/Google.Protobuf.Tools.targets | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets b/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets index b3fd02d4faa..b1030ba1f8b 100644 --- a/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets +++ b/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets @@ -39,15 +39,6 @@ - - - - - - - - -