Merge pull request #13459 from jtattermusch/csharp_avoid_delegate_allocs

Avoid unnecessary delegate alloc in CallSafeHandle and AsyncCall*
pull/13462/head
Jan Tattermusch 7 years ago committed by GitHub
commit 39e3325550
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
  2. 106
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
  3. 58
      src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
  4. 28
      src/csharp/Grpc.Core/Channel.cs
  5. 37
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  6. 20
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  7. 22
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  8. 32
      src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
  9. 61
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  10. 4
      src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
  11. 6
      src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
  12. 57
      src/csharp/Grpc.Core/Internal/INativeCall.cs
  13. 6
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  14. 2
      src/csharp/Grpc.Core/Server.cs
  15. 12
      src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs

@ -64,7 +64,7 @@ namespace Grpc.Core.Internal.Tests
public void CancelNotificationAfterStartDisposes()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
@ -76,8 +76,8 @@ namespace Grpc.Core.Internal.Tests
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
Assert.IsFalse(moveNextTask.Result);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
@ -89,7 +89,7 @@ namespace Grpc.Core.Internal.Tests
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
// Check that starting a read after cancel notification has been processed is legal.
var moveNextTask = requestStream.MoveNext();
@ -107,10 +107,10 @@ namespace Grpc.Core.Internal.Tests
// if a read completion's success==false, the request stream will silently finish
// and we rely on C core cancelling the call.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(false, null);
fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null);
Assert.IsFalse(moveNextTask.Result);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
@ -120,7 +120,7 @@ namespace Grpc.Core.Internal.Tests
var finishedTask = asyncCallServer.ServerSideCallAsync();
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
// TODO(jtattermusch): should we throw a different exception type instead?
Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1"));
@ -134,10 +134,10 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
var writeTask = responseStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
fakeCall.SendCompletionCallback.OnSendCompletion(false);
Assert.ThrowsAsync(typeof(IOException), async () => await writeTask);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
@ -150,13 +150,13 @@ namespace Grpc.Core.Internal.Tests
var writeTask = responseStream.WriteAsync("request1");
var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null);
fakeCall.SendCompletionHandler(true);
fakeCall.SendStatusFromServerHandler(true);
fakeCall.SendCompletionCallback.OnSendCompletion(true);
fakeCall.SendStatusFromServerCallback.OnSendStatusFromServerCompletion(true);
Assert.DoesNotThrowAsync(async () => await writeTask);
Assert.DoesNotThrowAsync(async () => await writeStatusTask);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
@ -170,8 +170,8 @@ namespace Grpc.Core.Internal.Tests
asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null);
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await responseStream.WriteAsync("request1"));
fakeCall.SendStatusFromServerHandler(true);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.SendStatusFromServerCallback.OnSendStatusFromServerCompletion(true);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}

@ -73,7 +73,7 @@ namespace Grpc.Core.Internal.Tests
public void AsyncUnary_Success()
{
var resultTask = asyncCall.UnaryCallAsync("request1");
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@ -85,7 +85,7 @@ namespace Grpc.Core.Internal.Tests
public void AsyncUnary_NonSuccessStatusCode()
{
var resultTask = asyncCall.UnaryCallAsync("request1");
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
null,
new Metadata());
@ -97,7 +97,7 @@ namespace Grpc.Core.Internal.Tests
public void AsyncUnary_NullResponsePayload()
{
var resultTask = asyncCall.UnaryCallAsync("request1");
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
null,
new Metadata());
@ -118,7 +118,7 @@ namespace Grpc.Core.Internal.Tests
public void ClientStreaming_NoRequest_Success()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@ -130,7 +130,7 @@ namespace Grpc.Core.Internal.Tests
public void ClientStreaming_NoRequest_NonSuccessStatusCode()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
null,
new Metadata());
@ -145,18 +145,18 @@ namespace Grpc.Core.Internal.Tests
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(true);
fakeCall.SendCompletionCallback.OnSendCompletion(true);
writeTask.Wait();
var writeTask2 = requestStream.WriteAsync("request2");
fakeCall.SendCompletionHandler(true);
fakeCall.SendCompletionCallback.OnSendCompletion(true);
writeTask2.Wait();
var completeTask = requestStream.CompleteAsync();
fakeCall.SendCompletionHandler(true);
fakeCall.SendCompletionCallback.OnSendCompletion(true);
completeTask.Wait();
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@ -171,12 +171,12 @@ namespace Grpc.Core.Internal.Tests
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
fakeCall.SendCompletionCallback.OnSendCompletion(false);
// The write will wait for call to finish to receive the status code.
Assert.IsFalse(writeTask.IsCompleted);
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Internal),
null,
new Metadata());
@ -195,12 +195,12 @@ namespace Grpc.Core.Internal.Tests
var writeTask = requestStream.WriteAsync("request1");
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Internal),
null,
new Metadata());
fakeCall.SendCompletionHandler(false);
fakeCall.SendCompletionCallback.OnSendCompletion(false);
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
@ -215,13 +215,13 @@ namespace Grpc.Core.Internal.Tests
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
fakeCall.SendCompletionCallback.OnSendCompletion(false);
// Until the delayed write completion has been triggered,
// we still act as if there was an active write.
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request2"));
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Internal),
null,
new Metadata());
@ -242,7 +242,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@ -260,7 +260,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
CreateResponsePayload(),
new Metadata());
@ -282,9 +282,9 @@ namespace Grpc.Core.Internal.Tests
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
fakeCall.SendCompletionHandler(true);
fakeCall.SendCompletionCallback.OnSendCompletion(true);
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@ -298,7 +298,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@ -319,7 +319,7 @@ namespace Grpc.Core.Internal.Tests
var writeTask = requestStream.WriteAsync("request1");
Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
fakeCall.UnaryResponseClientHandler(true,
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Cancelled),
null,
new Metadata());
@ -342,11 +342,11 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedResponseHeadersHandler(true, new Metadata());
fakeCall.ReceivedResponseHeadersCallback.OnReceivedResponseHeaders(true, new Metadata());
Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
@ -359,8 +359,8 @@ namespace Grpc.Core.Internal.Tests
var readTask = responseStream.MoveNext();
// try alternative order of completions
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
@ -372,8 +372,8 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(false, null); // after a failed read, we rely on C core to deliver appropriate status code.
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Internal));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null); // after a failed read, we rely on C core to deliver appropriate status code.
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Internal));
AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal);
}
@ -385,18 +385,18 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask1 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
Assert.IsTrue(readTask1.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
Assert.IsTrue(readTask2.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask3 = responseStream.MoveNext();
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3);
}
@ -409,12 +409,12 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var writeTask1 = requestStream.CompleteAsync();
fakeCall.SendCompletionHandler(true);
fakeCall.SendCompletionCallback.OnSendCompletion(true);
Assert.DoesNotThrowAsync(async () => await writeTask1);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
@ -427,8 +427,8 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
@ -445,8 +445,8 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
@ -461,14 +461,14 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
fakeCall.SendCompletionCallback.OnSendCompletion(false);
// The write will wait for call to finish to receive the status code.
Assert.IsFalse(writeTask.IsCompleted);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);
@ -486,9 +486,9 @@ namespace Grpc.Core.Internal.Tests
var writeTask = requestStream.WriteAsync("request1");
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied));
fakeCall.SendCompletionHandler(false);
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));
fakeCall.SendCompletionCallback.OnSendCompletion(false);
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);
@ -510,8 +510,8 @@ namespace Grpc.Core.Internal.Tests
Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled);
}
@ -526,13 +526,13 @@ namespace Grpc.Core.Internal.Tests
Assert.IsTrue(fakeCall.IsCancelled);
var readTask1 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
Assert.IsTrue(readTask1.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
}
@ -547,13 +547,13 @@ namespace Grpc.Core.Internal.Tests
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
Assert.IsTrue(readTask1.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
}

@ -31,43 +31,43 @@ namespace Grpc.Core.Internal.Tests
/// </summary>
internal class FakeNativeCall : INativeCall
{
public UnaryResponseClientHandler UnaryResponseClientHandler
public IUnaryResponseClientCallback UnaryResponseClientCallback
{
get;
set;
}
public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler
public IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback
{
get;
set;
}
public ReceivedMessageHandler ReceivedMessageHandler
public IReceivedMessageCallback ReceivedMessageCallback
{
get;
set;
}
public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
public IReceivedResponseHeadersCallback ReceivedResponseHeadersCallback
{
get;
set;
}
public SendCompletionHandler SendCompletionHandler
public ISendCompletionCallback SendCompletionCallback
{
get;
set;
}
public SendCompletionHandler SendStatusFromServerHandler
public ISendStatusFromServerCompletionCallback SendStatusFromServerCallback
{
get;
set;
}
public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler
public IReceivedCloseOnServerCallback ReceivedCloseOnServerCallback
{
get;
set;
@ -100,9 +100,9 @@ namespace Grpc.Core.Internal.Tests
return "PEER";
}
public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
public void StartUnary(IUnaryResponseClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
UnaryResponseClientHandler = callback;
UnaryResponseClientCallback = callback;
}
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
@ -110,55 +110,55 @@ namespace Grpc.Core.Internal.Tests
throw new NotImplementedException();
}
public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
public void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
UnaryResponseClientHandler = callback;
UnaryResponseClientCallback = callback;
}
public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
public void StartServerStreaming(IReceivedStatusOnClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
ReceivedStatusOnClientHandler = callback;
ReceivedStatusOnClientCallback = callback;
}
public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
public void StartDuplexStreaming(IReceivedStatusOnClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
ReceivedStatusOnClientHandler = callback;
ReceivedStatusOnClientCallback = callback;
}
public void StartReceiveMessage(ReceivedMessageHandler callback)
public void StartReceiveMessage(IReceivedMessageCallback callback)
{
ReceivedMessageHandler = callback;
ReceivedMessageCallback = callback;
}
public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
public void StartReceiveInitialMetadata(IReceivedResponseHeadersCallback callback)
{
ReceivedResponseHeadersHandler = callback;
ReceivedResponseHeadersCallback = callback;
}
public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
public void StartSendInitialMetadata(ISendCompletionCallback callback, MetadataArraySafeHandle metadataArray)
{
SendCompletionHandler = callback;
SendCompletionCallback = callback;
}
public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
public void StartSendMessage(ISendCompletionCallback callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
SendCompletionHandler = callback;
SendCompletionCallback = callback;
}
public void StartSendCloseFromClient(SendCompletionHandler callback)
public void StartSendCloseFromClient(ISendCompletionCallback callback)
{
SendCompletionHandler = callback;
SendCompletionCallback = callback;
}
public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
public void StartSendStatusFromServer(ISendStatusFromServerCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
byte[] optionalPayload, WriteFlags writeFlags)
{
SendStatusFromServerHandler = callback;
SendStatusFromServerCallback = callback;
}
public void StartServerSide(ReceivedCloseOnServerHandler callback)
public void StartServerSide(IReceivedCloseOnServerCallback callback)
{
ReceivedCloseOnServerHandler = callback;
ReceivedCloseOnServerCallback = callback;
}
public void Dispose()

@ -127,6 +127,20 @@ namespace Grpc.Core
}
}
// cached handler for watch connectivity state
static readonly BatchCompletionDelegate WatchConnectivityStateHandler = (success, ctx, state) =>
{
var tcs = (TaskCompletionSource<object>) state;
if (success)
{
tcs.SetResult(null);
}
else
{
tcs.SetCanceled();
}
};
/// <summary>
/// Returned tasks completes once channel state has become different from
/// given lastObservedState.
@ -138,18 +152,8 @@ namespace Grpc.Core
"Shutdown is a terminal state. No further state changes can occur.");
var tcs = new TaskCompletionSource<object>();
var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture;
var handler = new BatchCompletionDelegate((success, ctx) =>
{
if (success)
{
tcs.SetResult(null);
}
else
{
tcs.SetCanceled();
}
});
handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, handler);
// pass "tcs" as "state" for WatchConnectivityStateHandler.
handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, WatchConnectivityStateHandler, tcs);
return tcs.Task;
}

@ -27,7 +27,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Manages client side native call lifecycle.
/// </summary>
internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>, IUnaryResponseClientCallback, IReceivedStatusOnClientCallback, IReceivedResponseHeadersCallback
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
@ -138,7 +138,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartUnary(HandleUnaryResponse, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
}
return unaryResponseTcs.Task;
}
@ -162,7 +162,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartClientStreaming(HandleUnaryResponse, metadataArray, details.Options.Flags);
call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags);
}
return unaryResponseTcs.Task;
@ -188,9 +188,9 @@ namespace Grpc.Core.Internal
streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartServerStreaming(HandleFinished, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
}
call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
}
}
@ -210,9 +210,9 @@ namespace Grpc.Core.Internal
streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartDuplexStreaming(HandleFinished, metadataArray, details.Options.Flags);
call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags);
}
call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
}
}
@ -256,7 +256,7 @@ namespace Grpc.Core.Internal
halfcloseRequested = true;
return TaskUtils.CompletedTask;
}
call.StartSendCloseFromClient(HandleSendFinished);
call.StartSendCloseFromClient(SendCompletionCallback);
halfcloseRequested = true;
streamingWriteTcs = new TaskCompletionSource<object>();
@ -516,5 +516,26 @@ namespace Grpc.Core.Internal
streamingResponseCallFinishedTcs.SetResult(null);
}
IUnaryResponseClientCallback UnaryResponseClientCallback => this;
void IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
{
HandleUnaryResponse(success, receivedStatus, receivedMessage, responseHeaders);
}
IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback => this;
void IReceivedStatusOnClientCallback.OnReceivedStatusOnClient(bool success, ClientSideStatus receivedStatus)
{
HandleFinished(success, receivedStatus);
}
IReceivedResponseHeadersCallback ReceivedResponseHeadersCallback => this;
void IReceivedResponseHeadersCallback.OnReceivedResponseHeaders(bool success, Metadata responseHeaders)
{
HandleReceivedResponseHeaders(success, responseHeaders);
}
}
}

@ -35,7 +35,7 @@ namespace Grpc.Core.Internal
/// Base for handling both client side and server side calls.
/// Manages native call lifecycle and provides convenience methods.
/// </summary>
internal abstract class AsyncCallBase<TWrite, TRead>
internal abstract class AsyncCallBase<TWrite, TRead> : IReceivedMessageCallback, ISendCompletionCallback
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>();
protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message.");
@ -126,7 +126,7 @@ namespace Grpc.Core.Internal
return earlyResult;
}
call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
call.StartSendMessage(SendCompletionCallback, payload, writeFlags, !initialMetadataSent);
initialMetadataSent = true;
streamingWritesCounter++;
@ -154,7 +154,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time");
GrpcPreconditions.CheckState(!disposed);
call.StartReceiveMessage(HandleReadFinished);
call.StartReceiveMessage(ReceivedMessageCallback);
streamingReadTcs = new TaskCompletionSource<TRead>();
return streamingReadTcs.Task;
}
@ -342,5 +342,19 @@ namespace Grpc.Core.Internal
}
origTcs.SetResult(msg);
}
protected ISendCompletionCallback SendCompletionCallback => this;
void ISendCompletionCallback.OnSendCompletion(bool success)
{
HandleSendFinished(success);
}
IReceivedMessageCallback ReceivedMessageCallback => this;
void IReceivedMessageCallback.OnReceivedMessage(bool success, byte[] receivedMessage)
{
HandleReadFinished(success, receivedMessage);
}
}
}

@ -31,7 +31,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Manages server side native call lifecycle.
/// </summary>
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>, IReceivedCloseOnServerCallback, ISendStatusFromServerCompletionCallback
{
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
@ -70,7 +70,7 @@ namespace Grpc.Core.Internal
started = true;
call.StartServerSide(HandleFinishedServerside);
call.StartServerSide(ReceiveCloseOnServerCallback);
return finishedServersideTcs.Task;
}
}
@ -114,7 +114,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartSendInitialMetadata(HandleSendFinished, metadataArray);
call.StartSendInitialMetadata(SendCompletionCallback, metadataArray);
}
this.initialMetadataSent = true;
@ -140,7 +140,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent,
call.StartSendStatusFromServer(SendStatusFromServerCompletionCallback, status, metadataArray, !initialMetadataSent,
payload, writeFlags);
}
halfcloseRequested = true;
@ -227,5 +227,19 @@ namespace Grpc.Core.Internal
finishedServersideTcs.SetResult(null);
}
IReceivedCloseOnServerCallback ReceiveCloseOnServerCallback => this;
void IReceivedCloseOnServerCallback.OnReceivedCloseOnServer(bool success, bool cancelled)
{
HandleFinishedServerside(success, cancelled);
}
ISendStatusFromServerCompletionCallback SendStatusFromServerCompletionCallback => this;
void ISendStatusFromServerCompletionCallback.OnSendStatusFromServerCompletion(bool success)
{
HandleSendStatusFromServerFinished(success);
}
}
}

@ -21,6 +21,7 @@ using System.Runtime.InteropServices;
using System.Text;
using Grpc.Core;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
@ -37,6 +38,8 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<BatchContextSafeHandle>();
CompletionCallbackData completionCallbackData;
private BatchContextSafeHandle()
{
}
@ -54,7 +57,12 @@ namespace Grpc.Core.Internal
}
}
public BatchCompletionDelegate CompletionCallback { get; set; }
public void SetCompletionCallback(BatchCompletionDelegate callback, object state)
{
GrpcPreconditions.CheckState(completionCallbackData.Callback == null);
GrpcPreconditions.CheckNotNull(callback, nameof(callback));
completionCallbackData = new CompletionCallbackData(callback, state);
}
// Gets data of recv_initial_metadata completion.
public Metadata GetReceivedInitialMetadata()
@ -62,13 +70,13 @@ namespace Grpc.Core.Internal
IntPtr metadataArrayPtr = Native.grpcsharp_batch_context_recv_initial_metadata(this);
return MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr);
}
// Gets data of recv_status_on_client completion.
public ClientSideStatus GetReceivedStatusOnClient()
{
UIntPtr detailsLength;
IntPtr detailsPtr = Native.grpcsharp_batch_context_recv_status_on_client_details(this, out detailsLength);
string details = MarshalUtils.PtrToStringUTF8(detailsPtr, (int) detailsLength.ToUInt32());
string details = MarshalUtils.PtrToStringUTF8(detailsPtr, (int)detailsLength.ToUInt32());
var status = new Status(Native.grpcsharp_batch_context_recv_status_on_client_status(this), details);
IntPtr metadataArrayPtr = Native.grpcsharp_batch_context_recv_status_on_client_trailing_metadata(this);
@ -95,7 +103,7 @@ namespace Grpc.Core.Internal
{
return Native.grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0;
}
protected override bool ReleaseHandle()
{
Native.grpcsharp_batch_context_destroy(handle);
@ -106,7 +114,7 @@ namespace Grpc.Core.Internal
{
try
{
CompletionCallback(success, this);
completionCallbackData.Callback(success, this, completionCallbackData.State);
}
catch (Exception e)
{
@ -114,9 +122,21 @@ namespace Grpc.Core.Internal
}
finally
{
CompletionCallback = null;
completionCallbackData = default(CompletionCallbackData);
Dispose();
}
}
struct CompletionCallbackData
{
public CompletionCallbackData(BatchCompletionDelegate callback, object state)
{
this.Callback = callback;
this.State = state;
}
public BatchCompletionDelegate Callback { get; }
public object State { get; }
}
}
}

@ -32,6 +32,23 @@ namespace Grpc.Core.Internal
public static readonly CallSafeHandle NullInstance = new CallSafeHandle();
static readonly NativeMethods Native = NativeMethods.Get();
// Completion handlers are pre-allocated to avoid unneccessary delegate allocations.
// The "state" field is used to store the actual callback to invoke.
static readonly BatchCompletionDelegate CompletionHandler_IUnaryResponseClientCallback =
(success, context, state) => ((IUnaryResponseClientCallback)state).OnUnaryResponseClient(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata());
static readonly BatchCompletionDelegate CompletionHandler_IReceivedStatusOnClientCallback =
(success, context, state) => ((IReceivedStatusOnClientCallback)state).OnReceivedStatusOnClient(success, context.GetReceivedStatusOnClient());
static readonly BatchCompletionDelegate CompletionHandler_IReceivedMessageCallback =
(success, context, state) => ((IReceivedMessageCallback)state).OnReceivedMessage(success, context.GetReceivedMessage());
static readonly BatchCompletionDelegate CompletionHandler_IReceivedResponseHeadersCallback =
(success, context, state) => ((IReceivedResponseHeadersCallback)state).OnReceivedResponseHeaders(success, context.GetReceivedInitialMetadata());
static readonly BatchCompletionDelegate CompletionHandler_ISendCompletionCallback =
(success, context, state) => ((ISendCompletionCallback)state).OnSendCompletion(success);
static readonly BatchCompletionDelegate CompletionHandler_ISendStatusFromServerCompletionCallback =
(success, context, state) => ((ISendStatusFromServerCompletionCallback)state).OnSendStatusFromServerCompletion(success);
static readonly BatchCompletionDelegate CompletionHandler_IReceivedCloseOnServerCallback =
(success, context, state) => ((IReceivedCloseOnServerCallback)state).OnReceivedCloseOnServer(success, context.GetReceivedCloseOnServerCancelled());
const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionQueueSafeHandle completionQueue;
@ -49,12 +66,12 @@ namespace Grpc.Core.Internal
Native.grpcsharp_call_set_credentials(this, credentials).CheckOk();
}
public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
public void StartUnary(IUnaryResponseClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback);
Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags)
.CheckOk();
}
@ -66,106 +83,106 @@ namespace Grpc.Core.Internal
.CheckOk();
}
public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
public void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback);
Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray, callFlags).CheckOk();
}
}
public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
public void StartServerStreaming(IReceivedStatusOnClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback);
Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags).CheckOk();
}
}
public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
public void StartDuplexStreaming(IReceivedStatusOnClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback);
Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, callFlags).CheckOk();
}
}
public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
public void StartSendMessage(ISendCompletionCallback callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback);
Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata ? 1 : 0).CheckOk();
}
}
public void StartSendCloseFromClient(SendCompletionHandler callback)
public void StartSendCloseFromClient(ISendCompletionCallback callback)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback);
Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
}
public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
public void StartSendStatusFromServer(ISendStatusFromServerCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
byte[] optionalPayload, WriteFlags writeFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendStatusFromServerCompletionCallback, callback);
var statusDetailBytes = MarshalUtils.GetBytesUTF8(status.Detail);
Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, statusDetailBytes, new UIntPtr((ulong)statusDetailBytes.Length), metadataArray, sendEmptyInitialMetadata ? 1 : 0,
optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
}
}
public void StartReceiveMessage(ReceivedMessageHandler callback)
public void StartReceiveMessage(IReceivedMessageCallback callback)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedMessageCallback, callback);
Native.grpcsharp_call_recv_message(this, ctx).CheckOk();
}
}
public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
public void StartReceiveInitialMetadata(IReceivedResponseHeadersCallback callback)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedResponseHeadersCallback, callback);
Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
}
}
public void StartServerSide(ReceivedCloseOnServerHandler callback)
public void StartServerSide(IReceivedCloseOnServerCallback callback)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedCloseOnServerCallback, callback);
Native.grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
}
public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
public void StartSendInitialMetadata(ISendCompletionCallback callback, MetadataArraySafeHandle metadataArray)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback);
Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
}
}

@ -64,10 +64,10 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0);
}
public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback, object callbackState)
{
var ctx = BatchContextSafeHandle.Create();
cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback, callbackState);
Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx);
}

@ -25,7 +25,7 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx);
internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx, object state);
internal delegate void RequestCallCompletionDelegate(bool success, RequestCallContextSafeHandle ctx);
@ -53,9 +53,9 @@ namespace Grpc.Core.Internal
}
}
public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback, object state)
{
ctx.CompletionCallback = callback;
ctx.SetCompletionCallback(callback, state);
Register(ctx.Handle, ctx);
}

@ -20,18 +20,41 @@ using Grpc.Core;
namespace Grpc.Core.Internal
{
internal delegate void UnaryResponseClientHandler(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders);
internal interface IUnaryResponseClientCallback
{
void OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders);
}
// Received status for streaming response calls.
internal delegate void ReceivedStatusOnClientHandler(bool success, ClientSideStatus receivedStatus);
internal interface IReceivedStatusOnClientCallback
{
void OnReceivedStatusOnClient(bool success, ClientSideStatus receivedStatus);
}
internal delegate void ReceivedMessageHandler(bool success, byte[] receivedMessage);
internal interface IReceivedMessageCallback
{
void OnReceivedMessage(bool success, byte[] receivedMessage);
}
internal delegate void ReceivedResponseHeadersHandler(bool success, Metadata responseHeaders);
internal interface IReceivedResponseHeadersCallback
{
void OnReceivedResponseHeaders(bool success, Metadata responseHeaders);
}
internal delegate void SendCompletionHandler(bool success);
internal interface ISendCompletionCallback
{
void OnSendCompletion(bool success);
}
internal delegate void ReceivedCloseOnServerHandler(bool success, bool cancelled);
internal interface ISendStatusFromServerCompletionCallback
{
void OnSendStatusFromServerCompletion(bool success);
}
internal interface IReceivedCloseOnServerCallback
{
void OnReceivedCloseOnServer(bool success, bool cancelled);
}
/// <summary>
/// Abstraction of a native call object.
@ -44,28 +67,28 @@ namespace Grpc.Core.Internal
string GetPeer();
void StartUnary(UnaryResponseClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartUnary(IUnaryResponseClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartUnary(BatchContextSafeHandle ctx, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartServerStreaming(IReceivedStatusOnClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartDuplexStreaming(IReceivedStatusOnClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartReceiveMessage(ReceivedMessageHandler callback);
void StartReceiveMessage(IReceivedMessageCallback callback);
void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback);
void StartReceiveInitialMetadata(IReceivedResponseHeadersCallback callback);
void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray);
void StartSendInitialMetadata(ISendCompletionCallback callback, MetadataArraySafeHandle metadataArray);
void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
void StartSendMessage(ISendCompletionCallback callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
void StartSendCloseFromClient(SendCompletionHandler callback);
void StartSendCloseFromClient(ISendCompletionCallback callback);
void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, WriteFlags writeFlags);
void StartSendStatusFromServer(ISendStatusFromServerCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, WriteFlags writeFlags);
void StartServerSide(ReceivedCloseOnServerHandler callback);
void StartServerSide(IReceivedCloseOnServerCallback callback);
}
}

@ -59,13 +59,15 @@ namespace Grpc.Core.Internal
{
Native.grpcsharp_server_start(this);
}
public void ShutdownAndNotify(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
// TODO(jtattermusch): delegate allocation by caller can be avoided by utilizing the "state" object,
// but server shutdown isn't worth optimizing right now.
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback, null);
Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx);
}
}

@ -387,7 +387,7 @@ namespace Grpc.Core
/// <summary>
/// Handles native callback.
/// </summary>
private void HandleServerShutdown(bool success, BatchContextSafeHandle ctx)
private void HandleServerShutdown(bool success, BatchContextSafeHandle ctx, object state)
{
shutdownTcs.SetResult(null);
}

@ -59,14 +59,14 @@ namespace Grpc.Microbenchmarks
var cq = CompletionQueueSafeHandle.CreateAsync(completionRegistry);
var call = CreateFakeCall(cq);
var sendCompletionHandler = new SendCompletionHandler((success) => { });
var sendCompletionCallback = new NopSendCompletionCallback();
var payload = new byte[payloadSize];
var writeFlags = default(WriteFlags);
var stopwatch = Stopwatch.StartNew();
for (int i = 0; i < iterations; i++)
{
call.StartSendMessage(sendCompletionHandler, payload, writeFlags, false);
call.StartSendMessage(sendCompletionCallback, payload, writeFlags, false);
var callback = completionRegistry.Extract(completionRegistry.LastRegisteredKey);
callback.OnComplete(true);
}
@ -87,5 +87,13 @@ namespace Grpc.Microbenchmarks
}
return call;
}
private class NopSendCompletionCallback : ISendCompletionCallback
{
public void OnSendCompletion(bool success)
{
// NOP
}
}
}
}

Loading…
Cancel
Save