switch C# to contextual serializer and deserializer internally

pull/17167/head
Jan Tattermusch 6 years ago
parent 55e56f6b31
commit 5344c81f3c
  1. 2
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
  2. 7
      src/csharp/Grpc.Core.Tests/MarshallerTest.cs
  3. 7
      src/csharp/Grpc.Core/DeserializationContext.cs
  4. 2
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  5. 27
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  6. 2
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  7. 66
      src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs
  8. 62
      src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs
  9. 16
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  10. 67
      src/csharp/Grpc.Core/Marshaller.cs
  11. 7
      src/csharp/Grpc.Core/SerializationContext.cs

@ -49,7 +49,7 @@ namespace Grpc.Core.Internal.Tests
fakeCall = new FakeNativeCall(); fakeCall = new FakeNativeCall();
asyncCallServer = new AsyncCallServer<string, string>( asyncCallServer = new AsyncCallServer<string, string>(
Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer, Marshallers.StringMarshaller.ContextualSerializer, Marshallers.StringMarshaller.ContextualDeserializer,
server); server);
asyncCallServer.InitializeForTesting(fakeCall); asyncCallServer.InitializeForTesting(fakeCall);
} }

@ -69,11 +69,8 @@ namespace Grpc.Core.Tests
Assert.AreSame(contextualSerializer, marshaller.ContextualSerializer); Assert.AreSame(contextualSerializer, marshaller.ContextualSerializer);
Assert.AreSame(contextualDeserializer, marshaller.ContextualDeserializer); Assert.AreSame(contextualDeserializer, marshaller.ContextualDeserializer);
Assert.Throws(typeof(NotImplementedException), () => marshaller.Serializer("abc"));
// test that emulated serializer and deserializer work Assert.Throws(typeof(NotImplementedException), () => marshaller.Deserializer(new byte[] {1, 2, 3}));
var origMsg = "abc";
var serialized = marshaller.Serializer(origMsg);
Assert.AreEqual(origMsg, marshaller.Deserializer(serialized));
} }
class FakeSerializationContext : SerializationContext class FakeSerializationContext : SerializationContext

@ -16,6 +16,8 @@
#endregion #endregion
using System;
namespace Grpc.Core namespace Grpc.Core
{ {
/// <summary> /// <summary>
@ -41,6 +43,9 @@ namespace Grpc.Core
/// (as there is no practical reason for doing so) and <c>DeserializationContext</c> implementations are free to assume so. /// (as there is no practical reason for doing so) and <c>DeserializationContext</c> implementations are free to assume so.
/// </summary> /// </summary>
/// <returns>byte array containing the entire payload.</returns> /// <returns>byte array containing the entire payload.</returns>
public abstract byte[] PayloadAsNewBuffer(); public virtual byte[] PayloadAsNewBuffer()
{
throw new NotImplementedException();
}
} }
} }

@ -54,7 +54,7 @@ namespace Grpc.Core.Internal
ClientSideStatus? finishedStatus; ClientSideStatus? finishedStatus;
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails) public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer) : base(callDetails.RequestMarshaller.ContextualSerializer, callDetails.ResponseMarshaller.ContextualDeserializer)
{ {
this.details = callDetails.WithOptions(callDetails.Options.Normalize()); this.details = callDetails.WithOptions(callDetails.Options.Normalize());
this.initialMetadataSent = true; // we always send metadata at the very beginning of the call. this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.

@ -40,8 +40,8 @@ namespace Grpc.Core.Internal
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>();
protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message."); protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message.");
readonly Func<TWrite, byte[]> serializer; readonly Action<TWrite, SerializationContext> serializer;
readonly Func<byte[], TRead> deserializer; readonly Func<DeserializationContext, TRead> deserializer;
protected readonly object myLock = new object(); protected readonly object myLock = new object();
@ -63,7 +63,7 @@ namespace Grpc.Core.Internal
protected bool initialMetadataSent; protected bool initialMetadataSent;
protected long streamingWritesCounter; // Number of streaming send operations started so far. protected long streamingWritesCounter; // Number of streaming send operations started so far.
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) public AsyncCallBase(Action<TWrite, SerializationContext> serializer, Func<DeserializationContext, TRead> deserializer)
{ {
this.serializer = GrpcPreconditions.CheckNotNull(serializer); this.serializer = GrpcPreconditions.CheckNotNull(serializer);
this.deserializer = GrpcPreconditions.CheckNotNull(deserializer); this.deserializer = GrpcPreconditions.CheckNotNull(deserializer);
@ -215,14 +215,26 @@ namespace Grpc.Core.Internal
protected byte[] UnsafeSerialize(TWrite msg) protected byte[] UnsafeSerialize(TWrite msg)
{ {
return serializer(msg); DefaultSerializationContext context = null;
try
{
context = DefaultSerializationContext.GetInitializedThreadLocal();
serializer(msg, context);
return context.GetPayload();
}
finally
{
context?.Reset();
}
} }
protected Exception TryDeserialize(byte[] payload, out TRead msg) protected Exception TryDeserialize(byte[] payload, out TRead msg)
{ {
DefaultDeserializationContext context = null;
try try
{ {
msg = deserializer(payload); context = DefaultDeserializationContext.GetInitializedThreadLocal(payload);
msg = deserializer(context);
return null; return null;
} }
catch (Exception e) catch (Exception e)
@ -230,6 +242,11 @@ namespace Grpc.Core.Internal
msg = default(TRead); msg = default(TRead);
return e; return e;
} }
finally
{
context?.Reset();
}
} }
/// <summary> /// <summary>

@ -37,7 +37,7 @@ namespace Grpc.Core.Internal
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
readonly Server server; readonly Server server;
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, Server server) : base(serializer, deserializer) public AsyncCallServer(Action<TResponse, SerializationContext> serializer, Func<DeserializationContext, TRequest> deserializer, Server server) : base(serializer, deserializer)
{ {
this.server = GrpcPreconditions.CheckNotNull(server); this.server = GrpcPreconditions.CheckNotNull(server);
} }

@ -0,0 +1,66 @@
#region Copyright notice and license
// Copyright 2018 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using Grpc.Core.Utils;
using System;
using System.Threading;
namespace Grpc.Core.Internal
{
internal class DefaultDeserializationContext : DeserializationContext
{
static readonly ThreadLocal<DefaultDeserializationContext> threadLocalInstance =
new ThreadLocal<DefaultDeserializationContext>(() => new DefaultDeserializationContext(), false);
byte[] payload;
bool alreadyCalledPayloadAsNewBuffer;
public DefaultDeserializationContext()
{
Reset();
}
public override int PayloadLength => payload.Length;
public override byte[] PayloadAsNewBuffer()
{
GrpcPreconditions.CheckState(!alreadyCalledPayloadAsNewBuffer);
alreadyCalledPayloadAsNewBuffer = true;
return payload;
}
public void Initialize(byte[] payload)
{
this.payload = GrpcPreconditions.CheckNotNull(payload);
this.alreadyCalledPayloadAsNewBuffer = false;
}
public void Reset()
{
this.payload = null;
this.alreadyCalledPayloadAsNewBuffer = true; // mark payload as read
}
public static DefaultDeserializationContext GetInitializedThreadLocal(byte[] payload)
{
var instance = threadLocalInstance.Value;
instance.Initialize(payload);
return instance;
}
}
}

@ -0,0 +1,62 @@
#region Copyright notice and license
// Copyright 2018 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using Grpc.Core.Utils;
using System.Threading;
namespace Grpc.Core.Internal
{
internal class DefaultSerializationContext : SerializationContext
{
static readonly ThreadLocal<DefaultSerializationContext> threadLocalInstance =
new ThreadLocal<DefaultSerializationContext>(() => new DefaultSerializationContext(), false);
bool isComplete;
byte[] payload;
public DefaultSerializationContext()
{
Reset();
}
public override void Complete(byte[] payload)
{
GrpcPreconditions.CheckState(!isComplete);
this.isComplete = true;
this.payload = payload;
}
internal byte[] GetPayload()
{
return this.payload;
}
public void Reset()
{
this.isComplete = false;
this.payload = null;
}
public static DefaultSerializationContext GetInitializedThreadLocal()
{
var instance = threadLocalInstance.Value;
instance.Reset();
return instance;
}
}
}

@ -52,8 +52,8 @@ namespace Grpc.Core.Internal
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{ {
var asyncCall = new AsyncCallServer<TRequest, TResponse>( var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer, method.ResponseMarshaller.ContextualSerializer,
method.RequestMarshaller.Deserializer, method.RequestMarshaller.ContextualDeserializer,
newRpc.Server); newRpc.Server);
asyncCall.Initialize(newRpc.Call, cq); asyncCall.Initialize(newRpc.Call, cq);
@ -116,8 +116,8 @@ namespace Grpc.Core.Internal
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{ {
var asyncCall = new AsyncCallServer<TRequest, TResponse>( var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer, method.ResponseMarshaller.ContextualSerializer,
method.RequestMarshaller.Deserializer, method.RequestMarshaller.ContextualDeserializer,
newRpc.Server); newRpc.Server);
asyncCall.Initialize(newRpc.Call, cq); asyncCall.Initialize(newRpc.Call, cq);
@ -179,8 +179,8 @@ namespace Grpc.Core.Internal
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{ {
var asyncCall = new AsyncCallServer<TRequest, TResponse>( var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer, method.ResponseMarshaller.ContextualSerializer,
method.RequestMarshaller.Deserializer, method.RequestMarshaller.ContextualDeserializer,
newRpc.Server); newRpc.Server);
asyncCall.Initialize(newRpc.Call, cq); asyncCall.Initialize(newRpc.Call, cq);
@ -242,8 +242,8 @@ namespace Grpc.Core.Internal
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{ {
var asyncCall = new AsyncCallServer<TRequest, TResponse>( var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer, method.ResponseMarshaller.ContextualSerializer,
method.RequestMarshaller.Deserializer, method.RequestMarshaller.ContextualDeserializer,
newRpc.Server); newRpc.Server);
asyncCall.Initialize(newRpc.Call, cq); asyncCall.Initialize(newRpc.Call, cq);

@ -41,6 +41,8 @@ namespace Grpc.Core
{ {
this.serializer = GrpcPreconditions.CheckNotNull(serializer, nameof(serializer)); this.serializer = GrpcPreconditions.CheckNotNull(serializer, nameof(serializer));
this.deserializer = GrpcPreconditions.CheckNotNull(deserializer, nameof(deserializer)); this.deserializer = GrpcPreconditions.CheckNotNull(deserializer, nameof(deserializer));
// contextual serialization/deserialization is emulated to make the marshaller
// usable with the grpc library (required for backward compatibility).
this.contextualSerializer = EmulateContextualSerializer; this.contextualSerializer = EmulateContextualSerializer;
this.contextualDeserializer = EmulateContextualDeserializer; this.contextualDeserializer = EmulateContextualDeserializer;
} }
@ -57,10 +59,10 @@ namespace Grpc.Core
{ {
this.contextualSerializer = GrpcPreconditions.CheckNotNull(serializer, nameof(serializer)); this.contextualSerializer = GrpcPreconditions.CheckNotNull(serializer, nameof(serializer));
this.contextualDeserializer = GrpcPreconditions.CheckNotNull(deserializer, nameof(deserializer)); this.contextualDeserializer = GrpcPreconditions.CheckNotNull(deserializer, nameof(deserializer));
// TODO(jtattermusch): once gRPC C# library switches to using contextual (de)serializer, // gRPC only uses contextual serializer/deserializer internally, so emulating the legacy
// emulating the simple (de)serializer will become unnecessary. // (de)serializer is not necessary.
this.serializer = EmulateSimpleSerializer; this.serializer = (msg) => { throw new NotImplementedException(); };
this.deserializer = EmulateSimpleDeserializer; this.deserializer = (payload) => { throw new NotImplementedException(); };
} }
/// <summary> /// <summary>
@ -85,25 +87,6 @@ namespace Grpc.Core
/// </summary> /// </summary>
public Func<DeserializationContext, T> ContextualDeserializer => this.contextualDeserializer; public Func<DeserializationContext, T> ContextualDeserializer => this.contextualDeserializer;
// for backward compatibility, emulate the simple serializer using the contextual one
private byte[] EmulateSimpleSerializer(T msg)
{
// TODO(jtattermusch): avoid the allocation by passing a thread-local instance
// This code will become unnecessary once gRPC C# library switches to using contextual (de)serializer.
var context = new EmulatedSerializationContext();
this.contextualSerializer(msg, context);
return context.GetPayload();
}
// for backward compatibility, emulate the simple deserializer using the contextual one
private T EmulateSimpleDeserializer(byte[] payload)
{
// TODO(jtattermusch): avoid the allocation by passing a thread-local instance
// This code will become unnecessary once gRPC C# library switches to using contextual (de)serializer.
var context = new EmulatedDeserializationContext(payload);
return this.contextualDeserializer(context);
}
// for backward compatibility, emulate the contextual serializer using the simple one // for backward compatibility, emulate the contextual serializer using the simple one
private void EmulateContextualSerializer(T message, SerializationContext context) private void EmulateContextualSerializer(T message, SerializationContext context)
{ {
@ -116,44 +99,6 @@ namespace Grpc.Core
{ {
return this.deserializer(context.PayloadAsNewBuffer()); return this.deserializer(context.PayloadAsNewBuffer());
} }
internal class EmulatedSerializationContext : SerializationContext
{
bool isComplete;
byte[] payload;
public override void Complete(byte[] payload)
{
GrpcPreconditions.CheckState(!isComplete);
this.isComplete = true;
this.payload = payload;
}
internal byte[] GetPayload()
{
return this.payload;
}
}
internal class EmulatedDeserializationContext : DeserializationContext
{
readonly byte[] payload;
bool alreadyCalledPayloadAsNewBuffer;
public EmulatedDeserializationContext(byte[] payload)
{
this.payload = GrpcPreconditions.CheckNotNull(payload);
}
public override int PayloadLength => payload.Length;
public override byte[] PayloadAsNewBuffer()
{
GrpcPreconditions.CheckState(!alreadyCalledPayloadAsNewBuffer);
alreadyCalledPayloadAsNewBuffer = true;
return payload;
}
}
} }
/// <summary> /// <summary>

@ -16,6 +16,8 @@
#endregion #endregion
using System;
namespace Grpc.Core namespace Grpc.Core
{ {
/// <summary> /// <summary>
@ -29,6 +31,9 @@ namespace Grpc.Core
/// payload which must not be accessed afterwards. /// payload which must not be accessed afterwards.
/// </summary> /// </summary>
/// <param name="payload">the serialized form of current message</param> /// <param name="payload">the serialized form of current message</param>
public abstract void Complete(byte[] payload); public virtual void Complete(byte[] payload)
{
throw new NotImplementedException();
}
} }
} }

Loading…
Cancel
Save