modify client call interface to allow reading status and trailers

pull/2597/head
Jan Tattermusch 9 years ago
parent 1cf8d429e3
commit ed4b7a7c29
  1. 8
      src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
  2. 25
      src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
  3. 24
      src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
  4. 106
      src/csharp/Grpc.Core/AsyncUnaryCall.cs
  5. 6
      src/csharp/Grpc.Core/Calls.cs
  6. 8
      src/csharp/Grpc.Core/Grpc.Core.csproj
  7. 40
      src/csharp/Grpc.Core/Internal/AsyncCall.cs

@ -44,12 +44,16 @@ namespace Grpc.Core
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly Task<TResponse> result;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result, Action disposeAction)
public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.requestStream = requestStream;
this.result = result;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
}
@ -85,7 +89,7 @@ namespace Grpc.Core
}
/// <summary>
/// Provides means to provide after the call.
/// Provides means to cleanup after the call.
/// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything.
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
/// As a result, all resources being used by the call should be released eventually.

@ -44,15 +44,20 @@ namespace Grpc.Core
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Action disposeAction)
public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.requestStream = requestStream;
this.responseStream = responseStream;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
}
/// <summary>
/// Async stream to read streaming responses.
/// </summary>
@ -75,6 +80,24 @@ namespace Grpc.Core
}
}
/// <summary>
/// Gets the call status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
}
/// <summary>
/// Gets the call trailing metadata if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
}
/// <summary>
/// Provides means to cleanup after the call.
/// If the call has already finished normally (request stream has been completed and response stream has been fully read), doesn't do anything.

@ -43,11 +43,15 @@ namespace Grpc.Core
public sealed class AsyncServerStreamingCall<TResponse> : IDisposable
{
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Action disposeAction)
public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.responseStream = responseStream;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
}
@ -62,6 +66,24 @@ namespace Grpc.Core
}
}
/// <summary>
/// Gets the call status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
}
/// <summary>
/// Gets the call trailing metadata if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
}
/// <summary>
/// Provides means to cleanup after the call.
/// If the call has already finished normally (response stream has been fully read), doesn't do anything.

@ -0,0 +1,106 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace Grpc.Core
{
/// <summary>
/// Return type for single request - single response call.
/// </summary>
public sealed class AsyncUnaryCall<TResponse> : IDisposable
{
readonly Task<TResponse> result;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
public AsyncUnaryCall(Task<TResponse> result, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.result = result;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
}
/// <summary>
/// Asynchronous call result.
/// </summary>
public Task<TResponse> Result
{
get
{
return this.result;
}
}
/// <summary>
/// Allows awaiting this object directly.
/// </summary>
public TaskAwaiter<TResponse> GetAwaiter()
{
return result.GetAwaiter();
}
/// <summary>
/// Gets the call status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
}
/// <summary>
/// Gets the call trailing metadata if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
}
/// <summary>
/// Provides means to cleanup after the call.
/// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything.
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
/// As a result, all resources being used by the call should be released eventually.
/// </summary>
public void Dispose()
{
disposeAction.Invoke();
}
}
}

@ -73,7 +73,7 @@ namespace Grpc.Core
asyncCall.StartServerStreamingCall(req, call.Headers);
RegisterCancellationCallback(asyncCall, token);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.Cancel);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
@ -85,7 +85,7 @@ namespace Grpc.Core
var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers);
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.Cancel);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
@ -98,7 +98,7 @@ namespace Grpc.Core
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.Cancel);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token)

@ -33,13 +33,12 @@
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Collections.Immutable, Version=1.1.36.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\System.Collections.Immutable.1.1.36\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath>
</Reference>
<Reference Include="System.Interactive.Async">
<HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
</Reference>
<Reference Include="System.Collections.Immutable">
<HintPath>..\packages\System.Collections.Immutable.1.1.36\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="AsyncDuplexStreamingCall.cs" />
@ -102,6 +101,7 @@
<Compile Include="Internal\CompletionRegistry.cs" />
<Compile Include="Internal\BatchContextSafeHandle.cs" />
<Compile Include="ChannelOptions.cs" />
<Compile Include="AsyncUnaryCall.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Grpc.Core.nuspec" />

@ -52,8 +52,8 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
// Set after status is received. Only used for streaming response calls.
Status? finishedStatus;
// Set after status is received. Used for both unary and streaming response calls.
ClientSideStatus? finishedStatus;
bool readObserverCompleted; // True if readObserver has already been completed.
@ -248,6 +248,32 @@ namespace Grpc.Core.Internal
}
}
/// <summary>
/// Gets the resulting status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Status GetStatus()
{
lock (myLock)
{
Preconditions.CheckState(finishedStatus.HasValue, "Status can only be accessed once the call has finished.");
return finishedStatus.Value.Status;
}
}
/// <summary>
/// Gets the trailing metadata if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Metadata GetTrailers()
{
lock (myLock)
{
Preconditions.CheckState(finishedStatus.HasValue, "Trailers can only be accessed once the call has finished.");
return finishedStatus.Value.Trailers;
}
}
/// <summary>
/// On client-side, we only fire readCompletionDelegate once all messages have been read
/// and status has been received.
@ -265,7 +291,7 @@ namespace Grpc.Core.Internal
if (shouldComplete)
{
var status = finishedStatus.Value;
var status = finishedStatus.Value.Status;
if (status.StatusCode != StatusCode.OK)
{
FireCompletion(completionDelegate, default(TResponse), new RpcException(status));
@ -288,9 +314,13 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx)
{
var fullStatus = ctx.GetReceivedStatusOnClient();
lock (myLock)
{
finished = true;
finishedStatus = fullStatus;
halfclosed = true;
ReleaseResourcesIfPossible();
@ -302,7 +332,6 @@ namespace Grpc.Core.Internal
return;
}
var fullStatus = ctx.GetReceivedStatusOnClient();
var status = fullStatus.Status;
if (status.StatusCode != StatusCode.OK)
@ -324,13 +353,12 @@ namespace Grpc.Core.Internal
private void HandleFinished(bool success, BatchContextSafeHandle ctx)
{
var fullStatus = ctx.GetReceivedStatusOnClient();
var status = fullStatus.Status;
AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null;
lock (myLock)
{
finished = true;
finishedStatus = status;
finishedStatus = fullStatus;
origReadCompletionDelegate = readCompletionDelegate;

Loading…
Cancel
Save