Merge github.com:grpc/grpc into error

reviewable/pr6737/r3
Craig Tiller 9 years ago
commit ccbc810bae
  1. 2
      Makefile
  2. 2
      doc/connectivity-semantics-and-api.md
  3. 20
      src/core/ext/transport/cronet/transport/cronet_transport.c
  4. 8
      src/core/lib/iomgr/tcp_server_posix.c
  5. 24
      src/csharp/Grpc.Core.Tests/ClientServerTest.cs
  6. 10
      src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
  7. 8
      src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
  8. 20
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
  9. 39
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
  10. 10
      src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs
  11. 13
      src/csharp/Grpc.Core/Channel.cs
  12. 1
      src/csharp/Grpc.Core/Grpc.Core.csproj
  13. 53
      src/csharp/Grpc.Core/GrpcEnvironment.cs
  14. 102
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  15. 81
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  16. 36
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  17. 94
      src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
  18. 26
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  19. 9
      src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
  20. 8
      src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
  21. 16
      src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
  22. 68
      src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
  23. 11
      src/csharp/Grpc.Core/Internal/NativeMethods.cs
  24. 32
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  25. 8
      src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
  26. 27
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  27. 39
      src/csharp/Grpc.Core/Server.cs
  28. 39
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs
  29. 11
      src/csharp/ext/grpc_csharp_ext.c
  30. 38
      src/node/ext/server.cc
  31. 43
      src/objective-c/CronetFramework.podspec
  32. 55
      src/objective-c/GRPCClient/GRPCCall+Cronet.h
  33. 54
      src/objective-c/GRPCClient/GRPCCall+Cronet.m
  34. 6
      src/objective-c/GRPCClient/private/GRPCChannel.h
  35. 30
      src/objective-c/GRPCClient/private/GRPCChannel.m
  36. 13
      src/objective-c/GRPCClient/private/GRPCHost.m
  37. 16
      src/objective-c/tests/InteropTests.m
  38. 1
      src/objective-c/tests/Podfile
  39. 2
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  40. 12
      src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
  41. 2
      templates/Makefile.template
  42. 4
      tools/jenkins/run_full_performance.sh

@ -1845,7 +1845,7 @@ $(LIBDIR)/$(CONFIG)/pkgconfig/grpc_unsecure.pc:
$(LIBDIR)/$(CONFIG)/pkgconfig/grpc_zookeeper.pc:
$(E) "[MAKE] Generating $@"
$(Q) mkdir -p $(@D)
$(Q) echo -e "$(GRPC_ZOOKEEPER_PC_FILE)" >$@
$(Q) echo "$(GRPC_ZOOKEEPER_PC_FILE)" | tr , '\n' >$@
$(LIBDIR)/$(CONFIG)/pkgconfig/grpc++.pc:
$(E) "[MAKE] Generating $@"

@ -101,7 +101,7 @@ corresponding reasons. Empty cells denote disallowed transitions.
<td>Shutdown triggered by application.</td>
</tr>
<tr>
<th>FATAL_FAILURE</th>
<th>SHUTDOWN</th>
<td></td>
<td></td>
<td></td>

@ -218,8 +218,11 @@ static void on_write_completed(cronet_bidirectional_stream *stream,
static void process_recv_message(stream_obj *s, const uint8_t *recv_data) {
gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes);
uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
if (s->total_read_bytes > 0) {
// Only copy if there is non-zero number of bytes
memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
}
grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0);
*s->recv_message = (grpc_byte_buffer *)&s->sbs;
}
@ -347,8 +350,17 @@ static void next_recv_step(stream_obj *s, enum e_caller caller) {
if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
}
cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
s->remaining_read_bytes);
if (s->remaining_read_bytes > 0) {
cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
s->remaining_read_bytes);
} else {
// Calling the closing callback directly since this is a 0 byte read
// for an empty message.
process_recv_message(s, NULL);
enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
invoke_closing_callback(s);
set_recv_state(s, CRONET_RECV_CLOSED);
}
}
}
break;

@ -130,6 +130,9 @@ struct grpc_tcp_server {
grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
/* next pollset to assign a channel to */
size_t next_pollset_to_assign;
};
grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
@ -148,6 +151,7 @@ grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
s->head = NULL;
s->tail = NULL;
s->nports = 0;
s->next_pollset_to_assign = 0;
*server = s;
return GRPC_ERROR_NONE;
}
@ -328,7 +332,9 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
goto error;
}
read_notifier_pollset = grpc_fd_get_read_notifier_pollset(exec_ctx, sp->emfd);
read_notifier_pollset =
sp->server->pollsets[(sp->server->next_pollset_to_assign++) %
sp->server->pollset_count];
/* loop until accept4 returns EAGAIN, and then re-arm notification */
for (;;) {

@ -235,8 +235,16 @@ namespace Grpc.Core.Tests
await barrier.Task; // make sure the handler has started.
cts.Cancel();
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync);
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
try
{
// cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
await call.ResponseAsync;
Assert.Fail();
}
catch (RpcException ex)
{
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
}
[Test]
@ -265,9 +273,15 @@ namespace Grpc.Core.Tests
await handlerStartedBarrier.Task;
cts.Cancel();
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync);
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
try
{
await call.ResponseAsync;
Assert.Fail();
}
catch (RpcException ex)
{
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
Assert.AreEqual("SUCCESS", await successTcs.Task);
}

@ -105,7 +105,15 @@ namespace Grpc.Core.Tests
var parentCall = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
await readyToCancelTcs.Task;
cts.Cancel();
Assert.ThrowsAsync(typeof(RpcException), async () => await parentCall);
try
{
// cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
await parentCall;
Assert.Fail();
}
catch (RpcException)
{
}
Assert.AreEqual("CHILD_CALL_CANCELLED", await successTcs.Task);
}

@ -32,7 +32,7 @@
#endregion
using System;
using System.Threading;
using System.Linq;
using Grpc.Core;
using NUnit.Framework;
@ -44,7 +44,11 @@ namespace Grpc.Core.Tests
public void InitializeAndShutdownGrpcEnvironment()
{
var env = GrpcEnvironment.AddRef();
Assert.IsNotNull(env.CompletionQueue);
Assert.IsTrue(env.CompletionQueues.Count > 0);
for (int i = 0; i < env.CompletionQueues.Count; i++)
{
Assert.IsNotNull(env.CompletionQueues.ElementAt(i));
}
GrpcEnvironment.Release();
}

@ -53,8 +53,6 @@ namespace Grpc.Core.Internal.Tests
[SetUp]
public void Init()
{
var environment = GrpcEnvironment.AddRef();
// Create a fake server just so we have an instance to refer to.
// The server won't actually be used at all.
server = new Server()
@ -66,7 +64,6 @@ namespace Grpc.Core.Internal.Tests
fakeCall = new FakeNativeCall();
asyncCallServer = new AsyncCallServer<string, string>(
Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer,
environment,
server);
asyncCallServer.InitializeForTesting(fakeCall);
}
@ -75,7 +72,6 @@ namespace Grpc.Core.Internal.Tests
public void Cleanup()
{
server.ShutdownAsync().Wait();
GrpcEnvironment.Release();
}
[Test]
@ -136,7 +132,6 @@ namespace Grpc.Core.Internal.Tests
public void WriteAfterCancelNotificationFails()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
@ -181,6 +176,21 @@ namespace Grpc.Core.Internal.Tests
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
[Test]
public void WriteAfterWriteStatusThrowsInvalidOperationException()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null);
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await responseStream.WriteAsync("request1"));
fakeCall.SendStatusFromServerHandler(true);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
static void AssertFinished(AsyncCallServer<string, string> asyncCallServer, FakeNativeCall fakeCall, Task finishedTask)
{
Assert.IsTrue(fakeCall.IsDisposed);

@ -33,7 +33,6 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@ -82,7 +81,7 @@ namespace Grpc.Core.Internal.Tests
Assert.ThrowsAsync(typeof(InvalidOperationException),
async () => await asyncCall.ReadMessageAsync());
Assert.Throws(typeof(InvalidOperationException),
() => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
() => asyncCall.SendMessageAsync("abc", new WriteFlags()));
}
[Test]
@ -103,7 +102,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.UnaryCallAsync("request1");
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
CreateResponsePayload(),
null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
@ -148,7 +147,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.ClientStreamingCallAsync();
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
CreateResponsePayload(),
null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
@ -193,7 +192,7 @@ namespace Grpc.Core.Internal.Tests
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Internal),
CreateResponsePayload(),
null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
@ -211,7 +210,9 @@ namespace Grpc.Core.Internal.Tests
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
var writeTask = requestStream.WriteAsync("request1");
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
@ -223,11 +224,13 @@ namespace Grpc.Core.Internal.Tests
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
CreateResponsePayload(),
null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange);
var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
var writeTask = requestStream.WriteAsync("request1");
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode);
}
@ -267,7 +270,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void ClientStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
public void ClientStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@ -275,11 +278,12 @@ namespace Grpc.Core.Internal.Tests
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
var writeTask = requestStream.WriteAsync("request1");
Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Cancelled),
CreateResponsePayload(),
null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled);
@ -290,7 +294,7 @@ namespace Grpc.Core.Internal.Tests
{
asyncCall.StartServerStreamingCall("request1");
Assert.Throws(typeof(InvalidOperationException),
() => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
() => asyncCall.SendMessageAsync("abc", new WriteFlags()));
}
[Test]
@ -390,12 +394,13 @@ namespace Grpc.Core.Internal.Tests
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
var ex = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("request1"));
var writeTask = requestStream.WriteAsync("request1");
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
[Test]
public void DuplexStreaming_CompleteAfterReceivingStatusFails()
public void DuplexStreaming_CompleteAfterReceivingStatusSuceeds()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@ -411,7 +416,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void DuplexStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@ -419,7 +424,9 @@ namespace Grpc.Core.Internal.Tests
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
var writeTask = requestStream.WriteAsync("request1");
Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);

@ -134,7 +134,15 @@ namespace Grpc.Core.Tests
{
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
Assert.ThrowsAsync<IOException>(async () => await requestStream.MoveNext());
try
{
// cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
await requestStream.MoveNext();
Assert.Fail();
}
catch (IOException)
{
}
return "RESPONSE";
});

@ -31,7 +31,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -56,6 +55,7 @@ namespace Grpc.Core
readonly string target;
readonly GrpcEnvironment environment;
readonly CompletionQueueSafeHandle completionQueue;
readonly ChannelSafeHandle handle;
readonly Dictionary<string, ChannelOption> options;
@ -75,6 +75,7 @@ namespace Grpc.Core
EnsureUserAgentChannelOption(this.options);
this.environment = GrpcEnvironment.AddRef();
this.completionQueue = this.environment.PickCompletionQueue();
using (var nativeCredentials = credentials.ToNativeCredentials())
using (var nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options.Values))
{
@ -135,7 +136,7 @@ namespace Grpc.Core
tcs.SetCanceled();
}
});
handle.WatchConnectivityState(lastObservedState, deadlineTimespec, environment.CompletionQueue, environment.CompletionRegistry, handler);
handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, handler);
return tcs.Task;
}
@ -231,6 +232,14 @@ namespace Grpc.Core
}
}
internal CompletionQueueSafeHandle CompletionQueue
{
get
{
return this.completionQueue;
}
}
internal void AddCallReference(object call)
{
activeCallCounter.Increment();

@ -86,7 +86,6 @@
<Compile Include="Utils\BenchmarkUtil.cs" />
<Compile Include="ChannelCredentials.cs" />
<Compile Include="Internal\ChannelArgsSafeHandle.cs" />
<Compile Include="Internal\AsyncCompletion.cs" />
<Compile Include="Internal\AsyncCallBase.cs" />
<Compile Include="Internal\AsyncCallServer.cs" />
<Compile Include="Internal\AsyncCall.cs" />

@ -32,8 +32,9 @@
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
@ -51,12 +52,13 @@ namespace Grpc.Core
static GrpcEnvironment instance;
static int refCount;
static int? customThreadPoolSize;
static int? customCompletionQueueCount;
static ILogger logger = new ConsoleLogger();
readonly GrpcThreadPool threadPool;
readonly CompletionRegistry completionRegistry;
readonly DebugStats debugStats = new DebugStats();
readonly AtomicCounter cqPickerCounter = new AtomicCounter();
bool isClosed;
/// <summary>
@ -140,37 +142,52 @@ namespace Grpc.Core
}
}
/// <summary>
/// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events.
/// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
/// Setting the number of completions queues is an advanced setting and you should only use it if you know what you are doing.
/// Most users should rely on the default value provided by gRPC library.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.
/// </summary>
public static void SetCompletionQueueCount(int completionQueueCount)
{
lock (staticLock)
{
GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
GrpcPreconditions.CheckArgument(completionQueueCount > 0, "threadCount needs to be a positive number");
customCompletionQueueCount = completionQueueCount;
}
}
/// <summary>
/// Creates gRPC environment.
/// </summary>
private GrpcEnvironment()
{
GrpcNativeInit();
completionRegistry = new CompletionRegistry(this);
threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault());
threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault());
threadPool.Start();
}
/// <summary>
/// Gets the completion registry used by this gRPC environment.
/// Gets the completion queues used by this gRPC environment.
/// </summary>
internal CompletionRegistry CompletionRegistry
internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get
{
return this.completionRegistry;
return this.threadPool.CompletionQueues;
}
}
/// <summary>
/// Gets the completion queue used by this gRPC environment.
/// Picks a completion queue in a round-robin fashion.
/// Shouldn't be invoked on a per-call basis (used at per-channel basis).
/// </summary>
internal CompletionQueueSafeHandle CompletionQueue
internal CompletionQueueSafeHandle PickCompletionQueue()
{
get
{
return this.threadPool.CompletionQueue;
}
var cqIndex = (int) ((cqPickerCounter.Increment() - 1) % this.threadPool.CompletionQueues.Count);
return this.threadPool.CompletionQueues.ElementAt(cqIndex);
}
/// <summary>
@ -230,5 +247,15 @@ namespace Grpc.Core
// more work, but seems to work reasonably well for a start.
return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2);
}
private int GetCompletionQueueCountOrDefault()
{
if (customCompletionQueueCount.HasValue)
{
return customCompletionQueueCount.Value;
}
// by default, create a completion queue for each thread
return GetThreadPoolSizeOrDefault();
}
}
}

@ -32,12 +32,7 @@
#endregion
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils;
@ -57,9 +52,11 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
// TODO(jtattermusch): this field doesn't need to be initialized for unary response calls.
// Indicates that response streaming call has finished.
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
// TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers).
// Response headers set here once received.
TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
@ -67,7 +64,7 @@ namespace Grpc.Core.Internal
ClientSideStatus? finishedStatus;
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
this.details = callDetails.WithOptions(callDetails.Options.Normalize());
this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
@ -144,7 +141,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
Initialize(environment.CompletionQueue);
Initialize(details.Channel.CompletionQueue);
halfcloseRequested = true;
readingDone = true;
@ -171,7 +168,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
Initialize(environment.CompletionQueue);
Initialize(details.Channel.CompletionQueue);
readingDone = true;
@ -195,7 +192,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
Initialize(environment.CompletionQueue);
Initialize(details.Channel.CompletionQueue);
halfcloseRequested = true;
@ -220,7 +217,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
Initialize(environment.CompletionQueue);
Initialize(details.Channel.CompletionQueue);
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
@ -232,11 +229,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends a streaming request. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags)
{
StartSendMessageInternal(msg, writeFlags, completionDelegate);
return SendMessageInternalAsync(msg, writeFlags);
}
/// <summary>
@ -250,29 +246,32 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends halfclose, indicating client is done with streaming requests.
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate)
public Task SendCloseFromClientAsync()
{
lock (myLock)
{
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed(allowFinished: true);
GrpcPreconditions.CheckState(started);
if (!disposed && !finished)
var earlyResult = CheckSendPreconditionsClientSide();
if (earlyResult != null)
{
call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
return earlyResult;
}
else
if (disposed || finished)
{
// In case the call has already been finished by the serverside,
// the halfclose has already been done implicitly, so we only
// emit the notification for the completion delegate.
Task.Run(() => HandleSendCloseFromClientFinished(true));
// the halfclose has already been done implicitly, so just return
// completed task here.
halfcloseRequested = true;
return Task.FromResult<object>(null);
}
call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;
streamingWriteTcs = new TaskCompletionSource<object>();
return streamingWriteTcs.Task;
}
}
@ -342,6 +341,45 @@ namespace Grpc.Core.Internal
get { return true; }
}
protected override Task CheckSendAllowedOrEarlyResult()
{
var earlyResult = CheckSendPreconditionsClientSide();
if (earlyResult != null)
{
return earlyResult;
}
if (finishedStatus.HasValue)
{
// throwing RpcException if we already received status on client
// side makes the most sense.
// Note that this throws even for StatusCode.OK.
// Writing after the call has finished is not a programming error because server can close
// the call anytime, so don't throw directly, but let the write task finish with an error.
var tcs = new TaskCompletionSource<object>();
tcs.SetException(new RpcException(finishedStatus.Value.Status));
return tcs.Task;
}
return null;
}
private Task CheckSendPreconditionsClientSide()
{
GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");
if (cancelRequested)
{
// Return a cancelled task.
var tcs = new TaskCompletionSource<object>();
tcs.SetCanceled();
return tcs.Task;
}
return null;
}
private void Initialize(CompletionQueueSafeHandle cq)
{
using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize"))
@ -368,7 +406,7 @@ namespace Grpc.Core.Internal
var credentials = details.Options.Credentials;
using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
{
var result = details.Channel.Handle.CreateCall(environment.CompletionRegistry,
var result = details.Channel.Handle.CreateCall(
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
return result;
@ -400,6 +438,7 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
{
// TODO(jtattermusch): handle success==false
responseHeadersTcs.SetResult(responseHeaders);
}
@ -443,19 +482,6 @@ namespace Grpc.Core.Internal
}
}
protected override void CheckSendingAllowed(bool allowFinished)
{
base.CheckSendingAllowed(true);
// throwing RpcException if we already received status on client
// side makes the most sense.
// Note that this throws even for StatusCode.OK.
if (!allowFinished && finishedStatus.HasValue)
{
throw new RpcException(finishedStatus.Value.Status);
}
}
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>

@ -58,7 +58,6 @@ namespace Grpc.Core.Internal
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
protected readonly GrpcEnvironment environment;
protected readonly object myLock = new object();
protected INativeCall call;
@ -67,8 +66,8 @@ namespace Grpc.Core.Internal
protected bool started;
protected bool cancelRequested;
protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null.
protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
@ -78,11 +77,10 @@ namespace Grpc.Core.Internal
protected bool initialMetadataSent;
protected long streamingWritesCounter; // Number of streaming send operations started so far.
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment)
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = GrpcPreconditions.CheckNotNull(serializer);
this.deserializer = GrpcPreconditions.CheckNotNull(deserializer);
this.environment = GrpcPreconditions.CheckNotNull(environment);
}
/// <summary>
@ -128,28 +126,31 @@ namespace Grpc.Core.Internal
/// <summary>
/// Initiates sending a message. Only one send operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags)
{
byte[] payload = UnsafeSerialize(msg);
lock (myLock)
{
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed(allowFinished: false);
GrpcPreconditions.CheckState(started);
var earlyResult = CheckSendAllowedOrEarlyResult();
if (earlyResult != null)
{
return earlyResult;
}
call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
sendCompletionDelegate = completionDelegate;
initialMetadataSent = true;
streamingWritesCounter++;
streamingWriteTcs = new TaskCompletionSource<object>();
return streamingWriteTcs.Task;
}
}
/// <summary>
/// Initiates reading a message. Only one read operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
protected Task<TRead> ReadMessageInternalAsync()
{
@ -159,7 +160,7 @@ namespace Grpc.Core.Internal
if (readingDone)
{
// the last read that returns null or throws an exception is idempotent
// and maintain its state.
// and maintains its state.
GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
return streamingReadTcs.Task;
}
@ -183,7 +184,7 @@ namespace Grpc.Core.Internal
{
if (!disposed && call != null)
{
bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
if (noMoreSendCompletions && readingDone && finished)
{
ReleaseResources();
@ -213,24 +214,11 @@ namespace Grpc.Core.Internal
{
}
protected virtual void CheckSendingAllowed(bool allowFinished)
{
GrpcPreconditions.CheckState(started);
CheckNotCancelled();
GrpcPreconditions.CheckState(!disposed || allowFinished);
GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
GrpcPreconditions.CheckState(!finished || allowFinished, "Already finished.");
GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
protected void CheckNotCancelled()
{
if (cancelRequested)
{
throw new OperationCanceledException("Remote call has been cancelled.");
}
}
/// <summary>
/// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send
/// logic by directly returning the write operation result task. Normally, null is returned.
/// </summary>
protected abstract Task CheckSendAllowedOrEarlyResult();
protected byte[] UnsafeSerialize(TWrite msg)
{
@ -259,39 +247,27 @@ namespace Grpc.Core.Internal
}
}
protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDelegate, T value, Exception error)
{
try
{
completionDelegate(value, error);
}
catch (Exception e)
{
Logger.Error(e, "Exception occured while invoking completion delegate.");
}
}
/// <summary>
/// Handles send completion.
/// </summary>
protected void HandleSendFinished(bool success)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
origCompletionDelegate = sendCompletionDelegate;
sendCompletionDelegate = null;
origTcs = streamingWriteTcs;
streamingWriteTcs = null;
ReleaseResourcesIfPossible();
}
if (!success)
{
FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed"));
origTcs.SetException(new InvalidOperationException("Send failed"));
}
else
{
FireCompletion(origCompletionDelegate, null, null);
origTcs.SetResult(null);
}
}
@ -300,22 +276,23 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleSendCloseFromClientFinished(bool success)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
origCompletionDelegate = sendCompletionDelegate;
sendCompletionDelegate = null;
origTcs = streamingWriteTcs;
streamingWriteTcs = null;
ReleaseResourcesIfPossible();
}
if (!success)
{
FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Sending close from client has failed."));
// TODO(jtattermusch): this method is same as HandleSendFinished (only the error message differs).
origTcs.SetException(new InvalidOperationException("Sending close from client has failed."));
}
else
{
FireCompletion(origCompletionDelegate, null, null);
origTcs.SetResult(null);
}
}

@ -51,14 +51,14 @@ namespace Grpc.Core.Internal
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
readonly Server server;
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer, environment)
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, Server server) : base(serializer, deserializer)
{
this.server = GrpcPreconditions.CheckNotNull(server);
}
public void Initialize(CallSafeHandle call)
public void Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue)
{
call.Initialize(environment.CompletionRegistry, environment.CompletionQueue);
call.Initialize(completionQueue);
server.AddCallReference(this);
InitializeInternal(call);
@ -91,11 +91,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends a streaming response. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
public Task SendMessageAsync(TResponse msg, WriteFlags writeFlags)
{
StartSendMessageInternal(msg, writeFlags, completionDelegate);
return SendMessageInternalAsync(msg, writeFlags);
}
/// <summary>
@ -110,20 +109,22 @@ namespace Grpc.Core.Internal
/// Initiates sending a initial metadata.
/// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
/// to make things simpler.
/// completionDelegate is invoked upon completion.
/// </summary>
public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate)
public Task SendInitialMetadataAsync(Metadata headers)
{
lock (myLock)
{
GrpcPreconditions.CheckNotNull(headers, "metadata");
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
GrpcPreconditions.CheckState(started);
GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
CheckSendingAllowed(allowFinished: false);
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
var earlyResult = CheckSendAllowedOrEarlyResult();
if (earlyResult != null)
{
return earlyResult;
}
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
@ -131,7 +132,8 @@ namespace Grpc.Core.Internal
}
this.initialMetadataSent = true;
sendCompletionDelegate = completionDelegate;
streamingWriteTcs = new TaskCompletionSource<object>();
return streamingWriteTcs.Task;
}
}
@ -196,6 +198,16 @@ namespace Grpc.Core.Internal
server.RemoveCallReference(this);
}
protected override Task CheckSendAllowedOrEarlyResult()
{
GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed.");
GrpcPreconditions.CheckState(!finished, "Already finished.");
GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time");
GrpcPreconditions.CheckState(!disposed);
return null;
}
/// <summary>
/// Handles the server side close completion.
/// </summary>

@ -1,94 +0,0 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// If error != null, there's been an error or operation has been cancelled.
/// </summary>
internal delegate void AsyncCompletionDelegate<T>(T result, Exception error);
/// <summary>
/// Helper for transforming AsyncCompletionDelegate into full-fledged Task.
/// </summary>
internal class AsyncCompletionTaskSource<T>
{
readonly TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
readonly AsyncCompletionDelegate<T> completionDelegate;
public AsyncCompletionTaskSource()
{
completionDelegate = new AsyncCompletionDelegate<T>(HandleCompletion);
}
public Task<T> Task
{
get
{
return tcs.Task;
}
}
public AsyncCompletionDelegate<T> CompletionDelegate
{
get
{
return completionDelegate;
}
}
private void HandleCompletion(T value, Exception error)
{
if (error == null)
{
tcs.SetResult(value);
return;
}
if (error is OperationCanceledException)
{
tcs.SetCanceled();
return;
}
tcs.SetException(error);
}
}
}

@ -47,16 +47,14 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionRegistry completionRegistry;
CompletionQueueSafeHandle completionQueue;
private CallSafeHandle()
{
}
public void Initialize(CompletionRegistry completionRegistry, CompletionQueueSafeHandle completionQueue)
public void Initialize(CompletionQueueSafeHandle completionQueue)
{
this.completionRegistry = completionRegistry;
this.completionQueue = completionQueue;
}
@ -70,7 +68,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
@ -90,7 +88,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
}
@ -100,7 +98,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
}
@ -110,7 +108,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
}
@ -120,7 +118,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
}
@ -130,7 +128,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
}
@ -142,7 +140,7 @@ namespace Grpc.Core.Internal
{
var ctx = BatchContextSafeHandle.Create();
var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata,
optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
}
@ -153,7 +151,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
Native.grpcsharp_call_recv_message(this, ctx).CheckOk();
}
}
@ -163,7 +161,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
}
}
@ -173,7 +171,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
Native.grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
}
@ -183,7 +181,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
}
}

@ -63,7 +63,7 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials)
public CallSafeHandle CreateCall(CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials)
{
using (Profilers.ForCurrentThread().NewScope("ChannelSafeHandle.CreateCall"))
{
@ -72,7 +72,7 @@ namespace Grpc.Core.Internal
{
result.SetCredentials(credentials);
}
result.Initialize(registry, cq);
result.Initialize(cq);
return result;
}
}
@ -82,11 +82,10 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0);
}
public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq,
CompletionRegistry completionRegistry, BatchCompletionDelegate callback)
public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx);
}

@ -50,16 +50,12 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TRequest message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
return call.SendMessageAsync(message, GetWriteFlags());
}
public Task CompleteAsync()
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendCloseFromClient(taskSource.CompletionDelegate);
return taskSource.Task;
return call.SendCloseFromClientAsync();
}
public WriteOptions WriteOptions

@ -45,6 +45,7 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
AtomicCounter shutdownRefcount = new AtomicCounter(1);
CompletionRegistry completionRegistry;
private CompletionQueueSafeHandle()
{
@ -53,7 +54,13 @@ namespace Grpc.Core.Internal
public static CompletionQueueSafeHandle Create()
{
return Native.grpcsharp_completion_queue_create();
}
public static CompletionQueueSafeHandle Create(CompletionRegistry completionRegistry)
{
var cq = Native.grpcsharp_completion_queue_create();
cq.completionRegistry = completionRegistry;
return cq;
}
public CompletionQueueEvent Next()
@ -83,6 +90,15 @@ namespace Grpc.Core.Internal
DecrementShutdownRefcount();
}
/// <summary>
/// Completion registry associated with this completion queue.
/// Doesn't need to be set if only using Pluck() operations.
/// </summary>
public CompletionRegistry CompletionRegistry
{
get { return completionRegistry; }
}
protected override bool ReleaseHandle()
{
Native.grpcsharp_completion_queue_destroy(handle);

@ -33,15 +33,15 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// Pool of threads polling on the same completion queue.
/// Pool of threads polling on a set of completions queues.
/// </summary>
internal class GrpcThreadPool
{
@ -51,25 +51,31 @@ namespace Grpc.Core.Internal
readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>();
readonly int poolSize;
readonly int completionQueueCount;
CompletionQueueSafeHandle cq;
IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;
public GrpcThreadPool(GrpcEnvironment environment, int poolSize)
/// <summary>
/// Creates a thread pool threads polling on a set of completions queues.
/// </summary>
/// <param name="environment">Environment.</param>
/// <param name="poolSize">Pool size.</param>
/// <param name="completionQueueCount">Completion queue count.</param>
public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount)
{
this.environment = environment;
this.poolSize = poolSize;
this.completionQueueCount = completionQueueCount;
GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
"Thread pool size cannot be smaller than the number of completion queues used.");
}
public void Start()
{
lock (myLock)
{
if (cq != null)
{
throw new InvalidOperationException("Already started.");
}
cq = CompletionQueueSafeHandle.Create();
GrpcPreconditions.CheckState(completionQueues == null, "Already started.");
completionQueues = CreateCompletionQueueList(environment, completionQueueCount);
for (int i = 0; i < poolSize; i++)
{
@ -82,37 +88,48 @@ namespace Grpc.Core.Internal
{
lock (myLock)
{
cq.Shutdown();
foreach (var cq in completionQueues)
{
cq.Shutdown();
}
foreach (var thread in threads)
{
thread.Join();
}
cq.Dispose();
foreach (var cq in completionQueues)
{
cq.Dispose();
}
}
}
internal CompletionQueueSafeHandle CompletionQueue
internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get
{
return cq;
return completionQueues;
}
}
private Thread CreateAndStartThread(int i)
private Thread CreateAndStartThread(int threadIndex)
{
var thread = new Thread(new ThreadStart(RunHandlerLoop));
var cqIndex = threadIndex % completionQueues.Count;
var cq = completionQueues.ElementAt(cqIndex);
var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq)));
thread.IsBackground = false;
thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex);
thread.Start();
thread.Name = "grpc " + i;
return thread;
}
/// <summary>
/// Body of the polling thread.
/// </summary>
private void RunHandlerLoop()
private void RunHandlerLoop(CompletionQueueSafeHandle cq)
{
CompletionQueueEvent ev;
do
@ -124,7 +141,7 @@ namespace Grpc.Core.Internal
IntPtr tag = ev.tag;
try
{
var callback = environment.CompletionRegistry.Extract(tag);
var callback = cq.CompletionRegistry.Extract(tag);
callback(success);
}
catch (Exception e)
@ -135,5 +152,16 @@ namespace Grpc.Core.Internal
}
while (ev.type != CompletionQueueEvent.CompletionType.Shutdown);
}
private static IReadOnlyCollection<CompletionQueueSafeHandle> CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount)
{
var list = new List<CompletionQueueSafeHandle>();
for (int i = 0; i < completionQueueCount; i++)
{
var completionRegistry = new CompletionRegistry(environment);
list.Add(CompletionQueueSafeHandle.Create(completionRegistry));
}
return list.AsReadOnly();
}
}
}

@ -137,6 +137,7 @@ namespace Grpc.Core.Internal
public readonly Delegates.grpcsharp_server_credentials_release_delegate grpcsharp_server_credentials_release;
public readonly Delegates.grpcsharp_server_create_delegate grpcsharp_server_create;
public readonly Delegates.grpcsharp_server_register_completion_queue_delegate grpcsharp_server_register_completion_queue;
public readonly Delegates.grpcsharp_server_add_insecure_http2_port_delegate grpcsharp_server_add_insecure_http2_port;
public readonly Delegates.grpcsharp_server_add_secure_http2_port_delegate grpcsharp_server_add_secure_http2_port;
public readonly Delegates.grpcsharp_server_start_delegate grpcsharp_server_start;
@ -244,6 +245,7 @@ namespace Grpc.Core.Internal
this.grpcsharp_server_credentials_release = GetMethodDelegate<Delegates.grpcsharp_server_credentials_release_delegate>(library);
this.grpcsharp_server_create = GetMethodDelegate<Delegates.grpcsharp_server_create_delegate>(library);
this.grpcsharp_server_register_completion_queue = GetMethodDelegate<Delegates.grpcsharp_server_register_completion_queue_delegate>(library);
this.grpcsharp_server_add_insecure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_insecure_http2_port_delegate>(library);
this.grpcsharp_server_add_secure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_secure_http2_port_delegate>(library);
this.grpcsharp_server_start = GetMethodDelegate<Delegates.grpcsharp_server_start_delegate>(library);
@ -348,6 +350,7 @@ namespace Grpc.Core.Internal
this.grpcsharp_server_credentials_release = PInvokeMethods.grpcsharp_server_credentials_release;
this.grpcsharp_server_create = PInvokeMethods.grpcsharp_server_create;
this.grpcsharp_server_register_completion_queue = PInvokeMethods.grpcsharp_server_register_completion_queue;
this.grpcsharp_server_add_insecure_http2_port = PInvokeMethods.grpcsharp_server_add_insecure_http2_port;
this.grpcsharp_server_add_secure_http2_port = PInvokeMethods.grpcsharp_server_add_secure_http2_port;
this.grpcsharp_server_start = PInvokeMethods.grpcsharp_server_start;
@ -493,7 +496,8 @@ namespace Grpc.Core.Internal
public delegate ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create_delegate(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, bool forceClientAuth);
public delegate void grpcsharp_server_credentials_release_delegate(IntPtr credentials);
public delegate ServerSafeHandle grpcsharp_server_create_delegate(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
public delegate ServerSafeHandle grpcsharp_server_create_delegate(ChannelArgsSafeHandle args);
public delegate void grpcsharp_server_register_completion_queue_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq);
public delegate int grpcsharp_server_add_insecure_http2_port_delegate(ServerSafeHandle server, string addr);
public delegate int grpcsharp_server_add_secure_http2_port_delegate(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds);
public delegate void grpcsharp_server_start_delegate(ServerSafeHandle server);
@ -773,7 +777,10 @@ namespace Grpc.Core.Internal
// ServerSafeHandle
[DllImport("grpc_csharp_ext.dll")]
public static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
public static extern ServerSafeHandle grpcsharp_server_create(ChannelArgsSafeHandle args);
[DllImport("grpc_csharp_ext.dll")]
public static extern void grpcsharp_server_register_completion_queue(ServerSafeHandle server, CompletionQueueSafeHandle cq);
[DllImport("grpc_csharp_ext.dll")]
public static extern int grpcsharp_server_add_insecure_http2_port(ServerSafeHandle server, string addr);

@ -44,7 +44,7 @@ namespace Grpc.Core.Internal
{
internal interface IServerCallHandler
{
Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment);
Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq);
}
internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
@ -62,14 +62,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment, newRpc.Server);
newRpc.Server);
asyncCall.Initialize(newRpc.Call);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@ -121,14 +121,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment, newRpc.Server);
newRpc.Server);
asyncCall.Initialize(newRpc.Call);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@ -179,14 +179,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment, newRpc.Server);
newRpc.Server);
asyncCall.Initialize(newRpc.Call);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@ -237,14 +237,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment, newRpc.Server);
newRpc.Server);
asyncCall.Initialize(newRpc.Call);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@ -281,13 +281,13 @@ namespace Grpc.Core.Internal
{
public static readonly NoSuchMethodCallHandler Instance = new NoSuchMethodCallHandler();
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
(payload) => payload, (payload) => payload, environment, newRpc.Server);
(payload) => payload, (payload) => payload, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);

@ -52,16 +52,12 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TResponse message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
return call.SendMessageAsync(message, GetWriteFlags());
}
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate);
return taskSource.Task;
return call.SendInitialMetadataAsync(responseHeaders);
}
public WriteOptions WriteOptions

@ -31,12 +31,6 @@
#endregion
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.InteropServices;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
@ -50,12 +44,17 @@ namespace Grpc.Core.Internal
{
}
public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args)
public static ServerSafeHandle NewServer(ChannelArgsSafeHandle args)
{
// Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
// Doing so would make object finalizer crash if we end up abandoning the handle.
GrpcEnvironment.GrpcNativeInit();
return Native.grpcsharp_server_create(cq, args);
return Native.grpcsharp_server_create(args);
}
public void RegisterCompletionQueue(CompletionQueueSafeHandle cq)
{
Native.grpcsharp_server_register_completion_queue(this, cq);
}
public int AddInsecurePort(string addr)
@ -73,18 +72,18 @@ namespace Grpc.Core.Internal
Native.grpcsharp_server_start(this);
}
public void ShutdownAndNotify(BatchCompletionDelegate callback, GrpcEnvironment environment)
public void ShutdownAndNotify(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{
var ctx = BatchContextSafeHandle.Create();
environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_server_shutdown_and_notify_callback(this, environment.CompletionQueue, ctx);
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx);
}
public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment)
public void RequestCall(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{
var ctx = BatchContextSafeHandle.Create();
environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_server_request_call(this, environment.CompletionQueue, ctx).CheckOk();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk();
}
protected override bool ReleaseHandle()

@ -34,8 +34,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
@ -48,7 +47,7 @@ namespace Grpc.Core
/// </summary>
public class Server
{
const int InitialAllowRpcTokenCount = 10;
const int InitialAllowRpcTokenCountPerCq = 10;
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
@ -80,7 +79,12 @@ namespace Grpc.Core
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
{
this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs);
this.handle = ServerSafeHandle.NewServer(channelArgs);
}
foreach (var cq in environment.CompletionQueues)
{
this.handle.RegisterCompletionQueue(cq);
}
}
@ -133,9 +137,12 @@ namespace Grpc.Core
// Starting with more than one AllowOneRpc tokens can significantly increase
// unary RPC throughput.
for (int i = 0; i < InitialAllowRpcTokenCount; i++)
for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++)
{
AllowOneRpc();
foreach (var cq in environment.CompletionQueues)
{
AllowOneRpc(cq);
}
}
}
}
@ -154,7 +161,8 @@ namespace Grpc.Core
shutdownRequested = true;
}
handle.ShutdownAndNotify(HandleServerShutdown, environment);
var cq = environment.CompletionQueues.First(); // any cq will do
handle.ShutdownAndNotify(HandleServerShutdown, cq);
await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle();
@ -174,7 +182,8 @@ namespace Grpc.Core
shutdownRequested = true;
}
handle.ShutdownAndNotify(HandleServerShutdown, environment);
var cq = environment.CompletionQueues.First(); // any cq will do
handle.ShutdownAndNotify(HandleServerShutdown, cq);
handle.CancelAllCalls();
await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle();
@ -244,11 +253,11 @@ namespace Grpc.Core
/// <summary>
/// Allows one new RPC call to be received by server.
/// </summary>
private void AllowOneRpc()
private void AllowOneRpc(CompletionQueueSafeHandle cq)
{
if (!shutdownRequested)
{
handle.RequestCall(HandleNewServerRpc, environment);
handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq);
}
}
@ -265,7 +274,7 @@ namespace Grpc.Core
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
private async Task HandleCallAsync(ServerRpcNew newRpc)
private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
try
{
@ -274,7 +283,7 @@ namespace Grpc.Core
{
callHandler = NoSuchMethodCallHandler.Instance;
}
await callHandler.HandleCall(newRpc, environment).ConfigureAwait(false);
await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false);
}
catch (Exception e)
{
@ -285,9 +294,9 @@ namespace Grpc.Core
/// <summary>
/// Handles the native callback.
/// </summary>
private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx)
private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq)
{
Task.Run(() => AllowOneRpc());
Task.Run(() => AllowOneRpc(cq));
if (success)
{
@ -296,7 +305,7 @@ namespace Grpc.Core
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
{
HandleCallAsync(newRpc); // we don't need to await.
HandleCallAsync(newRpc, cq); // we don't need to await.
}
}
}

@ -471,8 +471,16 @@ namespace Grpc.IntegrationTesting
cts.Cancel();
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
try
{
// cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
await call.ResponseStream.MoveNext();
Assert.Fail();
}
catch (RpcException ex)
{
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
}
Console.WriteLine("Passed!");
}
@ -497,9 +505,16 @@ namespace Grpc.IntegrationTesting
// Deadline was reached before write has started. Eat the exception and continue.
}
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.
Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
try
{
await call.ResponseStream.MoveNext();
Assert.Fail();
}
catch (RpcException ex)
{
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.
Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
}
Console.WriteLine("Passed!");
}
@ -577,9 +592,17 @@ namespace Grpc.IntegrationTesting
await call.RequestStream.WriteAsync(request);
await call.RequestStream.CompleteAsync();
var e = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.ToListAsync());
Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
Assert.AreEqual(echoStatus.Message, e.Status.Detail);
try
{
// cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
await call.ResponseStream.ToListAsync();
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
Assert.AreEqual(echoStatus.Message, e.Status.Detail);
}
}
Console.WriteLine("Passed!");

@ -806,11 +806,14 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_set_credentials(
/* Server */
GPR_EXPORT grpc_server *GPR_CALLTYPE
grpcsharp_server_create(grpc_completion_queue *cq,
const grpc_channel_args *args) {
grpc_server *server = grpc_server_create(args, NULL);
grpcsharp_server_create(const grpc_channel_args *args) {
return grpc_server_create(args, NULL);
}
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq) {
grpc_server_register_completion_queue(server, cq, NULL);
return server;
}
GPR_EXPORT int32_t GPR_CALLTYPE

@ -35,15 +35,15 @@
#include "server.h"
#include <node.h>
#include <nan.h>
#include <node.h>
#include <vector>
#include "call.h"
#include "completion_queue_async_worker.h"
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
#include "grpc/support/log.h"
#include "call.h"
#include "completion_queue_async_worker.h"
#include "server_credentials.h"
#include "timeval.h"
@ -100,8 +100,8 @@ class NewCallOp : public Op {
Nan::Set(obj, Nan::New("host").ToLocalChecked(),
Nan::New(details.host).ToLocalChecked());
Nan::Set(obj, Nan::New("deadline").ToLocalChecked(),
Nan::New<Date>(
TimespecToMilliseconds(details.deadline)).ToLocalChecked());
Nan::New<Date>(TimespecToMilliseconds(details.deadline))
.ToLocalChecked());
Nan::Set(obj, Nan::New("metadata").ToLocalChecked(),
ParseMetadata(&request_metadata));
return scope.Escape(obj);
@ -117,14 +117,13 @@ class NewCallOp : public Op {
grpc_metadata_array request_metadata;
protected:
std::string GetTypeString() const {
return "new_call";
}
std::string GetTypeString() const { return "new_call"; }
};
Server::Server(grpc_server *server) : wrapped_server(server) {
shutdown_queue = grpc_completion_queue_create(NULL);
grpc_server_register_completion_queue(server, shutdown_queue, NULL);
grpc_server_register_non_listening_completion_queue(server, shutdown_queue,
NULL);
}
Server::~Server() {
@ -156,8 +155,7 @@ bool Server::HasInstance(Local<Value> val) {
}
void Server::ShutdownServer() {
grpc_server_shutdown_and_notify(this->wrapped_server,
this->shutdown_queue,
grpc_server_shutdown_and_notify(this->wrapped_server, this->shutdown_queue,
NULL);
grpc_server_cancel_all_calls(this->wrapped_server);
grpc_completion_queue_pluck(this->shutdown_queue, NULL,
@ -170,8 +168,8 @@ NAN_METHOD(Server::New) {
if (!info.IsConstructCall()) {
const int argc = 1;
Local<Value> argv[argc] = {info[0]};
MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance(
argc, argv);
MaybeLocal<Object> maybe_instance =
constructor->GetFunction()->NewInstance(argc, argv);
if (maybe_instance.IsEmpty()) {
// There's probably a pending exception
return;
@ -185,8 +183,9 @@ NAN_METHOD(Server::New) {
grpc_channel_args *channel_args;
if (!ParseChannelArgs(info[0], &channel_args)) {
DeallocateChannelArgs(channel_args);
return Nan::ThrowTypeError("Server options must be an object with "
"string keys and integer or string values");
return Nan::ThrowTypeError(
"Server options must be an object with "
"string keys and integer or string values");
}
wrapped_server = grpc_server_create(channel_args, NULL);
DeallocateChannelArgs(channel_args);
@ -218,8 +217,7 @@ NAN_METHOD(Server::RequestCall) {
NAN_METHOD(Server::AddHttp2Port) {
if (!HasInstance(info.This())) {
return Nan::ThrowTypeError(
"addHttp2Port can only be called on a Server");
return Nan::ThrowTypeError("addHttp2Port can only be called on a Server");
}
if (!info[0]->IsString()) {
return Nan::ThrowTypeError(
@ -239,8 +237,7 @@ NAN_METHOD(Server::AddHttp2Port) {
*Utf8String(info[0]));
} else {
port = grpc_server_add_secure_http2_port(server->wrapped_server,
*Utf8String(info[0]),
creds);
*Utf8String(info[0]), creds);
}
info.GetReturnValue().Set(Nan::New<Number>(port));
}
@ -262,8 +259,7 @@ NAN_METHOD(Server::TryShutdown) {
Server *server = ObjectWrap::Unwrap<Server>(info.This());
unique_ptr<OpVec> ops(new OpVec());
grpc_server_shutdown_and_notify(
server->wrapped_server,
CompletionQueueAsyncWorker::GetQueue(),
server->wrapped_server, CompletionQueueAsyncWorker::GetQueue(),
new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(),
shared_ptr<Resources>(nullptr)));
CompletionQueueAsyncWorker::Next();

@ -0,0 +1,43 @@
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Pod::Spec.new do |s|
s.name = "CronetFramework"
s.version = "0.0.2"
s.summary = "Cronet, precompiled and used as a framework."
s.homepage = "http://chromium.org"
s.license = { :type => 'BSD' }
s.vendored_framework = "Cronet.framework"
s.author = "The Chromium Authors"
s.ios.deployment_target = "8.0"
s.source = { :http => 'https://storage.googleapis.com/grpc-precompiled-binaries/cronet/Cronet.framework.zip' }
s.preserve_paths = "Cronet.framework"
s.public_header_files = "Cronet.framework/Headers/**/*{.h}"
end

@ -0,0 +1,55 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#import <Cronet/Cronet.h>
#import "GRPCCall.h"
/**
* Methods for using cronet transport.
*/
@interface GRPCCall (Cronet)
/**
* This method should be called before issuing the first RPC. It should be
* called only once. Create an instance of Cronet engine in your app elsewhere
* and pass the instance pointer in the cronet_engine parameter. Once set,
* all subsequent RPCs will use Cronet transport. The method is not thread
* safe.
*/
+(void)useCronetWithEngine:(cronet_engine *)engine;
+(cronet_engine *)cronetEngine;
+(BOOL)isUsingCronet;
@end

@ -0,0 +1,54 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#import "GRPCCall+Cronet.h"
static BOOL useCronet = NO;
static cronet_engine *globalCronetEngine;
@implementation GRPCCall (Cronet)
+ (void)useCronetWithEngine:(cronet_engine *)engine {
useCronet = YES;
globalCronetEngine = engine;
}
+ (cronet_engine *)cronetEngine {
return globalCronetEngine;
}
+ (BOOL)isUsingCronet {
return useCronet;
}
@end

@ -55,6 +55,12 @@ struct grpc_channel_credentials;
*/
+ (nullable GRPCChannel *)secureChannelWithHost:(nonnull NSString *)host;
/**
* Creates a secure channel to the specified @c host using Cronet as a transport mechanism.
*/
+ (nullable GRPCChannel *)secureCronetChannelWithHost:(NSString *)host
channelArgs:(NSDictionary *)channelArgs;
/**
* Creates a secure channel to the specified @c host using the specified @c credentials and
* @c channelArgs. Only in tests should @c GRPC_SSL_TARGET_NAME_OVERRIDE_ARG channel arg be set.

@ -34,10 +34,13 @@
#import "GRPCChannel.h"
#include <grpc/grpc_security.h>
#include <grpc/grpc_cronet.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#import <Cronet/Cronet.h>
#import <GRPCClient/GRPCCall+Cronet.h>
#import "GRPCCompletionQueue.h"
void freeChannelArgs(grpc_channel_args *channel_args) {
@ -99,6 +102,22 @@ grpc_channel_args * buildChannelArgs(NSDictionary *dictionary) {
grpc_channel_args *_channelArgs;
}
- (instancetype)initWithHost:(NSString *)host
cronetEngine:(cronet_engine *)cronetEngine
channelArgs:(NSDictionary *)channelArgs {
if (!host) {
[NSException raise:NSInvalidArgumentException format:@"host argument missing"];
}
if (self = [super init]) {
_channelArgs = buildChannelArgs(channelArgs);
_host = [host copy];
_unmanagedChannel = grpc_cronet_secure_channel_create(cronetEngine, _host.UTF8String, _channelArgs,
NULL);
}
return self;
}
- (instancetype)initWithHost:(NSString *)host
secure:(BOOL)secure
@ -133,6 +152,17 @@ grpc_channel_args * buildChannelArgs(NSDictionary *dictionary) {
freeChannelArgs(_channelArgs);
}
+ (GRPCChannel *)secureCronetChannelWithHost:(NSString *)host
channelArgs:(NSDictionary *)channelArgs {
cronet_engine *engine = [GRPCCall cronetEngine];
if (!engine) {
[NSException raise:NSInvalidArgumentException
format:@"cronet_engine is NULL. Set it first."];
return nil;
}
return [[GRPCChannel alloc] initWithHost:host cronetEngine:engine channelArgs:channelArgs];
}
+ (GRPCChannel *)secureChannelWithHost:(NSString *)host {
return [[GRPCChannel alloc] initWithHost:host secure:YES credentials:NULL channelArgs:NULL];
}

@ -37,6 +37,7 @@
#include <grpc/grpc_security.h>
#import <GRPCClient/GRPCCall.h>
#import <GRPCClient/GRPCCall+ChannelArg.h>
#import <GRPCClient/GRPCCall+Cronet.h>
#import "GRPCChannel.h"
#import "GRPCCompletionQueue.h"
@ -200,15 +201,21 @@ NS_ASSUME_NONNULL_BEGIN
- (GRPCChannel *)newChannel {
NSDictionary *args = [self channelArgs];
BOOL useCronet = [GRPCCall isUsingCronet];
if (_secure) {
GRPCChannel *channel;
@synchronized(self) {
if (_channelCreds == nil) {
[self setTLSPEMRootCerts:nil withPrivateKey:nil withCertChain:nil error:nil];
}
channel = [GRPCChannel secureChannelWithHost:_address
credentials:_channelCreds
channelArgs:args];
if (useCronet) {
channel = [GRPCChannel secureCronetChannelWithHost:_address
channelArgs:args];
} else {
channel = [GRPCChannel secureChannelWithHost:_address
credentials:_channelCreds
channelArgs:args];
}
}
return channel;
} else {

@ -35,7 +35,9 @@
#include <grpc/status.h>
#import <Cronet/Cronet.h>
#import <GRPCClient/GRPCCall+Tests.h>
#import <GRPCClient/GRPCCall+Cronet.h>
#import <ProtoRPC/ProtoRPC.h>
#import <RemoteTest/Empty.pbobjc.h>
#import <RemoteTest/Messages.pbobjc.h>
@ -78,6 +80,8 @@
#pragma mark Tests
static cronet_engine *cronetEngine = NULL;
@implementation InteropTests {
RMTTestService *_service;
}
@ -88,6 +92,15 @@
- (void)setUp {
_service = self.class.host ? [RMTTestService serviceWithHost:self.class.host] : nil;
#ifdef GRPC_COMPILE_WITH_CRONET
if (cronetEngine == NULL) {
// Cronet setup
[Cronet setHttp2Enabled:YES];
[Cronet start];
cronetEngine = [Cronet getGlobalEngine];
[GRPCCall useCronetWithEngine:cronetEngine];
}
#endif
}
- (void)testEmptyUnaryRPC {
@ -245,6 +258,8 @@
[self waitForExpectationsWithTimeout:4 handler:nil];
}
#ifndef GRPC_COMPILE_WITH_CRONET
// TODO(makdharma@): Fix this test
- (void)testEmptyStreamRPC {
XCTAssertNotNil(self.class.host);
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyStream"];
@ -258,6 +273,7 @@
}];
[self waitForExpectationsWithTimeout:2 handler:nil];
}
#endif
- (void)testCancelAfterBeginRPC {
XCTAssertNotNil(self.class.host);

@ -3,6 +3,7 @@ platform :ios, '8.0'
pod 'Protobuf', :path => "../../../third_party/protobuf"
pod 'BoringSSL', :podspec => ".."
pod 'CronetFramework', :podspec => ".."
pod 'gRPC', :path => "../../.."
pod 'RemoteTest', :path => "RemoteTestClient"

@ -336,6 +336,8 @@ cdef extern from "grpc/_cython/loader.h":
void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
void *reserved) nogil
void grpc_server_register_non_listening_completion_queue(
grpc_server *server, grpc_completion_queue *cq, void *reserved) nogil
int grpc_server_add_insecure_http2_port(
grpc_server *server, const char *addr) nogil
void grpc_server_start(grpc_server *server) nogil

@ -81,11 +81,20 @@ cdef class Server:
self.c_server, queue.c_completion_queue, NULL)
self.registered_completion_queues.append(queue)
def register_non_listening_completion_queue(
self, CompletionQueue queue not None):
if self.is_started:
raise ValueError("cannot register completion queues after start")
with nogil:
grpc_server_register_non_listening_completion_queue(
self.c_server, queue.c_completion_queue, NULL)
self.registered_completion_queues.append(queue)
def start(self):
if self.is_started:
raise ValueError("the server has already started")
self.backup_shutdown_queue = CompletionQueue()
self.register_completion_queue(self.backup_shutdown_queue)
self.register_non_listening_completion_queue(self.backup_shutdown_queue)
self.is_started = True
with nogil:
grpc_server_start(self.c_server)
@ -169,4 +178,3 @@ cdef class Server:
time.sleep(0)
with nogil:
grpc_server_destroy(self.c_server)

@ -1207,7 +1207,7 @@
$(LIBDIR)/$(CONFIG)/pkgconfig/grpc_zookeeper.pc:
$(E) "[MAKE] Generating $@"
$(Q) mkdir -p $(@D)
$(Q) echo -e "$(GRPC_ZOOKEEPER_PC_FILE)" >$@
$(Q) echo "$(GRPC_ZOOKEEPER_PC_FILE)" | tr , '\n' >$@
$(LIBDIR)/$(CONFIG)/pkgconfig/grpc++.pc:
$(E) "[MAKE] Generating $@"

@ -40,7 +40,7 @@ tools/run_tests/run_performance_tests.py \
--netperf \
--category all \
--bq_result_table performance_test.performance_experiment \
--remote_worker_host grpc-performance-server-8core grpc-performance-client-8core \
--remote_worker_host grpc-performance-server-8core grpc-performance-client-8core grpc-performance-client2-8core \
|| EXIT_CODE=1
# scalability with 32cores (and upload to a different BQ table)
@ -49,7 +49,7 @@ tools/run_tests/run_performance_tests.py \
--netperf \
--category scalable \
--bq_result_table performance_test.performance_experiment_32core \
--remote_worker_host grpc-performance-server-32core grpc-performance-client-32core \
--remote_worker_host grpc-performance-server-32core grpc-performance-client-32core grpc-performance-client2-32core \
|| EXIT_CODE=1
exit $EXIT_CODE

Loading…
Cancel
Save