Merge pull request #1646 from jtattermusch/csharp_api_fixes

Incorporate feedback from API review: Use IAsyncEnumerator to represent async read stream
pull/1694/head
Tim Emiola 10 years ago
commit e1c24fcd0c
  1. 7
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  2. 1
      src/csharp/Grpc.Core.Tests/packages.config
  3. 18
      src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
  4. 27
      src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
  5. 10
      src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
  6. 2
      src/csharp/Grpc.Core/Call.cs
  7. 3
      src/csharp/Grpc.Core/Grpc.Core.csproj
  8. 1
      src/csharp/Grpc.Core/Grpc.Core.nuspec
  9. 8
      src/csharp/Grpc.Core/IAsyncStreamReader.cs
  10. 3
      src/csharp/Grpc.Core/IAsyncStreamWriter.cs
  11. 5
      src/csharp/Grpc.Core/IClientStreamWriter.cs
  12. 4
      src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
  13. 29
      src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
  14. 11
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  15. 29
      src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
  16. 3
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  17. 1
      src/csharp/Grpc.Core/ServerCallContext.cs
  18. 26
      src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
  19. 1
      src/csharp/Grpc.Core/packages.config
  20. 4
      src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj
  21. 1
      src/csharp/Grpc.Examples.Tests/packages.config
  22. 3
      src/csharp/Grpc.Examples/Grpc.Examples.csproj
  23. 1
      src/csharp/Grpc.Examples/packages.config
  24. 3
      src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
  25. 43
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs
  26. 1
      src/csharp/Grpc.IntegrationTesting/packages.config

@ -34,6 +34,9 @@
<HintPath>..\packages\NUnit.2.6.4\lib\nunit.framework.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Interactive.Async">
<HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
@ -57,7 +60,5 @@
<ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
<ItemGroup>
<Folder Include="Internal\" />
</ItemGroup>
<ItemGroup />
</Project>

@ -1,4 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages>

@ -41,8 +41,6 @@ namespace Grpc.Core
/// Return type for client streaming calls.
/// </summary>
public sealed class AsyncClientStreamingCall<TRequest, TResponse>
where TRequest : class
where TResponse : class
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly Task<TResponse> result;
@ -53,22 +51,6 @@ namespace Grpc.Core
this.result = result;
}
/// <summary>
/// Writes a request to RequestStream.
/// </summary>
public Task Write(TRequest message)
{
return requestStream.Write(message);
}
/// <summary>
/// Closes the RequestStream.
/// </summary>
public Task Close()
{
return requestStream.Close();
}
/// <summary>
/// Asynchronous call result.
/// </summary>

@ -41,8 +41,6 @@ namespace Grpc.Core
/// Return type for bidirectional streaming calls.
/// </summary>
public sealed class AsyncDuplexStreamingCall<TRequest, TResponse>
where TRequest : class
where TResponse : class
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream;
@ -53,31 +51,6 @@ namespace Grpc.Core
this.responseStream = responseStream;
}
/// <summary>
/// Writes a request to RequestStream.
/// </summary>
public Task Write(TRequest message)
{
return requestStream.Write(message);
}
/// <summary>
/// Closes the RequestStream.
/// </summary>
public Task Close()
{
return requestStream.Close();
}
/// <summary>
/// Reads a response from ResponseStream.
/// </summary>
/// <returns></returns>
public Task<TResponse> ReadNext()
{
return responseStream.ReadNext();
}
/// <summary>
/// Async stream to read streaming responses.
/// </summary>

@ -41,7 +41,6 @@ namespace Grpc.Core
/// Return type for server streaming calls.
/// </summary>
public sealed class AsyncServerStreamingCall<TResponse>
where TResponse : class
{
readonly IAsyncStreamReader<TResponse> responseStream;
@ -50,15 +49,6 @@ namespace Grpc.Core
this.responseStream = responseStream;
}
/// <summary>
/// Reads the next response from ResponseStream
/// </summary>
/// <returns></returns>
public Task<TResponse> ReadNext()
{
return responseStream.ReadNext();
}
/// <summary>
/// Async stream to read streaming responses.
/// </summary>

@ -41,8 +41,6 @@ namespace Grpc.Core
/// Abstraction of a call to be invoked on a client.
/// </summary>
public class Call<TRequest, TResponse>
where TRequest : class
where TResponse : class
{
readonly string name;
readonly Marshaller<TRequest> requestMarshaller;

@ -37,6 +37,9 @@
<Reference Include="System.Collections.Immutable">
<HintPath>..\packages\Microsoft.Bcl.Immutable.1.0.34\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath>
</Reference>
<Reference Include="System.Interactive.Async">
<HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="AsyncDuplexStreamingCall.cs" />

@ -16,6 +16,7 @@
<tags>gRPC RPC Protocol HTTP/2</tags>
<dependencies>
<dependency id="Microsoft.Bcl.Immutable" version="1.0.34" />
<dependency id="Ix-Async" version="1.2.3" />
<dependency id="grpc.native.csharp_ext" version="0.8.0.0" />
</dependencies>
</metadata>

@ -43,13 +43,7 @@ namespace Grpc.Core
/// A stream of messages to be read.
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IAsyncStreamReader<T>
where T : class
public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse>
{
/// <summary>
/// Reads a single message. Returns null if the last message was already read.
/// A following read can only be started when the previous one finishes.
/// </summary>
Task<T> ReadNext();
}
}

@ -44,10 +44,9 @@ namespace Grpc.Core
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IAsyncStreamWriter<T>
where T : class
{
/// <summary>
/// Writes a single message. Only one write can be pending at a time.
/// Writes a single asynchronously. Only one write can be pending at a time.
/// </summary>
/// <param name="message">the message to be written. Cannot be null.</param>
Task Write(T message);

@ -44,11 +44,10 @@ namespace Grpc.Core
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IClientStreamWriter<T> : IAsyncStreamWriter<T>
where T : class
{
/// <summary>
/// Closes the stream. Can only be called once there is no pending write. No writes should follow calling this.
/// Completes/closes the stream. Can only be called once there is no pending write. No writes should follow calling this.
/// </summary>
Task Close();
Task Complete();
}
}

@ -38,8 +38,6 @@ namespace Grpc.Core.Internal
/// Writes requests asynchronously to an underlying AsyncCall object.
/// </summary>
internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest>
where TRequest : class
where TResponse : class
{
readonly AsyncCall<TRequest, TResponse> call;
@ -55,7 +53,7 @@ namespace Grpc.Core.Internal
return taskSource.Task;
}
public Task Close()
public Task Complete()
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendCloseFromClient(taskSource.CompletionDelegate);

@ -33,6 +33,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Core.Internal
@ -42,17 +43,41 @@ namespace Grpc.Core.Internal
where TResponse : class
{
readonly AsyncCall<TRequest, TResponse> call;
TResponse current;
public ClientResponseStream(AsyncCall<TRequest, TResponse> call)
{
this.call = call;
}
public Task<TResponse> ReadNext()
public TResponse Current
{
get
{
if (current == null)
{
throw new InvalidOperationException("No current element is available.");
}
return current;
}
}
public async Task<bool> MoveNext(CancellationToken token)
{
if (token != CancellationToken.None)
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
var taskSource = new AsyncCompletionTaskSource<TResponse>();
call.StartReadMessage(taskSource.CompletionDelegate);
return taskSource.Task;
var result = await taskSource.Task;
this.current = result;
return result != null;
}
public void Dispose()
{
// TODO(jtattermusch): implement the semantics of stream disposal.
}
}
}

@ -32,6 +32,7 @@
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@ -71,9 +72,10 @@ namespace Grpc.Core.Internal
Status status = Status.DefaultSuccess;
try
{
var request = await requestStream.ReadNext();
Preconditions.CheckArgument(await requestStream.MoveNext());
var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
Preconditions.CheckArgument(await requestStream.ReadNext() == null);
Preconditions.CheckArgument(!await requestStream.MoveNext());
var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context
var result = await handler(context, request);
await responseStream.Write(result);
@ -122,9 +124,10 @@ namespace Grpc.Core.Internal
Status status = Status.DefaultSuccess;
try
{
var request = await requestStream.ReadNext();
Preconditions.CheckArgument(await requestStream.MoveNext());
var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
Preconditions.CheckArgument(await requestStream.ReadNext() == null);
Preconditions.CheckArgument(!await requestStream.MoveNext());
var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context
await handler(context, request, responseStream);

@ -33,6 +33,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Core.Internal
@ -42,17 +43,41 @@ namespace Grpc.Core.Internal
where TResponse : class
{
readonly AsyncCallServer<TRequest, TResponse> call;
TRequest current;
public ServerRequestStream(AsyncCallServer<TRequest, TResponse> call)
{
this.call = call;
}
public Task<TRequest> ReadNext()
public TRequest Current
{
get
{
if (current == null)
{
throw new InvalidOperationException("No current element is available.");
}
return current;
}
}
public async Task<bool> MoveNext(CancellationToken token)
{
if (token != CancellationToken.None)
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
var taskSource = new AsyncCompletionTaskSource<TRequest>();
call.StartReadMessage(taskSource.CompletionDelegate);
return taskSource.Task;
var result = await taskSource.Task;
this.current = result;
return result != null;
}
public void Dispose()
{
// TODO(jtattermusch): implement the semantics of stream disposal.
}
}
}

@ -39,9 +39,6 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
// TODO: we need to make sure that the delegates are not collected before invoked.
//internal delegate void ServerShutdownCallbackDelegate(bool success);
/// <summary>
/// grpc_server from grpc/grpc.h
/// </summary>

@ -42,7 +42,6 @@ namespace Grpc.Core
/// </summary>
public sealed class ServerCallContext
{
// TODO(jtattermusch): add cancellationToken
// TODO(jtattermusch): add deadline info

@ -49,14 +49,9 @@ namespace Grpc.Core.Utils
public static async Task ForEach<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction)
where T : class
{
while (true)
while (await streamReader.MoveNext())
{
var elem = await streamReader.ReadNext();
if (elem == null)
{
break;
}
await asyncAction(elem);
await asyncAction(streamReader.Current);
}
}
@ -67,32 +62,27 @@ namespace Grpc.Core.Utils
where T : class
{
var result = new List<T>();
while (true)
while (await streamReader.MoveNext())
{
var elem = await streamReader.ReadNext();
if (elem == null)
{
break;
}
result.Add(elem);
result.Add(streamReader.Current);
}
return result;
}
/// <summary>
/// Writes all elements from given enumerable to the stream.
/// Closes the stream afterwards unless close = false.
/// Completes the stream afterwards unless close = false.
/// </summary>
public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool close = true)
public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true)
where T : class
{
foreach (var element in elements)
{
await streamWriter.Write(element);
}
if (close)
if (complete)
{
await streamWriter.Close();
await streamWriter.Complete();
}
}

@ -2,5 +2,6 @@
<packages>
<package id="grpc.dependencies.openssl.redist" version="1.0.2.2" targetFramework="net45" />
<package id="grpc.dependencies.zlib.redist" version="1.2.8.9" targetFramework="net45" />
<package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="Microsoft.Bcl.Immutable" version="1.0.34" targetFramework="net45" />
</packages>

@ -37,6 +37,10 @@
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
<Reference Include="System.Interactive.Async, Version=1.2.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />

@ -1,5 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
<package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages>

@ -35,6 +35,9 @@
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
<Reference Include="System.Interactive.Async">
<HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />

@ -1,5 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
<package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages>

@ -54,6 +54,9 @@
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
<Reference Include="System.Interactive.Async">
<HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
</Reference>
<Reference Include="System.Net" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Net.Http.Extensions">

@ -256,48 +256,45 @@ namespace Grpc.IntegrationTesting
var call = client.FullDuplexCall();
StreamingOutputCallResponse response;
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
.SetPayload(CreateZerosPayload(27182)).Build());
response = await call.ResponseStream.ReadNext();
Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
Assert.AreEqual(31415, response.Payload.Body.Length);
Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length);
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9))
.SetPayload(CreateZerosPayload(8)).Build());
response = await call.ResponseStream.ReadNext();
Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
Assert.AreEqual(9, response.Payload.Body.Length);
Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
Assert.AreEqual(9, call.ResponseStream.Current.Payload.Body.Length);
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653))
.SetPayload(CreateZerosPayload(1828)).Build());
response = await call.ResponseStream.ReadNext();
Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
Assert.AreEqual(2653, response.Payload.Body.Length);
Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
Assert.AreEqual(2653, call.ResponseStream.Current.Payload.Body.Length);
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979))
.SetPayload(CreateZerosPayload(45904)).Build());
response = await call.ResponseStream.ReadNext();
Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
Assert.AreEqual(58979, response.Payload.Body.Length);
Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
Assert.AreEqual(58979, call.ResponseStream.Current.Payload.Body.Length);
await call.RequestStream.Close();
await call.RequestStream.Complete();
response = await call.ResponseStream.ReadNext();
Assert.AreEqual(null, response);
Assert.IsFalse(await call.ResponseStream.MoveNext());
Console.WriteLine("Passed!");
}).Wait();
@ -309,7 +306,7 @@ namespace Grpc.IntegrationTesting
{
Console.WriteLine("running empty_stream");
var call = client.FullDuplexCall();
await call.Close();
await call.RequestStream.Complete();
var responseList = await call.ResponseStream.ToList();
Assert.AreEqual(0, responseList.Count);
@ -392,22 +389,20 @@ namespace Grpc.IntegrationTesting
var cts = new CancellationTokenSource();
var call = client.FullDuplexCall(cts.Token);
StreamingOutputCallResponse response;
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
.SetPayload(CreateZerosPayload(27182)).Build());
response = await call.ResponseStream.ReadNext();
Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
Assert.AreEqual(31415, response.Payload.Body.Length);
Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length);
cts.Cancel();
try
{
response = await call.ResponseStream.ReadNext();
await call.ResponseStream.MoveNext();
Assert.Fail();
}
catch (RpcException e)

@ -3,6 +3,7 @@
<package id="Google.Apis.Auth" version="1.9.1" targetFramework="net45" />
<package id="Google.Apis.Core" version="1.9.1" targetFramework="net45" />
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
<package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="Microsoft.Bcl" version="1.1.9" targetFramework="net45" />
<package id="Microsoft.Bcl.Async" version="1.0.168" targetFramework="net45" />
<package id="Microsoft.Bcl.Build" version="1.0.14" targetFramework="net45" />

Loading…
Cancel
Save