implemented disposal logic for calls

pull/1648/head
Jan Tattermusch 10 years ago
parent 9f550a3e99
commit 2d2652d61b
  1. 7
      src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
  2. 7
      src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
  3. 7
      src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
  4. 6
      src/csharp/Grpc.Core/Calls.cs
  5. 16
      src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs

@ -44,11 +44,13 @@ namespace Grpc.Core
{ {
readonly IClientStreamWriter<TRequest> requestStream; readonly IClientStreamWriter<TRequest> requestStream;
readonly Task<TResponse> result; readonly Task<TResponse> result;
readonly Action disposeAction;
public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result) public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result, Action disposeAction)
{ {
this.requestStream = requestStream; this.requestStream = requestStream;
this.result = result; this.result = result;
this.disposeAction = disposeAction;
} }
/// <summary> /// <summary>
@ -90,8 +92,7 @@ namespace Grpc.Core
/// </summary> /// </summary>
public void Dispose() public void Dispose()
{ {
// TODO(jtattermusch): implement disposeAction.Invoke();
throw new NotImplementedException();
} }
} }
} }

@ -44,11 +44,13 @@ namespace Grpc.Core
{ {
readonly IClientStreamWriter<TRequest> requestStream; readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream; readonly IAsyncStreamReader<TResponse> responseStream;
readonly Action disposeAction;
public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream) public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Action disposeAction)
{ {
this.requestStream = requestStream; this.requestStream = requestStream;
this.responseStream = responseStream; this.responseStream = responseStream;
this.disposeAction = disposeAction;
} }
/// <summary> /// <summary>
@ -81,8 +83,7 @@ namespace Grpc.Core
/// </summary> /// </summary>
public void Dispose() public void Dispose()
{ {
// TODO(jtattermusch): implement disposeAction.Invoke();
throw new NotImplementedException();
} }
} }
} }

@ -43,10 +43,12 @@ namespace Grpc.Core
public sealed class AsyncServerStreamingCall<TResponse> : IDisposable public sealed class AsyncServerStreamingCall<TResponse> : IDisposable
{ {
readonly IAsyncStreamReader<TResponse> responseStream; readonly IAsyncStreamReader<TResponse> responseStream;
readonly Action disposeAction;
public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream) public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Action disposeAction)
{ {
this.responseStream = responseStream; this.responseStream = responseStream;
this.disposeAction = disposeAction;
} }
/// <summary> /// <summary>
@ -68,8 +70,7 @@ namespace Grpc.Core
/// </summary> /// </summary>
public void Dispose() public void Dispose()
{ {
// TODO(jtattermusch): implement disposeAction.Invoke();
throw new NotImplementedException();
} }
} }
} }

@ -73,7 +73,7 @@ namespace Grpc.Core
asyncCall.StartServerStreamingCall(req, call.Headers); asyncCall.StartServerStreamingCall(req, call.Headers);
RegisterCancellationCallback(asyncCall, token); RegisterCancellationCallback(asyncCall, token);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncServerStreamingCall<TResponse>(responseStream); return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.Cancel);
} }
public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
@ -85,7 +85,7 @@ namespace Grpc.Core
var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers); var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers);
RegisterCancellationCallback(asyncCall, token); RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask); return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.Cancel);
} }
public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
@ -98,7 +98,7 @@ namespace Grpc.Core
RegisterCancellationCallback(asyncCall, token); RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream); return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.Cancel);
} }
private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token) private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token)

@ -96,7 +96,19 @@ namespace math.Tests
Assert.AreEqual(0, response.Remainder); Assert.AreEqual(0, response.Remainder);
} }
// TODO(jtattermusch): test division by zero [Test]
public void DivByZero()
{
try
{
DivReply response = client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 0 }.Build());
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
}
}
[Test] [Test]
public void DivAsync() public void DivAsync()
@ -119,7 +131,6 @@ namespace math.Tests
var responses = await call.ResponseStream.ToList(); var responses = await call.ResponseStream.ToList();
CollectionAssert.AreEqual(new List<long> { 1, 1, 2, 3, 5, 8 }, CollectionAssert.AreEqual(new List<long> { 1, 1, 2, 3, 5, 8 },
responses.ConvertAll((n) => n.Num_)); responses.ConvertAll((n) => n.Num_));
} }
}).Wait(); }).Wait();
} }
@ -154,7 +165,6 @@ namespace math.Tests
new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build() new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build()
}; };
using (var call = client.DivMany()) using (var call = client.DivMany())
{ {
await call.RequestStream.WriteAll(divArgsList); await call.RequestStream.WriteAll(divArgsList);

Loading…
Cancel
Save