diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index b571fe90259..f730936062d 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -64,6 +64,7 @@
Version.cs
+
diff --git a/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs b/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs
new file mode 100644
index 00000000000..83707e0c6da
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs
@@ -0,0 +1,176 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class MarshallingErrorsTest
+ {
+ const string Host = "127.0.0.1";
+
+ MockServiceHelper helper;
+ Server server;
+ Channel channel;
+
+ [SetUp]
+ public void Init()
+ {
+ var marshaller = new Marshaller(
+ (str) =>
+ {
+ if (str == "UNSERIALIZABLE_VALUE")
+ {
+ // Google.Protobuf throws exception inherited from IOException
+ throw new IOException("Error serializing the message.");
+ }
+ return System.Text.Encoding.UTF8.GetBytes(str);
+ },
+ (payload) =>
+ {
+ var s = System.Text.Encoding.UTF8.GetString(payload);
+ if (s == "UNPARSEABLE_VALUE")
+ {
+ // Google.Protobuf throws exception inherited from IOException
+ throw new IOException("Error parsing the message.");
+ }
+ return s;
+ });
+ helper = new MockServiceHelper(Host, marshaller);
+ server = helper.GetServer();
+ server.Start();
+ channel = helper.GetChannel();
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ channel.ShutdownAsync().Wait();
+ server.ShutdownAsync().Wait();
+ }
+
+ [Test]
+ public void ResponseParsingError_UnaryResponse()
+ {
+ helper.UnaryHandler = new UnaryServerMethod((request, context) =>
+ {
+ return Task.FromResult("UNPARSEABLE_VALUE");
+ });
+
+ var ex = Assert.Throws(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "REQUEST"));
+ Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
+ }
+
+ [Test]
+ public void ResponseParsingError_StreamingResponse()
+ {
+ helper.ServerStreamingHandler = new ServerStreamingServerMethod(async (request, responseStream, context) =>
+ {
+ await responseStream.WriteAsync("UNPARSEABLE_VALUE");
+ await Task.Delay(10000);
+ });
+
+ var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "REQUEST");
+ var ex = Assert.Throws(async () => await call.ResponseStream.MoveNext());
+ Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
+ }
+
+ [Test]
+ public void RequestParsingError_UnaryRequest()
+ {
+ helper.UnaryHandler = new UnaryServerMethod((request, context) =>
+ {
+ return Task.FromResult("RESPONSE");
+ });
+
+ var ex = Assert.Throws(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "UNPARSEABLE_VALUE"));
+ // Spec doesn't define the behavior. With the current implementation server handler throws exception which results in StatusCode.Unknown.
+ Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode);
+ }
+
+ [Test]
+ public async Task RequestParsingError_StreamingRequest()
+ {
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod(async (requestStream, context) =>
+ {
+ Assert.Throws(async () => await requestStream.MoveNext());
+ return "RESPONSE";
+ });
+
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
+ await call.RequestStream.WriteAsync("UNPARSEABLE_VALUE");
+
+ Assert.AreEqual("RESPONSE", await call);
+ }
+
+ [Test]
+ public void RequestSerializationError_BlockingUnary()
+ {
+ Assert.Throws(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "UNSERIALIZABLE_VALUE"));
+ }
+
+ [Test]
+ public void RequestSerializationError_AsyncUnary()
+ {
+ Assert.Throws(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "UNSERIALIZABLE_VALUE"));
+ }
+
+ [Test]
+ public async Task RequestSerializationError_ClientStreaming()
+ {
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod(async (requestStream, context) =>
+ {
+ CollectionAssert.AreEqual(new [] {"A", "B"}, await requestStream.ToListAsync());
+ return "RESPONSE";
+ });
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
+ await call.RequestStream.WriteAsync("A");
+ Assert.Throws(async () => await call.RequestStream.WriteAsync("UNSERIALIZABLE_VALUE"));
+ await call.RequestStream.WriteAsync("B");
+ await call.RequestStream.CompleteAsync();
+
+ Assert.AreEqual("RESPONSE", await call);
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs
index bb69648d8bf..765732c7687 100644
--- a/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs
+++ b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs
@@ -50,37 +50,14 @@ namespace Grpc.Core.Tests
{
public const string ServiceName = "tests.Test";
- public static readonly Method UnaryMethod = new Method(
- MethodType.Unary,
- ServiceName,
- "Unary",
- Marshallers.StringMarshaller,
- Marshallers.StringMarshaller);
-
- public static readonly Method ClientStreamingMethod = new Method(
- MethodType.ClientStreaming,
- ServiceName,
- "ClientStreaming",
- Marshallers.StringMarshaller,
- Marshallers.StringMarshaller);
-
- public static readonly Method ServerStreamingMethod = new Method(
- MethodType.ServerStreaming,
- ServiceName,
- "ServerStreaming",
- Marshallers.StringMarshaller,
- Marshallers.StringMarshaller);
-
- public static readonly Method DuplexStreamingMethod = new Method(
- MethodType.DuplexStreaming,
- ServiceName,
- "DuplexStreaming",
- Marshallers.StringMarshaller,
- Marshallers.StringMarshaller);
-
readonly string host;
readonly ServerServiceDefinition serviceDefinition;
+ readonly Method unaryMethod;
+ readonly Method clientStreamingMethod;
+ readonly Method serverStreamingMethod;
+ readonly Method duplexStreamingMethod;
+
UnaryServerMethod unaryHandler;
ClientStreamingServerMethod clientStreamingHandler;
ServerStreamingServerMethod serverStreamingHandler;
@@ -89,15 +66,44 @@ namespace Grpc.Core.Tests
Server server;
Channel channel;
- public MockServiceHelper(string host = null)
+ public MockServiceHelper(string host = null, Marshaller marshaller = null)
{
this.host = host ?? "localhost";
+ marshaller = marshaller ?? Marshallers.StringMarshaller;
+
+ unaryMethod = new Method(
+ MethodType.Unary,
+ ServiceName,
+ "Unary",
+ marshaller,
+ marshaller);
+
+ clientStreamingMethod = new Method(
+ MethodType.ClientStreaming,
+ ServiceName,
+ "ClientStreaming",
+ marshaller,
+ marshaller);
+
+ serverStreamingMethod = new Method(
+ MethodType.ServerStreaming,
+ ServiceName,
+ "ServerStreaming",
+ marshaller,
+ marshaller);
+
+ duplexStreamingMethod = new Method(
+ MethodType.DuplexStreaming,
+ ServiceName,
+ "DuplexStreaming",
+ marshaller,
+ marshaller);
serviceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName)
- .AddMethod(UnaryMethod, (request, context) => unaryHandler(request, context))
- .AddMethod(ClientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context))
- .AddMethod(ServerStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context))
- .AddMethod(DuplexStreamingMethod, (requestStream, responseStream, context) => duplexStreamingHandler(requestStream, responseStream, context))
+ .AddMethod(unaryMethod, (request, context) => unaryHandler(request, context))
+ .AddMethod(clientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context))
+ .AddMethod(serverStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context))
+ .AddMethod(duplexStreamingMethod, (requestStream, responseStream, context) => duplexStreamingHandler(requestStream, responseStream, context))
.Build();
var defaultStatus = new Status(StatusCode.Unknown, "Default mock implementation. Please provide your own.");
@@ -155,22 +161,22 @@ namespace Grpc.Core.Tests
public CallInvocationDetails CreateUnaryCall(CallOptions options = default(CallOptions))
{
- return new CallInvocationDetails(channel, UnaryMethod, options);
+ return new CallInvocationDetails(channel, unaryMethod, options);
}
public CallInvocationDetails CreateClientStreamingCall(CallOptions options = default(CallOptions))
{
- return new CallInvocationDetails(channel, ClientStreamingMethod, options);
+ return new CallInvocationDetails(channel, clientStreamingMethod, options);
}
public CallInvocationDetails CreateServerStreamingCall(CallOptions options = default(CallOptions))
{
- return new CallInvocationDetails(channel, ServerStreamingMethod, options);
+ return new CallInvocationDetails(channel, serverStreamingMethod, options);
}
public CallInvocationDetails CreateDuplexStreamingCall(CallOptions options = default(CallOptions))
{
- return new CallInvocationDetails(channel, DuplexStreamingMethod, options);
+ return new CallInvocationDetails(channel, duplexStreamingMethod, options);
}
public string Host
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index be5d611a538..e3b00781c62 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -322,6 +322,11 @@ namespace Grpc.Core.Internal
details.Channel.RemoveCallReference(this);
}
+ protected override bool IsClient
+ {
+ get { return true; }
+ }
+
private void Initialize(CompletionQueueSafeHandle cq)
{
var call = CreateNativeCall(cq);
@@ -376,9 +381,17 @@ namespace Grpc.Core.Internal
///
private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
{
+ TResponse msg = default(TResponse);
+ var deserializeException = success ? TryDeserialize(receivedMessage, out msg) : null;
+
lock (myLock)
{
finished = true;
+
+ if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
+ {
+ receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
+ }
finishedStatus = receivedStatus;
ReleaseResourcesIfPossible();
@@ -394,10 +407,6 @@ namespace Grpc.Core.Internal
return;
}
- // TODO: handle deserialization error
- TResponse msg;
- TryDeserialize(receivedMessage, out msg);
-
unaryResponseTcs.SetResult(msg);
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 4d203946449..3e2c57c9b5b 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -33,10 +33,12 @@
using System;
using System.Diagnostics;
+using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
+
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
@@ -50,6 +52,7 @@ namespace Grpc.Core.Internal
internal abstract class AsyncCallBase
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType>();
+ protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message.");
readonly Func serializer;
readonly Func deserializer;
@@ -100,11 +103,10 @@ namespace Grpc.Core.Internal
///
/// Requests cancelling the call with given status.
///
- public void CancelWithStatus(Status status)
+ protected void CancelWithStatus(Status status)
{
lock (myLock)
{
- Preconditions.CheckState(started);
cancelRequested = true;
if (!disposed)
@@ -177,6 +179,11 @@ namespace Grpc.Core.Internal
return false;
}
+ protected abstract bool IsClient
+ {
+ get;
+ }
+
private void ReleaseResources()
{
if (call != null)
@@ -224,33 +231,31 @@ namespace Grpc.Core.Internal
return serializer(msg);
}
- protected bool TrySerialize(TWrite msg, out byte[] payload)
+ protected Exception TrySerialize(TWrite msg, out byte[] payload)
{
try
{
payload = serializer(msg);
- return true;
+ return null;
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured while trying to serialize message");
payload = null;
- return false;
+ return e;
}
}
- protected bool TryDeserialize(byte[] payload, out TRead msg)
+ protected Exception TryDeserialize(byte[] payload, out TRead msg)
{
try
{
msg = deserializer(payload);
- return true;
+ return null;
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured while trying to deserialize message.");
msg = default(TRead);
- return false;
+ return e;
}
}
@@ -319,6 +324,9 @@ namespace Grpc.Core.Internal
///
protected void HandleReadFinished(bool success, byte[] receivedMessage)
{
+ TRead msg = default(TRead);
+ var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null;
+
AsyncCompletionDelegate origCompletionDelegate = null;
lock (myLock)
{
@@ -331,23 +339,23 @@ namespace Grpc.Core.Internal
readingDone = true;
}
+ if (deserializeException != null && IsClient)
+ {
+ readingDone = true;
+ CancelWithStatus(DeserializeResponseFailureStatus);
+ }
+
ReleaseResourcesIfPossible();
}
- // TODO: handle the case when error occured...
+ // TODO: handle the case when success==false
- if (receivedMessage != null)
- {
- // TODO: handle deserialization error
- TRead msg;
- TryDeserialize(receivedMessage, out msg);
-
- FireCompletion(origCompletionDelegate, msg, null);
- }
- else
+ if (deserializeException != null && !IsClient)
{
- FireCompletion(origCompletionDelegate, default(TRead), null);
+ FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException));
+ return;
}
+ FireCompletion(origCompletionDelegate, msg, null);
}
}
}
\ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 5c47251030e..46ca4593493 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -169,6 +169,11 @@ namespace Grpc.Core.Internal
}
}
+ protected override bool IsClient
+ {
+ get { return false; }
+ }
+
protected override void CheckReadingAllowed()
{
base.CheckReadingAllowed();
diff --git a/src/csharp/Grpc.Core/Marshaller.cs b/src/csharp/Grpc.Core/Marshaller.cs
index f38cb0863ff..3493d2d38f0 100644
--- a/src/csharp/Grpc.Core/Marshaller.cs
+++ b/src/csharp/Grpc.Core/Marshaller.cs
@@ -39,7 +39,7 @@ namespace Grpc.Core
///
/// Encapsulates the logic for serializing and deserializing messages.
///
- public struct Marshaller
+ public class Marshaller
{
readonly Func serializer;
readonly Func deserializer;
diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
index d8547758d24..e2975b5da93 100644
--- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
+++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
@@ -162,7 +162,7 @@ namespace Math.Tests
{
using (var call = client.Sum())
{
- var numbers = new List { 10, 20, 30 }.ConvertAll(n => new Num{ Num_ = n });
+ var numbers = new List { 10, 20, 30 }.ConvertAll(n => new Num { Num_ = n });
await call.RequestStream.WriteAllAsync(numbers);
var result = await call.ResponseAsync;
diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
index 95f742cc99d..6c3a53bec05 100644
--- a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
+++ b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
@@ -88,7 +88,7 @@ namespace Grpc.HealthCheck.Tests
[Test]
public void ServiceDoesntExist()
{
- Assert.Throws(Is.TypeOf(typeof(RpcException)).And.Property("Status").Property("StatusCode").EqualTo(StatusCode.NotFound), () => client.Check(new HealthCheckRequest{ Host = "", Service = "nonexistent.service" }));
+ Assert.Throws(Is.TypeOf(typeof(RpcException)).And.Property("Status").Property("StatusCode").EqualTo(StatusCode.NotFound), () => client.Check(new HealthCheckRequest { Host = "", Service = "nonexistent.service" }));
}
// TODO(jtattermusch): add test with timeout once timeouts are supported
diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs
index 8de8645cd18..2097c0dc8cf 100644
--- a/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs
+++ b/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs
@@ -101,7 +101,7 @@ namespace Grpc.HealthCheck.Tests
private static HealthCheckResponse.Types.ServingStatus GetStatusHelper(HealthServiceImpl impl, string host, string service)
{
- return impl.Check(new HealthCheckRequest{ Host = host, Service = service}, null).Result.Status;
+ return impl.Check(new HealthCheckRequest { Host = host, Service = service }, null).Result.Status;
}
}
}
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index ed51af19421..8343e54122c 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -37,13 +37,12 @@ using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
+using Google.Apis.Auth.OAuth2;
+using Google.Protobuf;
using Grpc.Auth;
using Grpc.Core;
using Grpc.Core.Utils;
using Grpc.Testing;
-using Google.Protobuf;
-using Google.Apis.Auth.OAuth2;
-
using NUnit.Framework;
namespace Grpc.IntegrationTesting