error spec compliance and marshalling tests

pull/3118/head
Jan Tattermusch 10 years ago
parent ee8d6a381a
commit 67c4587c88
  1. 1
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  2. 176
      src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs
  3. 76
      src/csharp/Grpc.Core.Tests/MockServiceHelper.cs
  4. 17
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  5. 50
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  6. 5
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  7. 2
      src/csharp/Grpc.Core/Marshaller.cs
  8. 2
      src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
  9. 2
      src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
  10. 2
      src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs
  11. 5
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs

@ -64,6 +64,7 @@
<Link>Version.cs</Link> <Link>Version.cs</Link>
</Compile> </Compile>
<Compile Include="ClientBaseTest.cs" /> <Compile Include="ClientBaseTest.cs" />
<Compile Include="MarshallingErrorsTest.cs" />
<Compile Include="ShutdownTest.cs" /> <Compile Include="ShutdownTest.cs" />
<Compile Include="Internal\AsyncCallTest.cs" /> <Compile Include="Internal\AsyncCallTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Properties\AssemblyInfo.cs" />

@ -0,0 +1,176 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class MarshallingErrorsTest
{
const string Host = "127.0.0.1";
MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
var marshaller = new Marshaller<string>(
(str) =>
{
if (str == "UNSERIALIZABLE_VALUE")
{
// Google.Protobuf throws exception inherited from IOException
throw new IOException("Error serializing the message.");
}
return System.Text.Encoding.UTF8.GetBytes(str);
},
(payload) =>
{
var s = System.Text.Encoding.UTF8.GetString(payload);
if (s == "UNPARSEABLE_VALUE")
{
// Google.Protobuf throws exception inherited from IOException
throw new IOException("Error parsing the message.");
}
return s;
});
helper = new MockServiceHelper(Host, marshaller);
server = helper.GetServer();
server.Start();
channel = helper.GetChannel();
}
[TearDown]
public void Cleanup()
{
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
[Test]
public void ResponseParsingError_UnaryResponse()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) =>
{
return Task.FromResult("UNPARSEABLE_VALUE");
});
var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "REQUEST"));
Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
}
[Test]
public void ResponseParsingError_StreamingResponse()
{
helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
{
await responseStream.WriteAsync("UNPARSEABLE_VALUE");
await Task.Delay(10000);
});
var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "REQUEST");
var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.MoveNext());
Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
}
[Test]
public void RequestParsingError_UnaryRequest()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) =>
{
return Task.FromResult("RESPONSE");
});
var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "UNPARSEABLE_VALUE"));
// Spec doesn't define the behavior. With the current implementation server handler throws exception which results in StatusCode.Unknown.
Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode);
}
[Test]
public async Task RequestParsingError_StreamingRequest()
{
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
Assert.Throws<IOException>(async () => await requestStream.MoveNext());
return "RESPONSE";
});
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
await call.RequestStream.WriteAsync("UNPARSEABLE_VALUE");
Assert.AreEqual("RESPONSE", await call);
}
[Test]
public void RequestSerializationError_BlockingUnary()
{
Assert.Throws<IOException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "UNSERIALIZABLE_VALUE"));
}
[Test]
public void RequestSerializationError_AsyncUnary()
{
Assert.Throws<IOException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "UNSERIALIZABLE_VALUE"));
}
[Test]
public async Task RequestSerializationError_ClientStreaming()
{
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
CollectionAssert.AreEqual(new [] {"A", "B"}, await requestStream.ToListAsync());
return "RESPONSE";
});
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
await call.RequestStream.WriteAsync("A");
Assert.Throws<IOException>(async () => await call.RequestStream.WriteAsync("UNSERIALIZABLE_VALUE"));
await call.RequestStream.WriteAsync("B");
await call.RequestStream.CompleteAsync();
Assert.AreEqual("RESPONSE", await call);
}
}
}

@ -50,54 +50,60 @@ namespace Grpc.Core.Tests
{ {
public const string ServiceName = "tests.Test"; public const string ServiceName = "tests.Test";
public static readonly Method<string, string> UnaryMethod = new Method<string, string>( readonly string host;
readonly ServerServiceDefinition serviceDefinition;
readonly Method<string, string> unaryMethod;
readonly Method<string, string> clientStreamingMethod;
readonly Method<string, string> serverStreamingMethod;
readonly Method<string, string> duplexStreamingMethod;
UnaryServerMethod<string, string> unaryHandler;
ClientStreamingServerMethod<string, string> clientStreamingHandler;
ServerStreamingServerMethod<string, string> serverStreamingHandler;
DuplexStreamingServerMethod<string, string> duplexStreamingHandler;
Server server;
Channel channel;
public MockServiceHelper(string host = null, Marshaller<string> marshaller = null)
{
this.host = host ?? "localhost";
marshaller = marshaller ?? Marshallers.StringMarshaller;
unaryMethod = new Method<string, string>(
MethodType.Unary, MethodType.Unary,
ServiceName, ServiceName,
"Unary", "Unary",
Marshallers.StringMarshaller, marshaller,
Marshallers.StringMarshaller); marshaller);
public static readonly Method<string, string> ClientStreamingMethod = new Method<string, string>( clientStreamingMethod = new Method<string, string>(
MethodType.ClientStreaming, MethodType.ClientStreaming,
ServiceName, ServiceName,
"ClientStreaming", "ClientStreaming",
Marshallers.StringMarshaller, marshaller,
Marshallers.StringMarshaller); marshaller);
public static readonly Method<string, string> ServerStreamingMethod = new Method<string, string>( serverStreamingMethod = new Method<string, string>(
MethodType.ServerStreaming, MethodType.ServerStreaming,
ServiceName, ServiceName,
"ServerStreaming", "ServerStreaming",
Marshallers.StringMarshaller, marshaller,
Marshallers.StringMarshaller); marshaller);
public static readonly Method<string, string> DuplexStreamingMethod = new Method<string, string>( duplexStreamingMethod = new Method<string, string>(
MethodType.DuplexStreaming, MethodType.DuplexStreaming,
ServiceName, ServiceName,
"DuplexStreaming", "DuplexStreaming",
Marshallers.StringMarshaller, marshaller,
Marshallers.StringMarshaller); marshaller);
readonly string host;
readonly ServerServiceDefinition serviceDefinition;
UnaryServerMethod<string, string> unaryHandler;
ClientStreamingServerMethod<string, string> clientStreamingHandler;
ServerStreamingServerMethod<string, string> serverStreamingHandler;
DuplexStreamingServerMethod<string, string> duplexStreamingHandler;
Server server;
Channel channel;
public MockServiceHelper(string host = null)
{
this.host = host ?? "localhost";
serviceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName) serviceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName)
.AddMethod(UnaryMethod, (request, context) => unaryHandler(request, context)) .AddMethod(unaryMethod, (request, context) => unaryHandler(request, context))
.AddMethod(ClientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context)) .AddMethod(clientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context))
.AddMethod(ServerStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context)) .AddMethod(serverStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context))
.AddMethod(DuplexStreamingMethod, (requestStream, responseStream, context) => duplexStreamingHandler(requestStream, responseStream, context)) .AddMethod(duplexStreamingMethod, (requestStream, responseStream, context) => duplexStreamingHandler(requestStream, responseStream, context))
.Build(); .Build();
var defaultStatus = new Status(StatusCode.Unknown, "Default mock implementation. Please provide your own."); var defaultStatus = new Status(StatusCode.Unknown, "Default mock implementation. Please provide your own.");
@ -155,22 +161,22 @@ namespace Grpc.Core.Tests
public CallInvocationDetails<string, string> CreateUnaryCall(CallOptions options = default(CallOptions)) public CallInvocationDetails<string, string> CreateUnaryCall(CallOptions options = default(CallOptions))
{ {
return new CallInvocationDetails<string, string>(channel, UnaryMethod, options); return new CallInvocationDetails<string, string>(channel, unaryMethod, options);
} }
public CallInvocationDetails<string, string> CreateClientStreamingCall(CallOptions options = default(CallOptions)) public CallInvocationDetails<string, string> CreateClientStreamingCall(CallOptions options = default(CallOptions))
{ {
return new CallInvocationDetails<string, string>(channel, ClientStreamingMethod, options); return new CallInvocationDetails<string, string>(channel, clientStreamingMethod, options);
} }
public CallInvocationDetails<string, string> CreateServerStreamingCall(CallOptions options = default(CallOptions)) public CallInvocationDetails<string, string> CreateServerStreamingCall(CallOptions options = default(CallOptions))
{ {
return new CallInvocationDetails<string, string>(channel, ServerStreamingMethod, options); return new CallInvocationDetails<string, string>(channel, serverStreamingMethod, options);
} }
public CallInvocationDetails<string, string> CreateDuplexStreamingCall(CallOptions options = default(CallOptions)) public CallInvocationDetails<string, string> CreateDuplexStreamingCall(CallOptions options = default(CallOptions))
{ {
return new CallInvocationDetails<string, string>(channel, DuplexStreamingMethod, options); return new CallInvocationDetails<string, string>(channel, duplexStreamingMethod, options);
} }
public string Host public string Host

@ -322,6 +322,11 @@ namespace Grpc.Core.Internal
details.Channel.RemoveCallReference(this); details.Channel.RemoveCallReference(this);
} }
protected override bool IsClient
{
get { return true; }
}
private void Initialize(CompletionQueueSafeHandle cq) private void Initialize(CompletionQueueSafeHandle cq)
{ {
var call = CreateNativeCall(cq); var call = CreateNativeCall(cq);
@ -376,9 +381,17 @@ namespace Grpc.Core.Internal
/// </summary> /// </summary>
private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders) private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
{ {
TResponse msg = default(TResponse);
var deserializeException = success ? TryDeserialize(receivedMessage, out msg) : null;
lock (myLock) lock (myLock)
{ {
finished = true; finished = true;
if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
{
receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
}
finishedStatus = receivedStatus; finishedStatus = receivedStatus;
ReleaseResourcesIfPossible(); ReleaseResourcesIfPossible();
@ -394,10 +407,6 @@ namespace Grpc.Core.Internal
return; return;
} }
// TODO: handle deserialization error
TResponse msg;
TryDeserialize(receivedMessage, out msg);
unaryResponseTcs.SetResult(msg); unaryResponseTcs.SetResult(msg);
} }

@ -33,10 +33,12 @@
using System; using System;
using System.Diagnostics; using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Grpc.Core.Internal; using Grpc.Core.Internal;
using Grpc.Core.Logging; using Grpc.Core.Logging;
using Grpc.Core.Utils; using Grpc.Core.Utils;
@ -50,6 +52,7 @@ namespace Grpc.Core.Internal
internal abstract class AsyncCallBase<TWrite, TRead> internal abstract class AsyncCallBase<TWrite, TRead>
{ {
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>();
protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message.");
readonly Func<TWrite, byte[]> serializer; readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer; readonly Func<byte[], TRead> deserializer;
@ -100,11 +103,10 @@ namespace Grpc.Core.Internal
/// <summary> /// <summary>
/// Requests cancelling the call with given status. /// Requests cancelling the call with given status.
/// </summary> /// </summary>
public void CancelWithStatus(Status status) protected void CancelWithStatus(Status status)
{ {
lock (myLock) lock (myLock)
{ {
Preconditions.CheckState(started);
cancelRequested = true; cancelRequested = true;
if (!disposed) if (!disposed)
@ -177,6 +179,11 @@ namespace Grpc.Core.Internal
return false; return false;
} }
protected abstract bool IsClient
{
get;
}
private void ReleaseResources() private void ReleaseResources()
{ {
if (call != null) if (call != null)
@ -224,33 +231,31 @@ namespace Grpc.Core.Internal
return serializer(msg); return serializer(msg);
} }
protected bool TrySerialize(TWrite msg, out byte[] payload) protected Exception TrySerialize(TWrite msg, out byte[] payload)
{ {
try try
{ {
payload = serializer(msg); payload = serializer(msg);
return true; return null;
} }
catch (Exception e) catch (Exception e)
{ {
Logger.Error(e, "Exception occured while trying to serialize message");
payload = null; payload = null;
return false; return e;
} }
} }
protected bool TryDeserialize(byte[] payload, out TRead msg) protected Exception TryDeserialize(byte[] payload, out TRead msg)
{ {
try try
{ {
msg = deserializer(payload); msg = deserializer(payload);
return true; return null;
} }
catch (Exception e) catch (Exception e)
{ {
Logger.Error(e, "Exception occured while trying to deserialize message.");
msg = default(TRead); msg = default(TRead);
return false; return e;
} }
} }
@ -319,6 +324,9 @@ namespace Grpc.Core.Internal
/// </summary> /// </summary>
protected void HandleReadFinished(bool success, byte[] receivedMessage) protected void HandleReadFinished(bool success, byte[] receivedMessage)
{ {
TRead msg = default(TRead);
var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null;
AsyncCompletionDelegate<TRead> origCompletionDelegate = null; AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
lock (myLock) lock (myLock)
{ {
@ -331,23 +339,23 @@ namespace Grpc.Core.Internal
readingDone = true; readingDone = true;
} }
ReleaseResourcesIfPossible(); if (deserializeException != null && IsClient)
{
readingDone = true;
CancelWithStatus(DeserializeResponseFailureStatus);
} }
// TODO: handle the case when error occured... ReleaseResourcesIfPossible();
}
if (receivedMessage != null) // TODO: handle the case when success==false
{
// TODO: handle deserialization error
TRead msg;
TryDeserialize(receivedMessage, out msg);
FireCompletion(origCompletionDelegate, msg, null); if (deserializeException != null && !IsClient)
}
else
{ {
FireCompletion(origCompletionDelegate, default(TRead), null); FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException));
return;
} }
FireCompletion(origCompletionDelegate, msg, null);
} }
} }
} }

@ -169,6 +169,11 @@ namespace Grpc.Core.Internal
} }
} }
protected override bool IsClient
{
get { return false; }
}
protected override void CheckReadingAllowed() protected override void CheckReadingAllowed()
{ {
base.CheckReadingAllowed(); base.CheckReadingAllowed();

@ -39,7 +39,7 @@ namespace Grpc.Core
/// <summary> /// <summary>
/// Encapsulates the logic for serializing and deserializing messages. /// Encapsulates the logic for serializing and deserializing messages.
/// </summary> /// </summary>
public struct Marshaller<T> public class Marshaller<T>
{ {
readonly Func<T, byte[]> serializer; readonly Func<T, byte[]> serializer;
readonly Func<byte[], T> deserializer; readonly Func<byte[], T> deserializer;

@ -162,7 +162,7 @@ namespace Math.Tests
{ {
using (var call = client.Sum()) using (var call = client.Sum())
{ {
var numbers = new List<long> { 10, 20, 30 }.ConvertAll(n => new Num{ Num_ = n }); var numbers = new List<long> { 10, 20, 30 }.ConvertAll(n => new Num { Num_ = n });
await call.RequestStream.WriteAllAsync(numbers); await call.RequestStream.WriteAllAsync(numbers);
var result = await call.ResponseAsync; var result = await call.ResponseAsync;

@ -88,7 +88,7 @@ namespace Grpc.HealthCheck.Tests
[Test] [Test]
public void ServiceDoesntExist() public void ServiceDoesntExist()
{ {
Assert.Throws(Is.TypeOf(typeof(RpcException)).And.Property("Status").Property("StatusCode").EqualTo(StatusCode.NotFound), () => client.Check(new HealthCheckRequest{ Host = "", Service = "nonexistent.service" })); Assert.Throws(Is.TypeOf(typeof(RpcException)).And.Property("Status").Property("StatusCode").EqualTo(StatusCode.NotFound), () => client.Check(new HealthCheckRequest { Host = "", Service = "nonexistent.service" }));
} }
// TODO(jtattermusch): add test with timeout once timeouts are supported // TODO(jtattermusch): add test with timeout once timeouts are supported

@ -101,7 +101,7 @@ namespace Grpc.HealthCheck.Tests
private static HealthCheckResponse.Types.ServingStatus GetStatusHelper(HealthServiceImpl impl, string host, string service) private static HealthCheckResponse.Types.ServingStatus GetStatusHelper(HealthServiceImpl impl, string host, string service)
{ {
return impl.Check(new HealthCheckRequest{ Host = host, Service = service}, null).Result.Status; return impl.Check(new HealthCheckRequest { Host = host, Service = service }, null).Result.Status;
} }
} }
} }

@ -37,13 +37,12 @@ using System.Text.RegularExpressions;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Google.Apis.Auth.OAuth2;
using Google.Protobuf;
using Grpc.Auth; using Grpc.Auth;
using Grpc.Core; using Grpc.Core;
using Grpc.Core.Utils; using Grpc.Core.Utils;
using Grpc.Testing; using Grpc.Testing;
using Google.Protobuf;
using Google.Apis.Auth.OAuth2;
using NUnit.Framework; using NUnit.Framework;
namespace Grpc.IntegrationTesting namespace Grpc.IntegrationTesting

Loading…
Cancel
Save