Merge pull request #2643 from jtattermusch/csharp_timeouts

C# support for timeouts.
pull/2649/head
Michael Lumish 9 years ago
commit d357cc0a63
  1. 12
      src/compiler/csharp_generator.cc
  2. 3
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  3. 202
      src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs
  4. 207
      src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
  5. 101
      src/csharp/Grpc.Core.Tests/TimespecTest.cs
  6. 15
      src/csharp/Grpc.Core/Call.cs
  7. 18
      src/csharp/Grpc.Core/Calls.cs
  8. 6
      src/csharp/Grpc.Core/ClientBase.cs
  9. 16
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  10. 20
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  11. 10
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  12. 10
      src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
  13. 16
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  14. 183
      src/csharp/Grpc.Core/Internal/Timespec.cs
  15. 54
      src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
  16. 30
      src/csharp/Grpc.Examples/MathGrpc.cs
  17. 9
      src/csharp/Grpc.Examples/MathServiceImpl.cs
  18. 12
      src/csharp/Grpc.HealthCheck/HealthGrpc.cs
  19. 48
      src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
  20. 16
      src/csharp/ext/grpc_csharp_ext.c

@ -269,7 +269,7 @@ void GenerateClientInterface(Printer* out, const ServiceDescriptor *service) {
if (method_type == METHODTYPE_NO_STREAMING) {
// unary calls have an extra synchronous stub method
out->Print(
"$response$ $methodname$($request$ request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));\n",
"$response$ $methodname$($request$ request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));\n",
"methodname", method->name(), "request",
GetClassName(method->input_type()), "response",
GetClassName(method->output_type()));
@ -280,7 +280,7 @@ void GenerateClientInterface(Printer* out, const ServiceDescriptor *service) {
method_name += "Async"; // prevent name clash with synchronous method.
}
out->Print(
"$returntype$ $methodname$($request_maybe$Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));\n",
"$returntype$ $methodname$($request_maybe$Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));\n",
"methodname", method_name, "request_maybe",
GetMethodRequestParamMaybe(method), "returntype",
GetMethodReturnTypeClient(method));
@ -332,13 +332,13 @@ void GenerateClientStub(Printer* out, const ServiceDescriptor *service) {
if (method_type == METHODTYPE_NO_STREAMING) {
// unary calls have an extra synchronous stub method
out->Print(
"public $response$ $methodname$($request$ request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))\n",
"public $response$ $methodname$($request$ request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))\n",
"methodname", method->name(), "request",
GetClassName(method->input_type()), "response",
GetClassName(method->output_type()));
out->Print("{\n");
out->Indent();
out->Print("var call = CreateCall($servicenamefield$, $methodfield$, headers);\n",
out->Print("var call = CreateCall($servicenamefield$, $methodfield$, headers, deadline);\n",
"servicenamefield", GetServiceNameFieldName(), "methodfield",
GetMethodFieldName(method));
out->Print("return Calls.BlockingUnaryCall(call, request, cancellationToken);\n");
@ -351,13 +351,13 @@ void GenerateClientStub(Printer* out, const ServiceDescriptor *service) {
method_name += "Async"; // prevent name clash with synchronous method.
}
out->Print(
"public $returntype$ $methodname$($request_maybe$Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))\n",
"public $returntype$ $methodname$($request_maybe$Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))\n",
"methodname", method_name, "request_maybe",
GetMethodRequestParamMaybe(method), "returntype",
GetMethodReturnTypeClient(method));
out->Print("{\n");
out->Indent();
out->Print("var call = CreateCall($servicenamefield$, $methodfield$, headers);\n",
out->Print("var call = CreateCall($servicenamefield$, $methodfield$, headers, deadline);\n",
"servicenamefield", GetServiceNameFieldName(), "methodfield",
GetMethodFieldName(method));
switch (GetMethodType(method)) {

@ -44,13 +44,14 @@
<Compile Include="ClientServerTest.cs" />
<Compile Include="ServerTest.cs" />
<Compile Include="GrpcEnvironmentTest.cs" />
<Compile Include="TimespecTest.cs" />
<Compile Include="PInvokeTest.cs" />
<Compile Include="Internal\MetadataArraySafeHandleTest.cs" />
<Compile Include="Internal\CompletionQueueSafeHandleTest.cs" />
<Compile Include="Internal\CompletionQueueEventTest.cs" />
<Compile Include="Internal\ChannelArgsSafeHandleTest.cs" />
<Compile Include="ChannelOptionsTest.cs" />
<Compile Include="Internal\TimespecTest.cs" />
<Compile Include="TimeoutsTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -0,0 +1,202 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using Grpc.Core.Internal;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
public class TimespecTest
{
[Test]
public void Now_IsInUtc()
{
Assert.AreEqual(DateTimeKind.Utc, Timespec.Now.ToDateTime().Kind);
}
[Test]
public void Now_AgreesWithUtcNow()
{
var timespec = Timespec.Now;
var utcNow = DateTime.UtcNow;
TimeSpan difference = utcNow - timespec.ToDateTime();
// This test is inherently a race - but the two timestamps
// should really be way less that a minute apart.
Assert.IsTrue(difference.TotalSeconds < 60);
}
[Test]
public void InfFuture()
{
var timespec = Timespec.InfFuture;
}
[Test]
public void InfPast()
{
var timespec = Timespec.InfPast;
}
[Test]
public void TimespecSizeIsNativeSize()
{
Assert.AreEqual(Timespec.NativeSize, Marshal.SizeOf(typeof(Timespec)));
}
[Test]
public void ToDateTime()
{
Assert.AreEqual(new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc),
new Timespec(IntPtr.Zero, 0).ToDateTime());
Assert.AreEqual(new DateTime(1970, 1, 1, 0, 0, 10, DateTimeKind.Utc).AddTicks(50),
new Timespec(new IntPtr(10), 5000).ToDateTime());
Assert.AreEqual(new DateTime(2015, 7, 21, 4, 21, 48, DateTimeKind.Utc),
new Timespec(new IntPtr(1437452508), 0).ToDateTime());
// before epoch
Assert.AreEqual(new DateTime(1969, 12, 31, 23, 59, 55, DateTimeKind.Utc).AddTicks(10),
new Timespec(new IntPtr(-5), 1000).ToDateTime());
// infinity
Assert.AreEqual(DateTime.MaxValue, Timespec.InfFuture.ToDateTime());
Assert.AreEqual(DateTime.MinValue, Timespec.InfPast.ToDateTime());
// nanos are rounded to ticks are rounded up
Assert.AreEqual(new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).AddTicks(1),
new Timespec(IntPtr.Zero, 99).ToDateTime());
// Illegal inputs
Assert.Throws(typeof(InvalidOperationException),
() => new Timespec(new IntPtr(0), -2).ToDateTime());
Assert.Throws(typeof(InvalidOperationException),
() => new Timespec(new IntPtr(0), 1000 * 1000 * 1000).ToDateTime());
Assert.Throws(typeof(InvalidOperationException),
() => new Timespec(new IntPtr(0), 0, GPRClockType.Monotonic).ToDateTime());
}
[Test]
public void ToDateTime_ReturnsUtc()
{
Assert.AreEqual(DateTimeKind.Utc, new Timespec(new IntPtr(1437452508), 0).ToDateTime().Kind);
Assert.AreNotEqual(DateTimeKind.Unspecified, new Timespec(new IntPtr(1437452508), 0).ToDateTime().Kind);
}
[Test]
public void ToDateTime_Overflow()
{
// we can only get overflow in ticks arithmetic on 64-bit
if (IntPtr.Size == 8)
{
var timespec = new Timespec(new IntPtr(long.MaxValue - 100), 0);
Assert.AreNotEqual(Timespec.InfFuture, timespec);
Assert.AreEqual(DateTime.MaxValue, timespec.ToDateTime());
Assert.AreEqual(DateTime.MinValue, new Timespec(new IntPtr(long.MinValue + 100), 0).ToDateTime());
}
else
{
Console.WriteLine("Test cannot be run on this platform, skipping the test.");
}
}
[Test]
public void ToDateTime_OutOfDateTimeRange()
{
// we can only get out of range on 64-bit, on 32 bit the max
// timestamp is ~ Jan 19 2038, which is far within range of DateTime
// same case for min value.
if (IntPtr.Size == 8)
{
// DateTime range goes up to year 9999, 20000 years from now should
// be out of range.
long seconds = 20000L * 365L * 24L * 3600L;
var timespec = new Timespec(new IntPtr(seconds), 0);
Assert.AreNotEqual(Timespec.InfFuture, timespec);
Assert.AreEqual(DateTime.MaxValue, timespec.ToDateTime());
Assert.AreEqual(DateTime.MinValue, new Timespec(new IntPtr(-seconds), 0).ToDateTime());
}
else
{
Console.WriteLine("Test cannot be run on this platform, skipping the test");
}
}
[Test]
public void FromDateTime()
{
Assert.AreEqual(new Timespec(IntPtr.Zero, 0),
Timespec.FromDateTime(new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)));
Assert.AreEqual(new Timespec(new IntPtr(10), 5000),
Timespec.FromDateTime(new DateTime(1970, 1, 1, 0, 0, 10, DateTimeKind.Utc).AddTicks(50)));
Assert.AreEqual(new Timespec(new IntPtr(1437452508), 0),
Timespec.FromDateTime(new DateTime(2015, 7, 21, 4, 21, 48, DateTimeKind.Utc)));
// before epoch
Assert.AreEqual(new Timespec(new IntPtr(-5), 1000),
Timespec.FromDateTime(new DateTime(1969, 12, 31, 23, 59, 55, DateTimeKind.Utc).AddTicks(10)));
// infinity
Assert.AreEqual(Timespec.InfFuture, Timespec.FromDateTime(DateTime.MaxValue));
Assert.AreEqual(Timespec.InfPast, Timespec.FromDateTime(DateTime.MinValue));
// illegal inputs
Assert.Throws(typeof(ArgumentException),
() => Timespec.FromDateTime(new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Unspecified)));
}
[Test]
public void FromDateTime_OutOfTimespecRange()
{
// we can only get overflow in Timespec on 32-bit
if (IntPtr.Size == 4)
{
Assert.AreEqual(Timespec.InfFuture, Timespec.FromDateTime(new DateTime(2040, 1, 1, 0, 0, 0, DateTimeKind.Utc)));
Assert.AreEqual(Timespec.InfPast, Timespec.FromDateTime(new DateTime(1800, 1, 1, 0, 0, 0, DateTimeKind.Utc)));
}
else
{
Console.WriteLine("Test cannot be run on this platform, skipping the test.");
}
}
}
}

@ -0,0 +1,207 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
/// <summary>
/// Tests for Deadline support.
/// </summary>
public class TimeoutsTest
{
const string Host = "localhost";
const string ServiceName = "/tests.Test";
static readonly Method<string, string> TestMethod = new Method<string, string>(
MethodType.Unary,
"/tests.Test/Test",
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
static readonly ServerServiceDefinition ServiceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName)
.AddMethod(TestMethod, TestMethodHandler)
.Build();
// provides a way how to retrieve an out-of-band result value from server handler
static TaskCompletionSource<string> stringFromServerHandlerTcs;
Server server;
Channel channel;
[SetUp]
public void Init()
{
server = new Server();
server.AddServiceDefinition(ServiceDefinition);
int port = server.AddListeningPort(Host, Server.PickUnusedPort);
server.Start();
channel = new Channel(Host, port);
stringFromServerHandlerTcs = new TaskCompletionSource<string>();
}
[TearDown]
public void Cleanup()
{
channel.Dispose();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public void InfiniteDeadline()
{
// no deadline specified, check server sees infinite deadline
var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty);
Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(internalCall, "RETURN_DEADLINE", CancellationToken.None));
// DateTime.MaxValue deadline specified, check server sees infinite deadline
var internalCall2 = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty, DateTime.MaxValue);
Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(internalCall2, "RETURN_DEADLINE", CancellationToken.None));
}
[Test]
public void DeadlineTransferredToServer()
{
var remainingTimeClient = TimeSpan.FromDays(7);
var deadline = DateTime.UtcNow + remainingTimeClient;
Thread.Sleep(1000);
var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty, deadline);
var serverDeadlineTicksString = Calls.BlockingUnaryCall(internalCall, "RETURN_DEADLINE", CancellationToken.None);
var serverDeadline = new DateTime(long.Parse(serverDeadlineTicksString), DateTimeKind.Utc);
// A fairly relaxed check that the deadline set by client and deadline seen by server
// are in agreement. C core takes care of the work with transferring deadline over the wire,
// so we don't need an exact check here.
Assert.IsTrue(Math.Abs((deadline - serverDeadline).TotalMilliseconds) < 5000);
}
[Test]
public void DeadlineInThePast()
{
var deadline = DateTime.MinValue;
var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty, deadline);
try
{
Calls.BlockingUnaryCall(internalCall, "TIMEOUT", CancellationToken.None);
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.DeadlineExceeded, e.Status.StatusCode);
}
}
[Test]
public void DeadlineExceededStatusOnTimeout()
{
var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5));
var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty, deadline);
try
{
Calls.BlockingUnaryCall(internalCall, "TIMEOUT", CancellationToken.None);
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.DeadlineExceeded, e.Status.StatusCode);
}
}
[Test]
public void ServerReceivesCancellationOnTimeout()
{
var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5));
var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty, deadline);
try
{
Calls.BlockingUnaryCall(internalCall, "CHECK_CANCELLATION_RECEIVED", CancellationToken.None);
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.DeadlineExceeded, e.Status.StatusCode);
}
Assert.AreEqual("CANCELLED", stringFromServerHandlerTcs.Task.Result);
}
private static async Task<string> TestMethodHandler(string request, ServerCallContext context)
{
if (request == "TIMEOUT")
{
await Task.Delay(60000);
return "";
}
if (request == "RETURN_DEADLINE")
{
if (context.Deadline == DateTime.MaxValue)
{
return "DATETIME_MAXVALUE";
}
return context.Deadline.Ticks.ToString();
}
if (request == "CHECK_CANCELLATION_RECEIVED")
{
// wait until cancellation token is fired.
var tcs = new TaskCompletionSource<object>();
context.CancellationToken.Register(() => { tcs.SetResult(null); });
await tcs.Task;
stringFromServerHandlerTcs.SetResult("CANCELLED");
return "";
}
return "";
}
}
}

@ -1,101 +0,0 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using Grpc.Core.Internal;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
public class TimespecTest
{
[Test]
public void Now()
{
var timespec = Timespec.Now;
}
[Test]
public void InfFuture()
{
var timespec = Timespec.InfFuture;
}
[Test]
public void TimespecSizeIsNativeSize()
{
Assert.AreEqual(Timespec.NativeSize, Marshal.SizeOf(typeof(Timespec)));
}
[Test]
public void ToDateTime()
{
Assert.AreEqual(new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc),
new Timespec(IntPtr.Zero, 0).ToDateTime());
Assert.AreEqual(new DateTime(1970, 1, 1, 0, 0, 10, DateTimeKind.Utc).AddTicks(50),
new Timespec(new IntPtr(10), 5000).ToDateTime());
Assert.AreEqual(new DateTime(2015, 7, 21, 4, 21, 48, DateTimeKind.Utc),
new Timespec(new IntPtr(1437452508), 0).ToDateTime());
}
[Test]
public void Add()
{
var t = new Timespec { tv_sec = new IntPtr(12345), tv_nsec = 123456789 };
var result = t.Add(TimeSpan.FromTicks(TimeSpan.TicksPerSecond * 10));
Assert.AreEqual(result.tv_sec, new IntPtr(12355));
Assert.AreEqual(result.tv_nsec, 123456789);
}
[Test]
public void Add_Nanos()
{
var t = new Timespec { tv_sec = new IntPtr(12345), tv_nsec = 123456789 };
var result = t.Add(TimeSpan.FromTicks(10));
Assert.AreEqual(result.tv_sec, new IntPtr(12345));
Assert.AreEqual(result.tv_nsec, 123456789 + 1000);
}
[Test]
public void Add_NanosOverflow()
{
var t = new Timespec { tv_sec = new IntPtr(12345), tv_nsec = 999999999 };
var result = t.Add(TimeSpan.FromTicks(TimeSpan.TicksPerSecond * 10 + 10));
Assert.AreEqual(result.tv_sec, new IntPtr(12356));
Assert.AreEqual(result.tv_nsec, 999);
}
}
}

@ -47,14 +47,21 @@ namespace Grpc.Core
readonly Marshaller<TResponse> responseMarshaller;
readonly Channel channel;
readonly Metadata headers;
readonly DateTime deadline;
public Call(string serviceName, Method<TRequest, TResponse> method, Channel channel, Metadata headers)
: this(serviceName, method, channel, headers, DateTime.MaxValue)
{
}
public Call(string serviceName, Method<TRequest, TResponse> method, Channel channel, Metadata headers, DateTime deadline)
{
this.name = method.GetFullName(serviceName);
this.requestMarshaller = method.RequestMarshaller;
this.responseMarshaller = method.ResponseMarshaller;
this.channel = Preconditions.CheckNotNull(channel);
this.headers = Preconditions.CheckNotNull(headers);
this.deadline = deadline;
}
public Channel Channel
@ -87,6 +94,14 @@ namespace Grpc.Core
}
}
public DateTime Deadline
{
get
{
return this.deadline;
}
}
public Marshaller<TRequest> RequestMarshaller
{
get

@ -50,7 +50,7 @@ namespace Grpc.Core
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
// TODO(jtattermusch): this gives a race that cancellation can be requested before the call even starts.
RegisterCancellationCallback(asyncCall, token);
return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Headers);
return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Headers, call.Deadline);
}
public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
@ -58,8 +58,8 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
var asyncResult = asyncCall.UnaryCallAsync(req, call.Headers);
asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline));
var asyncResult = asyncCall.UnaryCallAsync(req, call.Headers, call.Deadline);
RegisterCancellationCallback(asyncCall, token);
return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
@ -69,8 +69,8 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
asyncCall.StartServerStreamingCall(req, call.Headers);
asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline));
asyncCall.StartServerStreamingCall(req, call.Headers, call.Deadline);
RegisterCancellationCallback(asyncCall, token);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
@ -81,8 +81,8 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers);
asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline));
var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers, call.Deadline);
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
@ -93,8 +93,8 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
asyncCall.StartDuplexStreamingCall(call.Headers);
asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline));
asyncCall.StartDuplexStreamingCall(call.Headers, call.Deadline);
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);

@ -76,7 +76,7 @@ namespace Grpc.Core
/// <summary>
/// Creates a new call to given method.
/// </summary>
protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(string serviceName, Method<TRequest, TResponse> method, Metadata metadata)
protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(string serviceName, Method<TRequest, TResponse> method, Metadata metadata, DateTime? deadline)
where TRequest : class
where TResponse : class
{
@ -87,8 +87,8 @@ namespace Grpc.Core
interceptor(metadata);
metadata.Freeze();
}
metadata = metadata ?? Metadata.Empty;
return new Call<TRequest, TResponse>(serviceName, method, channel, metadata);
return new Call<TRequest, TResponse>(serviceName, method, channel,
metadata ?? Metadata.Empty, deadline ?? DateTime.MaxValue);
}
}
}

@ -61,10 +61,10 @@ namespace Grpc.Core.Internal
{
}
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName)
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName, Timespec deadline)
{
this.channel = channel;
var call = CallSafeHandle.Create(channel.Handle, channel.CompletionRegistry, cq, methodName, channel.Target, Timespec.InfFuture);
var call = channel.Handle.CreateCall(channel.CompletionRegistry, cq, methodName, channel.Target, deadline);
channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
}
@ -76,7 +76,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Blocking unary request - unary response call.
/// </summary>
public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers)
public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers, DateTime deadline)
{
using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{
@ -86,7 +86,7 @@ namespace Grpc.Core.Internal
lock (myLock)
{
Initialize(channel, cq, methodName);
Initialize(channel, cq, methodName, Timespec.FromDateTime(deadline));
started = true;
halfcloseRequested = true;
readingDone = true;
@ -126,7 +126,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Starts a unary request - unary response call.
/// </summary>
public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers)
public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers, DateTime deadline)
{
lock (myLock)
{
@ -151,7 +151,7 @@ namespace Grpc.Core.Internal
/// Starts a streamed request - unary response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
public Task<TResponse> ClientStreamingCallAsync(Metadata headers)
public Task<TResponse> ClientStreamingCallAsync(Metadata headers, DateTime deadline)
{
lock (myLock)
{
@ -173,7 +173,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Starts a unary request - streamed response call.
/// </summary>
public void StartServerStreamingCall(TRequest msg, Metadata headers)
public void StartServerStreamingCall(TRequest msg, Metadata headers, DateTime deadline)
{
lock (myLock)
{
@ -196,7 +196,7 @@ namespace Grpc.Core.Internal
/// Starts a streaming request - streaming response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
public void StartDuplexStreamingCall(Metadata headers)
public void StartDuplexStreamingCall(Metadata headers, DateTime deadline)
{
lock (myLock)
{

@ -48,6 +48,7 @@ namespace Grpc.Core.Internal
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
{
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
readonly GrpcEnvironment environment;
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer)
@ -118,6 +119,18 @@ namespace Grpc.Core.Internal
}
}
/// <summary>
/// Gets cancellation token that gets cancelled once close completion
/// is received and the cancelled flag is set.
/// </summary>
public CancellationToken CancellationToken
{
get
{
return cancellationTokenSource.Token;
}
}
protected override void OnReleaseResources()
{
environment.DebugStats.ActiveServerCalls.Decrement();
@ -138,6 +151,8 @@ namespace Grpc.Core.Internal
{
// Once we cancel, we don't have to care that much
// about reads and writes.
// TODO(jtattermusch): is this still necessary?
Cancel();
}
@ -145,6 +160,11 @@ namespace Grpc.Core.Internal
}
// TODO(jtattermusch): handle error
if (cancelled)
{
cancellationTokenSource.Cancel();
}
finishedServersideTcs.SetResult(null);
}
}

@ -45,9 +45,6 @@ namespace Grpc.Core.Internal
const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionRegistry completionRegistry;
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
@ -98,13 +95,6 @@ namespace Grpc.Core.Internal
{
}
public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
var result = grpcsharp_channel_create_call(channel, cq, method, host, deadline);
result.SetCompletionRegistry(registry);
return result;
}
public void SetCompletionRegistry(CompletionRegistry completionRegistry)
{
this.completionRegistry = completionRegistry;

@ -46,6 +46,9 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern ChannelSafeHandle grpcsharp_secure_channel_create(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs);
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_channel_destroy(IntPtr channel);
@ -63,6 +66,13 @@ namespace Grpc.Core.Internal
return grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
public CallSafeHandle CreateCall(CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
var result = grpcsharp_channel_create_call(this, cq, method, host, deadline);
result.SetCompletionRegistry(registry);
return result;
}
protected override bool ReleaseHandle()
{
grpcsharp_channel_destroy(handle);

@ -72,7 +72,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
var context = HandlerUtils.NewContext(newRpc);
var context = HandlerUtils.NewContext(newRpc, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@ -126,7 +126,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
var context = HandlerUtils.NewContext(newRpc);
var context = HandlerUtils.NewContext(newRpc, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@ -180,7 +180,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
var context = HandlerUtils.NewContext(newRpc);
var context = HandlerUtils.NewContext(newRpc, asyncCall.CancellationToken);
try
{
var result = await handler(requestStream, context);
@ -238,7 +238,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
var context = HandlerUtils.NewContext(newRpc);
var context = HandlerUtils.NewContext(newRpc, asyncCall.CancellationToken);
try
{
await handler(requestStream, responseStream, context);
@ -295,11 +295,13 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
public static ServerCallContext NewContext(ServerRpcNew newRpc)
public static ServerCallContext NewContext(ServerRpcNew newRpc, CancellationToken cancellationToken)
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
return new ServerCallContext(
newRpc.Method, newRpc.Host, newRpc.Deadline.ToDateTime(),
newRpc.RequestMetadata, CancellationToken.None);
newRpc.Method, newRpc.Host, realtimeDeadline,
newRpc.RequestMetadata, cancellationToken);
}
}
}

@ -32,6 +32,8 @@ using System;
using System.Runtime.InteropServices;
using System.Threading;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
@ -40,32 +42,43 @@ namespace Grpc.Core.Internal
[StructLayout(LayoutKind.Sequential)]
internal struct Timespec
{
const int NanosPerSecond = 1000 * 1000 * 1000;
const int NanosPerTick = 100;
const long NanosPerSecond = 1000 * 1000 * 1000;
const long NanosPerTick = 100;
const long TicksPerSecond = NanosPerSecond / NanosPerTick;
static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
[DllImport("grpc_csharp_ext.dll")]
static extern Timespec gprsharp_now();
static extern Timespec gprsharp_now(GPRClockType clockType);
[DllImport("grpc_csharp_ext.dll")]
static extern Timespec gprsharp_inf_future(GPRClockType clockType);
[DllImport("grpc_csharp_ext.dll")]
static extern Timespec gprsharp_inf_past(GPRClockType clockType);
[DllImport("grpc_csharp_ext.dll")]
static extern Timespec gprsharp_inf_future();
static extern Timespec gprsharp_convert_clock_type(Timespec t, GPRClockType targetClock);
[DllImport("grpc_csharp_ext.dll")]
static extern int gprsharp_sizeof_timespec();
public Timespec(IntPtr tv_sec, int tv_nsec)
public Timespec(IntPtr tv_sec, int tv_nsec) : this(tv_sec, tv_nsec, GPRClockType.Realtime)
{
}
public Timespec(IntPtr tv_sec, int tv_nsec, GPRClockType clock_type)
{
this.tv_sec = tv_sec;
this.tv_nsec = tv_nsec;
this.clock_type = GPRClockType.Realtime;
this.clock_type = clock_type;
}
// NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8
// so IntPtr seems to have the right size to work on both.
public System.IntPtr tv_sec;
public int tv_nsec;
public GPRClockType clock_type;
private System.IntPtr tv_sec;
private int tv_nsec;
private GPRClockType clock_type;
/// <summary>
/// Timespec a long time in the future.
@ -74,54 +87,164 @@ namespace Grpc.Core.Internal
{
get
{
return gprsharp_inf_future();
return gprsharp_inf_future(GPRClockType.Realtime);
}
}
/// <summary>
/// Timespec a long time in the past.
/// </summary>
public static Timespec InfPast
{
get
{
return gprsharp_inf_past(GPRClockType.Realtime);
}
}
/// <summary>
/// Return Timespec representing the current time.
/// </summary>
public static Timespec Now
{
get
{
return gprsharp_now();
return gprsharp_now(GPRClockType.Realtime);
}
}
public DateTime ToDateTime()
/// <summary>
/// Seconds since unix epoch.
/// </summary>
public IntPtr TimevalSeconds
{
return UnixEpoch.AddTicks(tv_sec.ToInt64() * (NanosPerSecond / NanosPerTick) + tv_nsec / NanosPerTick);
get
{
return tv_sec;
}
}
internal static int NativeSize
/// <summary>
/// The nanoseconds part of timeval.
/// </summary>
public int TimevalNanos
{
get
{
return gprsharp_sizeof_timespec();
return tv_nsec;
}
}
/// <summary>
/// Creates a GPR deadline from current instant and given timeout.
/// Converts the timespec to desired clock type.
/// </summary>
/// <returns>The from timeout.</returns>
public static Timespec DeadlineFromTimeout(TimeSpan timeout)
public Timespec ToClockType(GPRClockType targetClock)
{
return gprsharp_convert_clock_type(this, targetClock);
}
/// <summary>
/// Converts Timespec to DateTime.
/// Timespec needs to be of type GPRClockType.Realtime and needs to represent a legal value.
/// DateTime has lower resolution (100ns), so rounding can occurs.
/// Value are always rounded up to the nearest DateTime value in the future.
///
/// For Timespec.InfFuture or if timespec is after the largest representable DateTime, DateTime.MaxValue is returned.
/// For Timespec.InfPast or if timespec is before the lowest representable DateTime, DateTime.MinValue is returned.
///
/// Unless DateTime.MaxValue or DateTime.MinValue is returned, the resulting DateTime is always in UTC
/// (DateTimeKind.Utc)
/// </summary>
public DateTime ToDateTime()
{
if (timeout == Timeout.InfiniteTimeSpan)
Preconditions.CheckState(tv_nsec >= 0 && tv_nsec < NanosPerSecond);
Preconditions.CheckState(clock_type == GPRClockType.Realtime);
// fast path for InfFuture
if (this.Equals(InfFuture))
{
return DateTime.MaxValue;
}
// fast path for InfPast
if (this.Equals(InfPast))
{
return DateTime.MinValue;
}
try
{
// convert nanos to ticks, round up to the nearest tick
long ticksFromNanos = tv_nsec / NanosPerTick + ((tv_nsec % NanosPerTick != 0) ? 1 : 0);
long ticksTotal = checked(tv_sec.ToInt64() * TicksPerSecond + ticksFromNanos);
return UnixEpoch.AddTicks(ticksTotal);
}
catch (OverflowException)
{
// ticks out of long range
return tv_sec.ToInt64() > 0 ? DateTime.MaxValue : DateTime.MinValue;
}
catch (ArgumentOutOfRangeException)
{
// resulting date time would be larger than MaxValue
return tv_sec.ToInt64() > 0 ? DateTime.MaxValue : DateTime.MinValue;
}
}
/// <summary>
/// Creates DateTime to Timespec.
/// DateTime has to be in UTC (DateTimeKind.Utc) unless it's DateTime.MaxValue or DateTime.MinValue.
/// For DateTime.MaxValue of date time after the largest representable Timespec, Timespec.InfFuture is returned.
/// For DateTime.MinValue of date time before the lowest representable Timespec, Timespec.InfPast is returned.
/// </summary>
/// <returns>The date time.</returns>
/// <param name="dateTime">Date time.</param>
public static Timespec FromDateTime(DateTime dateTime)
{
if (dateTime == DateTime.MaxValue)
{
return Timespec.InfFuture;
}
return Timespec.Now.Add(timeout);
if (dateTime == DateTime.MinValue)
{
return Timespec.InfPast;
}
Preconditions.CheckArgument(dateTime.Kind == DateTimeKind.Utc, "dateTime");
try
{
TimeSpan timeSpan = dateTime - UnixEpoch;
long ticks = timeSpan.Ticks;
long seconds = ticks / TicksPerSecond;
int nanos = (int)((ticks % TicksPerSecond) * NanosPerTick);
if (nanos < 0)
{
// correct the result based on C# modulo semantics for negative dividend
seconds--;
nanos += (int)NanosPerSecond;
}
// new IntPtr possibly throws OverflowException
return new Timespec(new IntPtr(seconds), nanos);
}
catch (OverflowException)
{
return dateTime > UnixEpoch ? Timespec.InfFuture : Timespec.InfPast;
}
catch (ArgumentOutOfRangeException)
{
return dateTime > UnixEpoch ? Timespec.InfFuture : Timespec.InfPast;
}
}
public Timespec Add(TimeSpan timeSpan)
internal static int NativeSize
{
long nanos = (long)tv_nsec + (timeSpan.Ticks % TimeSpan.TicksPerSecond) * NanosPerTick;
long overflow_sec = (nanos > NanosPerSecond) ? 1 : 0;
Timespec result;
result.tv_nsec = (int)(nanos % NanosPerSecond);
result.tv_sec = new IntPtr(tv_sec.ToInt64() + (timeSpan.Ticks / TimeSpan.TicksPerSecond) + overflow_sec);
result.clock_type = GPRClockType.Realtime;
return result;
get
{
return gprsharp_sizeof_timespec();
}
}
}
}

@ -132,6 +132,60 @@ namespace math.Tests
}).Wait();
}
[Test]
public void FibWithCancel()
{
Task.Run(async () =>
{
var cts = new CancellationTokenSource();
using (var call = client.Fib(new FibArgs.Builder { Limit = 0 }.Build(),
cancellationToken: cts.Token))
{
List<long> responses = new List<long>();
try
{
while (await call.ResponseStream.MoveNext())
{
if (responses.Count == 0)
{
cts.CancelAfter(500); // make sure we cancel soon
}
responses.Add(call.ResponseStream.Current.Num_);
}
Assert.Fail();
}
catch (RpcException e)
{
Assert.IsTrue(responses.Count > 0);
Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
}
}
}).Wait();
}
[Test]
public void FibWithDeadline()
{
Task.Run(async () =>
{
using (var call = client.Fib(new FibArgs.Builder { Limit = 0 }.Build(),
deadline: DateTime.UtcNow.AddMilliseconds(500)))
{
try
{
await call.ResponseStream.ToList();
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.DeadlineExceeded, e.Status.StatusCode);
}
}
}).Wait();
}
// TODO: test Fib with limit=0 and cancellation
[Test]
public void Sum()

@ -44,11 +44,11 @@ namespace math {
// client interface
public interface IMathClient
{
global::math.DivReply Div(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncUnaryCall<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
global::math.DivReply Div(global::math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncUnaryCall<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
}
// server-side interface
@ -66,29 +66,29 @@ namespace math {
public MathClient(Channel channel) : base(channel)
{
}
public global::math.DivReply Div(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public global::math.DivReply Div(global::math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Div, headers);
var call = CreateCall(__ServiceName, __Method_Div, headers, deadline);
return Calls.BlockingUnaryCall(call, request, cancellationToken);
}
public AsyncUnaryCall<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public AsyncUnaryCall<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Div, headers);
var call = CreateCall(__ServiceName, __Method_Div, headers, deadline);
return Calls.AsyncUnaryCall(call, request, cancellationToken);
}
public AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_DivMany, headers);
var call = CreateCall(__ServiceName, __Method_DivMany, headers, deadline);
return Calls.AsyncDuplexStreamingCall(call, cancellationToken);
}
public AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Fib, headers);
var call = CreateCall(__ServiceName, __Method_Fib, headers, deadline);
return Calls.AsyncServerStreamingCall(call, request, cancellationToken);
}
public AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Sum, headers);
var call = CreateCall(__ServiceName, __Method_Sum, headers, deadline);
return Calls.AsyncClientStreamingCall(call, cancellationToken);
}
}

@ -54,8 +54,13 @@ namespace math
{
if (request.Limit <= 0)
{
// TODO(jtattermusch): support cancellation
throw new NotImplementedException("Not implemented yet");
// keep streaming the sequence until cancelled.
IEnumerator<Num> fibEnumerator = FibInternal(long.MaxValue).GetEnumerator();
while (!context.CancellationToken.IsCancellationRequested && fibEnumerator.MoveNext())
{
await responseStream.WriteAsync(fibEnumerator.Current);
await Task.Delay(100);
}
}
if (request.Limit > 0)

@ -24,8 +24,8 @@ namespace Grpc.Health.V1Alpha {
// client interface
public interface IHealthClient
{
global::Grpc.Health.V1Alpha.HealthCheckResponse Check(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncUnaryCall<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
global::Grpc.Health.V1Alpha.HealthCheckResponse Check(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncUnaryCall<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
}
// server-side interface
@ -40,14 +40,14 @@ namespace Grpc.Health.V1Alpha {
public HealthClient(Channel channel) : base(channel)
{
}
public global::Grpc.Health.V1Alpha.HealthCheckResponse Check(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public global::Grpc.Health.V1Alpha.HealthCheckResponse Check(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Check, headers);
var call = CreateCall(__ServiceName, __Method_Check, headers, deadline);
return Calls.BlockingUnaryCall(call, request, cancellationToken);
}
public AsyncUnaryCall<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public AsyncUnaryCall<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Check, headers);
var call = CreateCall(__ServiceName, __Method_Check, headers, deadline);
return Calls.AsyncUnaryCall(call, request, cancellationToken);
}
}

@ -59,14 +59,14 @@ namespace grpc.testing {
// client interface
public interface ITestServiceClient
{
global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncUnaryCall<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncUnaryCall<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncUnaryCall<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncUnaryCall<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
}
// server-side interface
@ -86,44 +86,44 @@ namespace grpc.testing {
public TestServiceClient(Channel channel) : base(channel)
{
}
public global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_EmptyCall, headers);
var call = CreateCall(__ServiceName, __Method_EmptyCall, headers, deadline);
return Calls.BlockingUnaryCall(call, request, cancellationToken);
}
public AsyncUnaryCall<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public AsyncUnaryCall<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_EmptyCall, headers);
var call = CreateCall(__ServiceName, __Method_EmptyCall, headers, deadline);
return Calls.AsyncUnaryCall(call, request, cancellationToken);
}
public global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_UnaryCall, headers);
var call = CreateCall(__ServiceName, __Method_UnaryCall, headers, deadline);
return Calls.BlockingUnaryCall(call, request, cancellationToken);
}
public AsyncUnaryCall<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public AsyncUnaryCall<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_UnaryCall, headers);
var call = CreateCall(__ServiceName, __Method_UnaryCall, headers, deadline);
return Calls.AsyncUnaryCall(call, request, cancellationToken);
}
public AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_StreamingOutputCall, headers);
var call = CreateCall(__ServiceName, __Method_StreamingOutputCall, headers, deadline);
return Calls.AsyncServerStreamingCall(call, request, cancellationToken);
}
public AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_StreamingInputCall, headers);
var call = CreateCall(__ServiceName, __Method_StreamingInputCall, headers, deadline);
return Calls.AsyncClientStreamingCall(call, cancellationToken);
}
public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_FullDuplexCall, headers);
var call = CreateCall(__ServiceName, __Method_FullDuplexCall, headers, deadline);
return Calls.AsyncDuplexStreamingCall(call, cancellationToken);
}
public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_HalfDuplexCall, headers);
var call = CreateCall(__ServiceName, __Method_HalfDuplexCall, headers, deadline);
return Calls.AsyncDuplexStreamingCall(call, cancellationToken);
}
}

@ -433,10 +433,20 @@ grpcsharp_channel_args_destroy(grpc_channel_args *args) {
/* Timespec */
GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_now(void) { return gpr_now(GPR_CLOCK_REALTIME); }
GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_now(gpr_clock_type clock_type) {
return gpr_now(clock_type);
}
GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_inf_future(gpr_clock_type clock_type) {
return gpr_inf_future(clock_type);
}
GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_inf_past(gpr_clock_type clock_type) {
return gpr_inf_past(clock_type);
}
GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_inf_future(void) {
return gpr_inf_future(GPR_CLOCK_REALTIME);
GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_convert_clock_type(gpr_timespec t, gpr_clock_type target_clock) {
return gpr_convert_clock_type(t, target_clock);
}
GPR_EXPORT gpr_int32 GPR_CALLTYPE gprsharp_sizeof_timespec(void) {

Loading…
Cancel
Save