make everything compile

pull/13459/head
Jan Tattermusch 7 years ago
parent 466d77b26f
commit c5638887bf
  1. 28
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
  2. 106
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
  3. 58
      src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
  4. 37
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  5. 20
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  6. 22
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  7. 8
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  8. 27
      src/csharp/Grpc.Core/Internal/INativeCall.cs
  9. 12
      src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs

@ -64,7 +64,7 @@ namespace Grpc.Core.Internal.Tests
public void CancelNotificationAfterStartDisposes()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
@ -76,8 +76,8 @@ namespace Grpc.Core.Internal.Tests
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
Assert.IsFalse(moveNextTask.Result);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
@ -89,7 +89,7 @@ namespace Grpc.Core.Internal.Tests
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true);
// Check that starting a read after cancel notification has been processed is legal.
var moveNextTask = requestStream.MoveNext();
@ -107,10 +107,10 @@ namespace Grpc.Core.Internal.Tests
// if a read completion's success==false, the request stream will silently finish
// and we rely on C core cancelling the call.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(false, null);
fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null);
Assert.IsFalse(moveNextTask.Result);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
@ -120,7 +120,7 @@ namespace Grpc.Core.Internal.Tests
var finishedTask = asyncCallServer.ServerSideCallAsync();
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true);
// TODO(jtattermusch): should we throw a different exception type instead?
Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1"));
@ -134,10 +134,10 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
var writeTask = responseStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
fakeCall.SendCompletionCallback.OnSendCompletion(false);
Assert.ThrowsAsync(typeof(IOException), async () => await writeTask);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
@ -150,13 +150,13 @@ namespace Grpc.Core.Internal.Tests
var writeTask = responseStream.WriteAsync("request1");
var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null);
fakeCall.SendCompletionHandler(true);
fakeCall.SendStatusFromServerHandler(true);
fakeCall.SendCompletionCallback.OnSendCompletion(true);
fakeCall.SendStatusFromServerCallback.OnSendStatusFromServerCompletion(true);
Assert.DoesNotThrowAsync(async () => await writeTask);
Assert.DoesNotThrowAsync(async () => await writeStatusTask);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
@ -170,8 +170,8 @@ namespace Grpc.Core.Internal.Tests
asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null);
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await responseStream.WriteAsync("request1"));
fakeCall.SendStatusFromServerHandler(true);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.SendStatusFromServerCallback.OnSendStatusFromServerCompletion(true);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}

@ -73,7 +73,7 @@ namespace Grpc.Core.Internal.Tests
public void AsyncUnary_Success()
{
var resultTask = asyncCall.UnaryCallAsync("request1");
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@ -85,7 +85,7 @@ namespace Grpc.Core.Internal.Tests
public void AsyncUnary_NonSuccessStatusCode()
{
var resultTask = asyncCall.UnaryCallAsync("request1");
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
null,
new Metadata());
@ -97,7 +97,7 @@ namespace Grpc.Core.Internal.Tests
public void AsyncUnary_NullResponsePayload()
{
var resultTask = asyncCall.UnaryCallAsync("request1");
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
null,
new Metadata());
@ -118,7 +118,7 @@ namespace Grpc.Core.Internal.Tests
public void ClientStreaming_NoRequest_Success()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@ -130,7 +130,7 @@ namespace Grpc.Core.Internal.Tests
public void ClientStreaming_NoRequest_NonSuccessStatusCode()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
null,
new Metadata());
@ -145,18 +145,18 @@ namespace Grpc.Core.Internal.Tests
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(true);
fakeCall.SendCompletionCallback.OnSendCompletion(true);
writeTask.Wait();
var writeTask2 = requestStream.WriteAsync("request2");
fakeCall.SendCompletionHandler(true);
fakeCall.SendCompletionCallback.OnSendCompletion(true);
writeTask2.Wait();
var completeTask = requestStream.CompleteAsync();
fakeCall.SendCompletionHandler(true);
fakeCall.SendCompletionCallback.OnSendCompletion(true);
completeTask.Wait();
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@ -171,12 +171,12 @@ namespace Grpc.Core.Internal.Tests
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
fakeCall.SendCompletionCallback.OnSendCompletion(false);
// The write will wait for call to finish to receive the status code.
Assert.IsFalse(writeTask.IsCompleted);
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Internal),
null,
new Metadata());
@ -195,12 +195,12 @@ namespace Grpc.Core.Internal.Tests
var writeTask = requestStream.WriteAsync("request1");
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Internal),
null,
new Metadata());
fakeCall.SendCompletionHandler(false);
fakeCall.SendCompletionCallback.OnSendCompletion(false);
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
@ -215,13 +215,13 @@ namespace Grpc.Core.Internal.Tests
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
fakeCall.SendCompletionCallback.OnSendCompletion(false);
// Until the delayed write completion has been triggered,
// we still act as if there was an active write.
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request2"));
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Internal),
null,
new Metadata());
@ -242,7 +242,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@ -260,7 +260,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
CreateResponsePayload(),
new Metadata());
@ -282,9 +282,9 @@ namespace Grpc.Core.Internal.Tests
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
fakeCall.SendCompletionHandler(true);
fakeCall.SendCompletionCallback.OnSendCompletion(true);
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@ -298,7 +298,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@ -319,7 +319,7 @@ namespace Grpc.Core.Internal.Tests
var writeTask = requestStream.WriteAsync("request1");
Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Cancelled),
null,
new Metadata());
@ -342,11 +342,11 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedResponseHeadersHandler(true, new Metadata());
fakeCall.ReceivedResponseHeadersCallback.OnReceivedResponseHeaders(true, new Metadata());
Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
@ -359,8 +359,8 @@ namespace Grpc.Core.Internal.Tests
var readTask = responseStream.MoveNext();
// try alternative order of completions
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
@ -372,8 +372,8 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(false, null); // after a failed read, we rely on C core to deliver appropriate status code.
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Internal));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null); // after a failed read, we rely on C core to deliver appropriate status code.
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Internal));
AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal);
}
@ -385,18 +385,18 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask1 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
Assert.IsTrue(readTask1.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
Assert.IsTrue(readTask2.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask3 = responseStream.MoveNext();
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3);
}
@ -409,12 +409,12 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var writeTask1 = requestStream.CompleteAsync();
fakeCall.SendCompletionHandler(true);
fakeCall.SendCompletionCallback.OnSendCompletion(true);
Assert.DoesNotThrowAsync(async () => await writeTask1);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
@ -427,8 +427,8 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
@ -445,8 +445,8 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
@ -461,14 +461,14 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
fakeCall.SendCompletionCallback.OnSendCompletion(false);
// The write will wait for call to finish to receive the status code.
Assert.IsFalse(writeTask.IsCompleted);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);
@ -486,9 +486,9 @@ namespace Grpc.Core.Internal.Tests
var writeTask = requestStream.WriteAsync("request1");
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied));
fakeCall.SendCompletionHandler(false);
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));
fakeCall.SendCompletionCallback.OnSendCompletion(false);
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);
@ -510,8 +510,8 @@ namespace Grpc.Core.Internal.Tests
Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled);
}
@ -526,13 +526,13 @@ namespace Grpc.Core.Internal.Tests
Assert.IsTrue(fakeCall.IsCancelled);
var readTask1 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
Assert.IsTrue(readTask1.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
}
@ -547,13 +547,13 @@ namespace Grpc.Core.Internal.Tests
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
Assert.IsTrue(readTask1.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
}

@ -31,43 +31,43 @@ namespace Grpc.Core.Internal.Tests
/// </summary>
internal class FakeNativeCall : INativeCall
{
public UnaryResponseClientHandler UnaryResponseClientHandler
public IUnaryResponseClientCallback UnaryResponseClientCallback
{
get;
set;
}
public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler
public IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback
{
get;
set;
}
public ReceivedMessageHandler ReceivedMessageHandler
public IReceivedMessageCallback ReceivedMessageCallback
{
get;
set;
}
public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
public IReceivedResponseHeadersCallback ReceivedResponseHeadersCallback
{
get;
set;
}
public SendCompletionHandler SendCompletionHandler
public ISendCompletionCallback SendCompletionCallback
{
get;
set;
}
public SendCompletionHandler SendStatusFromServerHandler
public ISendStatusFromServerCompletionCallback SendStatusFromServerCallback
{
get;
set;
}
public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler
public IReceivedCloseOnServerCallback ReceivedCloseOnServerCallback
{
get;
set;
@ -100,9 +100,9 @@ namespace Grpc.Core.Internal.Tests
return "PEER";
}
public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
public void StartUnary(IUnaryResponseClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
UnaryResponseClientHandler = callback;
UnaryResponseClientCallback = callback;
}
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
@ -110,55 +110,55 @@ namespace Grpc.Core.Internal.Tests
throw new NotImplementedException();
}
public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
public void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
UnaryResponseClientHandler = callback;
UnaryResponseClientCallback = callback;
}
public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
public void StartServerStreaming(IReceivedStatusOnClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
ReceivedStatusOnClientHandler = callback;
ReceivedStatusOnClientCallback = callback;
}
public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
public void StartDuplexStreaming(IReceivedStatusOnClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
ReceivedStatusOnClientHandler = callback;
ReceivedStatusOnClientCallback = callback;
}
public void StartReceiveMessage(ReceivedMessageHandler callback)
public void StartReceiveMessage(IReceivedMessageCallback callback)
{
ReceivedMessageHandler = callback;
ReceivedMessageCallback = callback;
}
public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
public void StartReceiveInitialMetadata(IReceivedResponseHeadersCallback callback)
{
ReceivedResponseHeadersHandler = callback;
ReceivedResponseHeadersCallback = callback;
}
public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
public void StartSendInitialMetadata(ISendCompletionCallback callback, MetadataArraySafeHandle metadataArray)
{
SendCompletionHandler = callback;
SendCompletionCallback = callback;
}
public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
public void StartSendMessage(ISendCompletionCallback callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
SendCompletionHandler = callback;
SendCompletionCallback = callback;
}
public void StartSendCloseFromClient(SendCompletionHandler callback)
public void StartSendCloseFromClient(ISendCompletionCallback callback)
{
SendCompletionHandler = callback;
SendCompletionCallback = callback;
}
public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
public void StartSendStatusFromServer(ISendStatusFromServerCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
byte[] optionalPayload, WriteFlags writeFlags)
{
SendStatusFromServerHandler = callback;
SendStatusFromServerCallback = callback;
}
public void StartServerSide(ReceivedCloseOnServerHandler callback)
public void StartServerSide(IReceivedCloseOnServerCallback callback)
{
ReceivedCloseOnServerHandler = callback;
ReceivedCloseOnServerCallback = callback;
}
public void Dispose()

@ -27,7 +27,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Manages client side native call lifecycle.
/// </summary>
internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>, IUnaryResponseClientCallback, IReceivedStatusOnClientCallback, IReceivedResponseHeadersCallback
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
@ -138,7 +138,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartUnary(HandleUnaryResponse, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
}
return unaryResponseTcs.Task;
}
@ -162,7 +162,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartClientStreaming(HandleUnaryResponse, metadataArray, details.Options.Flags);
call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags);
}
return unaryResponseTcs.Task;
@ -188,9 +188,9 @@ namespace Grpc.Core.Internal
streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartServerStreaming(HandleFinished, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
}
call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
}
}
@ -210,9 +210,9 @@ namespace Grpc.Core.Internal
streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartDuplexStreaming(HandleFinished, metadataArray, details.Options.Flags);
call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags);
}
call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
}
}
@ -256,7 +256,7 @@ namespace Grpc.Core.Internal
halfcloseRequested = true;
return TaskUtils.CompletedTask;
}
call.StartSendCloseFromClient(HandleSendFinished);
call.StartSendCloseFromClient(SendCompletionCallback);
halfcloseRequested = true;
streamingWriteTcs = new TaskCompletionSource<object>();
@ -516,5 +516,26 @@ namespace Grpc.Core.Internal
streamingResponseCallFinishedTcs.SetResult(null);
}
IUnaryResponseClientCallback UnaryResponseClientCallback => this;
void IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
{
HandleUnaryResponse(success, receivedStatus, receivedMessage, responseHeaders);
}
IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback => this;
void IReceivedStatusOnClientCallback.OnReceivedStatusOnClient(bool success, ClientSideStatus receivedStatus)
{
HandleFinished(success, receivedStatus);
}
IReceivedResponseHeadersCallback ReceivedResponseHeadersCallback => this;
void IReceivedResponseHeadersCallback.OnReceivedResponseHeaders(bool success, Metadata responseHeaders)
{
HandleReceivedResponseHeaders(success, responseHeaders);
}
}
}

@ -35,7 +35,7 @@ namespace Grpc.Core.Internal
/// Base for handling both client side and server side calls.
/// Manages native call lifecycle and provides convenience methods.
/// </summary>
internal abstract class AsyncCallBase<TWrite, TRead>
internal abstract class AsyncCallBase<TWrite, TRead> : IReceivedMessageCallback, ISendCompletionCallback
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>();
protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message.");
@ -126,7 +126,7 @@ namespace Grpc.Core.Internal
return earlyResult;
}
call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
call.StartSendMessage(SendCompletionCallback, payload, writeFlags, !initialMetadataSent);
initialMetadataSent = true;
streamingWritesCounter++;
@ -154,7 +154,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time");
GrpcPreconditions.CheckState(!disposed);
call.StartReceiveMessage(HandleReadFinished);
call.StartReceiveMessage(ReceivedMessageCallback);
streamingReadTcs = new TaskCompletionSource<TRead>();
return streamingReadTcs.Task;
}
@ -342,5 +342,19 @@ namespace Grpc.Core.Internal
}
origTcs.SetResult(msg);
}
protected ISendCompletionCallback SendCompletionCallback => this;
void ISendCompletionCallback.OnSendCompletion(bool success)
{
HandleSendFinished(success);
}
IReceivedMessageCallback ReceivedMessageCallback => this;
void IReceivedMessageCallback.OnReceivedMessage(bool success, byte[] receivedMessage)
{
HandleReadFinished(success, receivedMessage);
}
}
}

@ -31,7 +31,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Manages server side native call lifecycle.
/// </summary>
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>, IReceivedCloseOnServerCallback, ISendStatusFromServerCompletionCallback
{
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
@ -70,7 +70,7 @@ namespace Grpc.Core.Internal
started = true;
call.StartServerSide(HandleFinishedServerside);
call.StartServerSide(ReceiveCloseOnServerCallback);
return finishedServersideTcs.Task;
}
}
@ -114,7 +114,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartSendInitialMetadata(HandleSendFinished, metadataArray);
call.StartSendInitialMetadata(SendCompletionCallback, metadataArray);
}
this.initialMetadataSent = true;
@ -140,7 +140,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent,
call.StartSendStatusFromServer(SendStatusFromServerCompletionCallback, status, metadataArray, !initialMetadataSent,
payload, writeFlags);
}
halfcloseRequested = true;
@ -227,5 +227,19 @@ namespace Grpc.Core.Internal
finishedServersideTcs.SetResult(null);
}
IReceivedCloseOnServerCallback ReceiveCloseOnServerCallback => this;
void IReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(bool success, bool cancelled)
{
HandleFinishedServerside(success, cancelled);
}
ISendStatusFromServerCompletionCallback SendStatusFromServerCompletionCallback => this;
void ISendStatusFromServerCompletionCallback.OnSendStatusFromServerCompletion(bool success)
{
HandleSendStatusFromServerFinished(success);
}
}
}

@ -44,6 +44,8 @@ namespace Grpc.Core.Internal
(success, context, state) => ((IReceivedResponseHeadersCallback)state).OnReceivedResponseHeaders(success, context.GetReceivedInitialMetadata());
static readonly BatchCompletionDelegate CompletionHandler_ISendCompletionCallback =
(success, context, state) => ((ISendCompletionCallback)state).OnSendCompletion(success);
static readonly BatchCompletionDelegate CompletionHandler_ISendStatusFromServerCompletionCallback =
(success, context, state) => ((ISendStatusFromServerCompletionCallback)state).OnSendStatusFromServerCompletion(success);
static readonly BatchCompletionDelegate CompletionHandler_IReceivedCloseOnServerCallback =
(success, context, state) => ((IReceivedCloseOnServerCallback)state).OnReceivedCloseOnServerHandler(success, context.GetReceivedCloseOnServerCancelled());
@ -81,7 +83,7 @@ namespace Grpc.Core.Internal
.CheckOk();
}
public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
public void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
using (completionQueue.NewScope())
{
@ -131,14 +133,14 @@ namespace Grpc.Core.Internal
}
}
public void StartSendStatusFromServer(ISendCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
public void StartSendStatusFromServer(ISendStatusFromServerCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
byte[] optionalPayload, WriteFlags writeFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback);
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendStatusFromServerCompletionCallback, callback);
var statusDetailBytes = MarshalUtils.GetBytesUTF8(status.Detail);
Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, statusDetailBytes, new UIntPtr((ulong)statusDetailBytes.Length), metadataArray, sendEmptyInitialMetadata ? 1 : 0,
optionalPayload, optionalPayloadLength, writeFlags).CheckOk();

@ -59,6 +59,11 @@ namespace Grpc.Core.Internal
void OnSendCompletion(bool success);
}
internal interface ISendStatusFromServerCompletionCallback
{
void OnSendStatusFromServerCompletion(bool success);
}
internal interface IReceivedCloseOnServerCallback
{
void OnReceivedCloseOnServerHandler(bool success, bool cancelled);
@ -75,28 +80,28 @@ namespace Grpc.Core.Internal
string GetPeer();
void StartUnary(UnaryResponseClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartUnary(IUnaryResponseClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartUnary(BatchContextSafeHandle ctx, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartServerStreaming(IReceivedStatusOnClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartDuplexStreaming(IReceivedStatusOnClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartReceiveMessage(ReceivedMessageHandler callback);
void StartReceiveMessage(IReceivedMessageCallback callback);
void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback);
void StartReceiveInitialMetadata(IReceivedResponseHeadersCallback callback);
void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray);
void StartSendInitialMetadata(ISendCompletionCallback callback, MetadataArraySafeHandle metadataArray);
void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
void StartSendMessage(ISendCompletionCallback callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
void StartSendCloseFromClient(SendCompletionHandler callback);
void StartSendCloseFromClient(ISendCompletionCallback callback);
void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, WriteFlags writeFlags);
void StartSendStatusFromServer(ISendStatusFromServerCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, WriteFlags writeFlags);
void StartServerSide(ReceivedCloseOnServerHandler callback);
void StartServerSide(IReceivedCloseOnServerCallback callback);
}
}

@ -59,14 +59,14 @@ namespace Grpc.Microbenchmarks
var cq = CompletionQueueSafeHandle.CreateAsync(completionRegistry);
var call = CreateFakeCall(cq);
var sendCompletionHandler = new SendCompletionHandler((success) => { });
var sendCompletionCallback = new NopSendCompletionCallback();
var payload = new byte[payloadSize];
var writeFlags = default(WriteFlags);
var stopwatch = Stopwatch.StartNew();
for (int i = 0; i < iterations; i++)
{
call.StartSendMessage(sendCompletionHandler, payload, writeFlags, false);
call.StartSendMessage(sendCompletionCallback, payload, writeFlags, false);
var callback = completionRegistry.Extract(completionRegistry.LastRegisteredKey);
callback.OnComplete(true);
}
@ -87,5 +87,13 @@ namespace Grpc.Microbenchmarks
}
return call;
}
private class NopSendCompletionCallback : ISendCompletionCallback
{
public void OnSendCompletion(bool success)
{
// NOP
}
}
}
}

Loading…
Cancel
Save