Merge remote-tracking branch 'upstream/master'

pull/2945/head
Hongyu Chen 10 years ago
commit 5ae050dcc3
  1. 6
      src/core/security/google_default_credentials.c
  2. 296
      src/csharp/Grpc.Core.Tests/ClientServerTest.cs
  3. 128
      src/csharp/Grpc.Core.Tests/CompressionTest.cs
  4. 122
      src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
  5. 4
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  6. 8
      src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs
  7. 248
      src/csharp/Grpc.Core.Tests/MockServiceHelper.cs
  8. 136
      src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
  9. 152
      src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
  10. 37
      src/csharp/Grpc.Core/CallOptions.cs
  11. 63
      src/csharp/Grpc.Core/CompressionLevel.cs
  12. 139
      src/csharp/Grpc.Core/ContextPropagationToken.cs
  13. 3
      src/csharp/Grpc.Core/Grpc.Core.csproj
  14. 8
      src/csharp/Grpc.Core/IAsyncStreamWriter.cs
  15. 64
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  16. 10
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  17. 35
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  18. 41
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  19. 6
      src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
  20. 23
      src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
  21. 17
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  22. 31
      src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
  23. 10
      src/csharp/Grpc.Core/Metadata.cs
  24. 56
      src/csharp/Grpc.Core/ServerCallContext.cs
  25. 82
      src/csharp/Grpc.Core/WriteOptions.cs
  26. 24
      src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
  27. 22
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs
  28. 69
      src/csharp/ext/grpc_csharp_ext.c
  29. 41
      src/objective-c/GRPCClient/GRPCCall.m
  30. 13
      src/objective-c/GRPCClient/private/GRPCSecureChannel.m
  31. 9
      src/objective-c/RxLibrary/GRXBufferedPipe.h
  32. 10
      src/objective-c/RxLibrary/GRXForwardingWriter.h
  33. 6
      src/objective-c/RxLibrary/GRXForwardingWriter.m
  34. 13
      src/objective-c/RxLibrary/GRXImmediateWriter.h
  35. 91
      src/objective-c/RxLibrary/GRXWriter.h
  36. 4
      src/objective-c/tests/GRPCClientTests.m
  37. 3
      src/objective-c/tests/InteropTests.h
  38. 9
      src/objective-c/tests/InteropTests.m
  39. 59
      src/objective-c/tests/InteropTestsLocalCleartext.m
  40. 4
      src/objective-c/tests/InteropTestsLocalSSL.m
  41. 10
      src/objective-c/tests/Tests.xcodeproj/project.pbxproj

@ -84,6 +84,8 @@ static void on_compute_engine_detection_http_response(
gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset));
}
static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
static int is_stack_running_on_compute_engine(void) {
compute_engine_detector detector;
grpc_httpcli_request request;
@ -114,12 +116,12 @@ static int is_stack_running_on_compute_engine(void) {
while (!detector.is_done) {
grpc_pollset_worker worker;
grpc_pollset_work(&detector.pollset, &worker,
gpr_inf_future(GPR_CLOCK_REALTIME));
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
grpc_httpcli_context_destroy(&context);
grpc_pollset_destroy(&detector.pollset);
grpc_pollset_shutdown(&detector.pollset, destroy_pollset, &detector.pollset);
return detector.success;
}

@ -46,47 +46,18 @@ namespace Grpc.Core.Tests
public class ClientServerTest
{
const string Host = "127.0.0.1";
const string ServiceName = "tests.Test";
static readonly Method<string, string> EchoMethod = new Method<string, string>(
MethodType.Unary,
ServiceName,
"Echo",
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
static readonly Method<string, string> ConcatAndEchoMethod = new Method<string, string>(
MethodType.ClientStreaming,
ServiceName,
"ConcatAndEcho",
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
static readonly Method<string, string> NonexistentMethod = new Method<string, string>(
MethodType.Unary,
ServiceName,
"NonexistentMethod",
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
static readonly ServerServiceDefinition ServiceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName)
.AddMethod(EchoMethod, EchoHandler)
.AddMethod(ConcatAndEchoMethod, ConcatAndEchoHandler)
.Build();
MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
server = new Server
{
Services = { ServiceDefinition },
Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } }
};
helper = new MockServiceHelper(Host);
server = helper.GetServer();
server.Start();
channel = new Channel(Host, server.Ports.Single().BoundPort, Credentials.Insecure);
channel = helper.GetChannel();
}
[TearDown]
@ -103,86 +74,79 @@ namespace Grpc.Core.Tests
}
[Test]
public void UnaryCall()
public async Task UnaryCall()
{
var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
Assert.AreEqual("ABC", Calls.BlockingUnaryCall(callDetails, "ABC"));
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
return request;
});
Assert.AreEqual("ABC", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
Assert.AreEqual("ABC", await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "ABC"));
}
[Test]
public void UnaryCall_ServerHandlerThrows()
{
var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
try
helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) =>
{
Calls.BlockingUnaryCall(callDetails, "THROW");
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
}
throw new Exception("This was thrown on purpose by a test");
});
var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode);
var ex2 = Assert.Throws<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc"));
Assert.AreEqual(StatusCode.Unknown, ex2.Status.StatusCode);
}
[Test]
public void UnaryCall_ServerHandlerThrowsRpcException()
{
var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
try
{
Calls.BlockingUnaryCall(callDetails, "THROW_UNAUTHENTICATED");
Assert.Fail();
}
catch (RpcException e)
helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) =>
{
Assert.AreEqual(StatusCode.Unauthenticated, e.Status.StatusCode);
}
throw new RpcException(new Status(StatusCode.Unauthenticated, ""));
});
var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode);
var ex2 = Assert.Throws<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc"));
Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode);
}
[Test]
public void UnaryCall_ServerHandlerSetsStatus()
{
var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
try
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
Calls.BlockingUnaryCall(callDetails, "SET_UNAUTHENTICATED");
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Unauthenticated, e.Status.StatusCode);
}
}
context.Status = new Status(StatusCode.Unauthenticated, "");
return "";
});
[Test]
public async Task AsyncUnaryCall()
{
var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
var result = await Calls.AsyncUnaryCall(callDetails, "ABC");
Assert.AreEqual("ABC", result);
}
var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode);
[Test]
public async Task AsyncUnaryCall_ServerHandlerThrows()
{
var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
try
{
await Calls.AsyncUnaryCall(callDetails, "THROW");
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
}
var ex2 = Assert.Throws<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc"));
Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode);
}
[Test]
public async Task ClientStreamingCall()
{
var callDetails = new CallInvocationDetails<string, string>(channel, ConcatAndEchoMethod, new CallOptions());
var call = Calls.AsyncClientStreamingCall(callDetails);
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
string result = "";
await requestStream.ForEach(async (request) =>
{
result += request;
});
await Task.Delay(100);
return result;
});
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
await call.RequestStream.WriteAll(new string[] { "A", "B", "C" });
Assert.AreEqual("ABC", await call.ResponseAsync);
}
@ -190,36 +154,47 @@ namespace Grpc.Core.Tests
[Test]
public async Task ClientStreamingCall_CancelAfterBegin()
{
var barrier = new TaskCompletionSource<object>();
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
barrier.SetResult(null);
await requestStream.ToList();
return "";
});
var cts = new CancellationTokenSource();
var callDetails = new CallInvocationDetails<string, string>(channel, ConcatAndEchoMethod, new CallOptions(cancellationToken: cts.Token));
var call = Calls.AsyncClientStreamingCall(callDetails);
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
// TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it.
await Task.Delay(1000);
await barrier.Task; // make sure the handler has started.
cts.Cancel();
try
{
await call.ResponseAsync;
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
}
var ex = Assert.Throws<RpcException>(async () => await call.ResponseAsync);
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
[Test]
public void AsyncUnaryCall_EchoMetadata()
public async Task AsyncUnaryCall_EchoMetadata()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
foreach (Metadata.Entry metadataEntry in context.RequestHeaders)
{
if (metadataEntry.Key != "user-agent")
{
context.ResponseTrailers.Add(metadataEntry);
}
}
return "";
});
var headers = new Metadata
{
new Metadata.Entry("ascii-header", "abcdefg"),
new Metadata.Entry("binary-header-bin", new byte[] { 1, 2, 3, 0, 0xff }),
{ "ascii-header", "abcdefg" },
{ "binary-header-bin", new byte[] { 1, 2, 3, 0, 0xff } }
};
var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions(headers: headers));
var call = Calls.AsyncUnaryCall(callDetails, "ABC");
Assert.AreEqual("ABC", call.ResponseAsync.Result);
var call = Calls.AsyncUnaryCall(helper.CreateUnaryCall(new CallOptions(headers: headers)), "ABC");
await call;
Assert.AreEqual(StatusCode.OK, call.GetStatus().StatusCode);
@ -236,15 +211,18 @@ namespace Grpc.Core.Tests
public void UnaryCall_DisposedChannel()
{
channel.Dispose();
var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(callDetails, "ABC"));
Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
}
[Test]
public void UnaryCallPerformance()
{
var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
return request;
});
var callDetails = helper.CreateUnaryCall();
BenchmarkUtil.RunBenchmark(100, 100,
() => { Calls.BlockingUnaryCall(callDetails, "ABC"); });
}
@ -252,44 +230,57 @@ namespace Grpc.Core.Tests
[Test]
public void UnknownMethodHandler()
{
var callDetails = new CallInvocationDetails<string, string>(channel, NonexistentMethod, new CallOptions());
try
{
Calls.BlockingUnaryCall(callDetails, "ABC");
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Unimplemented, e.Status.StatusCode);
}
var nonexistentMethod = new Method<string, string>(
MethodType.Unary,
MockServiceHelper.ServiceName,
"NonExistentMethod",
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
var callDetails = new CallInvocationDetails<string, string>(channel, nonexistentMethod, new CallOptions());
var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(callDetails, "abc"));
Assert.AreEqual(StatusCode.Unimplemented, ex.Status.StatusCode);
}
[Test]
public void UserAgentStringPresent()
{
var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
string userAgent = Calls.BlockingUnaryCall(callDetails, "RETURN-USER-AGENT");
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
return context.RequestHeaders.Where(entry => entry.Key == "user-agent").Single().Value;
});
string userAgent = Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc");
Assert.IsTrue(userAgent.StartsWith("grpc-csharp/"));
}
[Test]
public void PeerInfoPresent()
{
var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
string peer = Calls.BlockingUnaryCall(callDetails, "RETURN-PEER");
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
return context.Peer;
});
string peer = Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc");
Assert.IsTrue(peer.Contains(Host));
}
[Test]
public async Task Channel_WaitForStateChangedAsync()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
return request;
});
Assert.Throws(typeof(TaskCanceledException),
async () => await channel.WaitForStateChangedAsync(channel.State, DateTime.UtcNow.AddMilliseconds(10)));
var stateChangedTask = channel.WaitForStateChangedAsync(channel.State);
var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
await Calls.AsyncUnaryCall(callDetails, "abc");
await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc");
await stateChangedTask;
Assert.AreEqual(ChannelState.Ready, channel.State);
@ -300,62 +291,9 @@ namespace Grpc.Core.Tests
{
await channel.ConnectAsync();
Assert.AreEqual(ChannelState.Ready, channel.State);
await channel.ConnectAsync(DateTime.UtcNow.AddMilliseconds(1000));
Assert.AreEqual(ChannelState.Ready, channel.State);
}
private static async Task<string> EchoHandler(string request, ServerCallContext context)
{
foreach (Metadata.Entry metadataEntry in context.RequestHeaders)
{
if (metadataEntry.Key != "user-agent")
{
context.ResponseTrailers.Add(metadataEntry);
}
}
if (request == "RETURN-USER-AGENT")
{
return context.RequestHeaders.Where(entry => entry.Key == "user-agent").Single().Value;
}
if (request == "RETURN-PEER")
{
return context.Peer;
}
if (request == "THROW")
{
throw new Exception("This was thrown on purpose by a test");
}
if (request == "THROW_UNAUTHENTICATED")
{
throw new RpcException(new Status(StatusCode.Unauthenticated, ""));
}
if (request == "SET_UNAUTHENTICATED")
{
context.Status = new Status(StatusCode.Unauthenticated, "");
}
return request;
}
private static async Task<string> ConcatAndEchoHandler(IAsyncStreamReader<string> requestStream, ServerCallContext context)
{
string result = "";
await requestStream.ForEach(async (request) =>
{
if (request == "THROW")
{
throw new Exception("This was thrown on purpose by a test");
}
result += request;
});
// simulate processing takes some time.
await Task.Delay(250);
return result;
}
}
}

@ -0,0 +1,128 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class CompressionTest
{
MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
helper = new MockServiceHelper();
server = helper.GetServer();
server.Start();
channel = helper.GetChannel();
}
[TearDown]
public void Cleanup()
{
channel.Dispose();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public void WriteOptions_Unary()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
context.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
return request;
});
var callOptions = new CallOptions(writeOptions: new WriteOptions(WriteFlags.NoCompress));
Calls.BlockingUnaryCall(helper.CreateUnaryCall(callOptions), "abc");
}
[Test]
public async Task WriteOptions_DuplexStreaming()
{
helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
{
await requestStream.ToList();
context.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
await context.WriteResponseHeadersAsync(new Metadata { { "ascii-header", "abcdefg" } });
await responseStream.WriteAsync("X");
responseStream.WriteOptions = null;
await responseStream.WriteAsync("Y");
responseStream.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
await responseStream.WriteAsync("Z");
});
var callOptions = new CallOptions(writeOptions: new WriteOptions(WriteFlags.NoCompress));
var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall(callOptions));
// check that write options from call options are propagated to request stream.
Assert.IsTrue((call.RequestStream.WriteOptions.Flags & WriteFlags.NoCompress) != 0);
call.RequestStream.WriteOptions = new WriteOptions();
await call.RequestStream.WriteAsync("A");
call.RequestStream.WriteOptions = null;
await call.RequestStream.WriteAsync("B");
call.RequestStream.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
await call.RequestStream.WriteAsync("C");
await call.RequestStream.CompleteAsync();
await call.ResponseStream.ToList();
}
}
}

@ -0,0 +1,122 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class ContextPropagationTest
{
MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
helper = new MockServiceHelper();
server = helper.GetServer();
server.Start();
channel = helper.GetChannel();
}
[TearDown]
public void Cleanup()
{
channel.Dispose();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public async Task PropagateCancellation()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
// check that we didn't obtain the default cancellation token.
Assert.IsTrue(context.CancellationToken.CanBeCanceled);
return "PASS";
});
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
var propagationToken = context.CreatePropagationToken();
Assert.IsNotNull(propagationToken.ParentCall);
var callOptions = new CallOptions(propagationToken: propagationToken);
return await Calls.AsyncUnaryCall(helper.CreateUnaryCall(callOptions), "xyz");
});
var cts = new CancellationTokenSource();
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
await call.RequestStream.CompleteAsync();
Assert.AreEqual("PASS", await call);
}
[Test]
public async Task PropagateDeadline()
{
var deadline = DateTime.UtcNow.AddDays(7);
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
Assert.IsTrue(context.Deadline < deadline.AddMinutes(1));
Assert.IsTrue(context.Deadline > deadline.AddMinutes(-1));
return "PASS";
});
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
var callOptions = new CallOptions(propagationToken: context.CreatePropagationToken());
return await Calls.AsyncUnaryCall(helper.CreateUnaryCall(callOptions), "xyz");
});
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(deadline: deadline)));
await call.RequestStream.CompleteAsync();
Assert.AreEqual("PASS", await call);
}
}
}

@ -77,6 +77,10 @@
<Compile Include="TimeoutsTest.cs" />
<Compile Include="NUnitVersionTest.cs" />
<Compile Include="ChannelTest.cs" />
<Compile Include="MockServiceHelper.cs" />
<Compile Include="ResponseHeadersTest.cs" />
<Compile Include="CompressionTest.cs" />
<Compile Include="ContextPropagationTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -53,8 +53,8 @@ namespace Grpc.Core.Internal.Tests
{
var metadata = new Metadata
{
new Metadata.Entry("host", "somehost"),
new Metadata.Entry("header2", "header value"),
{ "host", "somehost" },
{ "header2", "header value" },
};
var nativeMetadata = MetadataArraySafeHandle.Create(metadata);
nativeMetadata.Dispose();
@ -65,8 +65,8 @@ namespace Grpc.Core.Internal.Tests
{
var metadata = new Metadata
{
new Metadata.Entry("host", "somehost"),
new Metadata.Entry("header2", "header value"),
{ "host", "somehost" },
{ "header2", "header value" }
};
var nativeMetadata = MetadataArraySafeHandle.Create(metadata);

@ -0,0 +1,248 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
/// <summary>
/// Allows setting up a mock service in the client-server tests easily.
/// </summary>
public class MockServiceHelper
{
public const string ServiceName = "tests.Test";
public static readonly Method<string, string> UnaryMethod = new Method<string, string>(
MethodType.Unary,
ServiceName,
"Unary",
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
public static readonly Method<string, string> ClientStreamingMethod = new Method<string, string>(
MethodType.ClientStreaming,
ServiceName,
"ClientStreaming",
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
public static readonly Method<string, string> ServerStreamingMethod = new Method<string, string>(
MethodType.ServerStreaming,
ServiceName,
"ServerStreaming",
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
public static readonly Method<string, string> DuplexStreamingMethod = new Method<string, string>(
MethodType.DuplexStreaming,
ServiceName,
"DuplexStreaming",
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
readonly string host;
readonly ServerServiceDefinition serviceDefinition;
UnaryServerMethod<string, string> unaryHandler;
ClientStreamingServerMethod<string, string> clientStreamingHandler;
ServerStreamingServerMethod<string, string> serverStreamingHandler;
DuplexStreamingServerMethod<string, string> duplexStreamingHandler;
Server server;
Channel channel;
public MockServiceHelper(string host = null)
{
this.host = host ?? "localhost";
serviceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName)
.AddMethod(UnaryMethod, (request, context) => unaryHandler(request, context))
.AddMethod(ClientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context))
.AddMethod(ServerStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context))
.AddMethod(DuplexStreamingMethod, (requestStream, responseStream, context) => duplexStreamingHandler(requestStream, responseStream, context))
.Build();
var defaultStatus = new Status(StatusCode.Unknown, "Default mock implementation. Please provide your own.");
unaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
context.Status = defaultStatus;
return "";
});
clientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
context.Status = defaultStatus;
return "";
});
serverStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
{
context.Status = defaultStatus;
});
duplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
{
context.Status = defaultStatus;
});
}
/// <summary>
/// Returns the default server for this service and creates one if not yet created.
/// </summary>
public Server GetServer()
{
if (server == null)
{
server = new Server
{
Services = { serviceDefinition },
Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } }
};
}
return server;
}
/// <summary>
/// Returns the default channel for this service and creates one if not yet created.
/// </summary>
public Channel GetChannel()
{
if (channel == null)
{
channel = new Channel(Host, GetServer().Ports.Single().BoundPort, Credentials.Insecure);
}
return channel;
}
public CallInvocationDetails<string, string> CreateUnaryCall(CallOptions options = null)
{
options = options ?? new CallOptions();
return new CallInvocationDetails<string, string>(channel, UnaryMethod, options);
}
public CallInvocationDetails<string, string> CreateClientStreamingCall(CallOptions options = null)
{
options = options ?? new CallOptions();
return new CallInvocationDetails<string, string>(channel, ClientStreamingMethod, options);
}
public CallInvocationDetails<string, string> CreateServerStreamingCall(CallOptions options = null)
{
options = options ?? new CallOptions();
return new CallInvocationDetails<string, string>(channel, ServerStreamingMethod, options);
}
public CallInvocationDetails<string, string> CreateDuplexStreamingCall(CallOptions options = null)
{
options = options ?? new CallOptions();
return new CallInvocationDetails<string, string>(channel, DuplexStreamingMethod, options);
}
public string Host
{
get
{
return this.host;
}
}
public ServerServiceDefinition ServiceDefinition
{
get
{
return this.serviceDefinition;
}
}
public UnaryServerMethod<string, string> UnaryHandler
{
get
{
return this.unaryHandler;
}
set
{
unaryHandler = value;
}
}
public ClientStreamingServerMethod<string, string> ClientStreamingHandler
{
get
{
return this.clientStreamingHandler;
}
set
{
clientStreamingHandler = value;
}
}
public ServerStreamingServerMethod<string, string> ServerStreamingHandler
{
get
{
return this.serverStreamingHandler;
}
set
{
serverStreamingHandler = value;
}
}
public DuplexStreamingServerMethod<string, string> DuplexStreamingHandler
{
get
{
return this.duplexStreamingHandler;
}
set
{
duplexStreamingHandler = value;
}
}
}
}

@ -0,0 +1,136 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
/// <summary>
/// Tests for response headers support.
/// </summary>
public class ResponseHeadersTest
{
MockServiceHelper helper;
Server server;
Channel channel;
Metadata headers;
[SetUp]
public void Init()
{
helper = new MockServiceHelper();
server = helper.GetServer();
server.Start();
channel = helper.GetChannel();
headers = new Metadata { { "ascii-header", "abcdefg" } };
}
[TearDown]
public void Cleanup()
{
channel.Dispose();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public void WriteResponseHeaders_NullNotAllowed()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
Assert.Throws(typeof(NullReferenceException), async () => await context.WriteResponseHeadersAsync(null));
return "PASS";
});
Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), ""));
}
[Test]
public void WriteResponseHeaders_AllowedOnlyOnce()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
await context.WriteResponseHeadersAsync(headers);
try
{
await context.WriteResponseHeadersAsync(headers);
Assert.Fail();
}
catch (InvalidOperationException expected)
{
}
return "PASS";
});
Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), ""));
}
[Test]
public async Task WriteResponseHeaders_NotAllowedAfterWrite()
{
helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
{
await responseStream.WriteAsync("A");
try
{
await context.WriteResponseHeadersAsync(headers);
Assert.Fail();
}
catch (InvalidOperationException expected)
{
}
await responseStream.WriteAsync("B");
});
var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
var responses = await call.ResponseStream.ToList();
CollectionAssert.AreEqual(new[] { "A", "B" }, responses);
}
}
}

@ -48,38 +48,18 @@ namespace Grpc.Core.Tests
/// </summary>
public class TimeoutsTest
{
const string Host = "localhost";
const string ServiceName = "tests.Test";
static readonly Method<string, string> TestMethod = new Method<string, string>(
MethodType.Unary,
ServiceName,
"Test",
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
static readonly ServerServiceDefinition ServiceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName)
.AddMethod(TestMethod, TestMethodHandler)
.Build();
// provides a way how to retrieve an out-of-band result value from server handler
static TaskCompletionSource<string> stringFromServerHandlerTcs;
MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
server = new Server
{
Services = { ServiceDefinition },
Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } }
};
server.Start();
channel = new Channel(Host, server.Ports.Single().BoundPort, Credentials.Insecure);
helper = new MockServiceHelper();
stringFromServerHandlerTcs = new TaskCompletionSource<string>();
server = helper.GetServer();
server.Start();
channel = helper.GetChannel();
}
[TearDown]
@ -98,115 +78,83 @@ namespace Grpc.Core.Tests
[Test]
public void InfiniteDeadline()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
Assert.AreEqual(DateTime.MaxValue, context.Deadline);
return "PASS";
});
// no deadline specified, check server sees infinite deadline
var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions());
Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(callDetails, "RETURN_DEADLINE"));
Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
// DateTime.MaxValue deadline specified, check server sees infinite deadline
var callDetails2 = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions());
Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(callDetails2, "RETURN_DEADLINE"));
Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.MaxValue)), "abc"));
}
[Test]
public void DeadlineTransferredToServer()
{
var remainingTimeClient = TimeSpan.FromDays(7);
var deadline = DateTime.UtcNow + remainingTimeClient;
Thread.Sleep(1000);
var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: deadline));
var serverDeadlineTicksString = Calls.BlockingUnaryCall(callDetails, "RETURN_DEADLINE");
var serverDeadline = new DateTime(long.Parse(serverDeadlineTicksString), DateTimeKind.Utc);
// A fairly relaxed check that the deadline set by client and deadline seen by server
// are in agreement. C core takes care of the work with transferring deadline over the wire,
// so we don't need an exact check here.
Assert.IsTrue(Math.Abs((deadline - serverDeadline).TotalMilliseconds) < 5000);
var clientDeadline = DateTime.UtcNow + TimeSpan.FromDays(7);
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
// A fairly relaxed check that the deadline set by client and deadline seen by server
// are in agreement. C core takes care of the work with transferring deadline over the wire,
// so we don't need an exact check here.
Assert.IsTrue(Math.Abs((clientDeadline - context.Deadline).TotalMilliseconds) < 5000);
return "PASS";
});
Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: clientDeadline)), "abc");
}
[Test]
public void DeadlineInThePast()
{
var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: DateTime.MinValue));
try
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
Calls.BlockingUnaryCall(callDetails, "TIMEOUT");
Assert.Fail();
}
catch (RpcException e)
{
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.
Assert.Contains(e.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
await Task.Delay(60000);
return "FAIL";
});
var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.MinValue)), "abc"));
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.
Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
[Test]
public void DeadlineExceededStatusOnTimeout()
{
var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5));
var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: deadline));
try
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
Calls.BlockingUnaryCall(callDetails, "TIMEOUT");
Assert.Fail();
}
catch (RpcException e)
{
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.
Assert.Contains(e.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
await Task.Delay(60000);
return "FAIL";
});
var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)))), "abc"));
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.
Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
[Test]
public void ServerReceivesCancellationOnTimeout()
public async Task ServerReceivesCancellationOnTimeout()
{
var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5));
var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: deadline));
var serverReceivedCancellationTcs = new TaskCompletionSource<bool>();
try
{
Calls.BlockingUnaryCall(callDetails, "CHECK_CANCELLATION_RECEIVED");
Assert.Fail();
}
catch (RpcException e)
{
// We can't guarantee the status code is always DeadlineExceeded. See issue #2685.
Assert.Contains(e.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
Assert.AreEqual("CANCELLED", stringFromServerHandlerTcs.Task.Result);
}
private static async Task<string> TestMethodHandler(string request, ServerCallContext context)
{
if (request == "TIMEOUT")
{
await Task.Delay(60000);
return "";
}
if (request == "RETURN_DEADLINE")
{
if (context.Deadline == DateTime.MaxValue)
{
return "DATETIME_MAXVALUE";
}
return context.Deadline.Ticks.ToString();
}
if (request == "CHECK_CANCELLATION_RECEIVED")
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
// wait until cancellation token is fired.
var tcs = new TaskCompletionSource<object>();
context.CancellationToken.Register(() => { tcs.SetResult(null); });
await tcs.Task;
stringFromServerHandlerTcs.SetResult("CANCELLED");
serverReceivedCancellationTcs.SetResult(true);
return "";
}
});
var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)))), "abc"));
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.
Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
return "";
Assert.IsTrue(await serverReceivedCancellationTcs.Task);
}
}
}

@ -47,6 +47,8 @@ namespace Grpc.Core
readonly Metadata headers;
readonly DateTime deadline;
readonly CancellationToken cancellationToken;
readonly WriteOptions writeOptions;
readonly ContextPropagationToken propagationToken;
/// <summary>
/// Creates a new instance of <c>CallOptions</c>.
@ -54,12 +56,17 @@ namespace Grpc.Core
/// <param name="headers">Headers to be sent with the call.</param>
/// <param name="deadline">Deadline for the call to finish. null means no deadline.</param>
/// <param name="cancellationToken">Can be used to request cancellation of the call.</param>
public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
/// <param name="writeOptions">Write options that will be used for this call.</param>
/// <param name="propagationToken">Context propagation token obtained from <see cref="ServerCallContext"/>.</param>
public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken? cancellationToken = null,
WriteOptions writeOptions = null, ContextPropagationToken propagationToken = null)
{
// TODO(jtattermusch): consider only creating metadata object once it's really needed.
this.headers = headers != null ? headers : new Metadata();
this.deadline = deadline.HasValue ? deadline.Value : DateTime.MaxValue;
this.cancellationToken = cancellationToken;
this.headers = headers ?? new Metadata();
this.deadline = deadline ?? (propagationToken != null ? propagationToken.Deadline : DateTime.MaxValue);
this.cancellationToken = cancellationToken ?? (propagationToken != null ? propagationToken.CancellationToken : CancellationToken.None);
this.writeOptions = writeOptions;
this.propagationToken = propagationToken;
}
/// <summary>
@ -85,5 +92,27 @@ namespace Grpc.Core
{
get { return cancellationToken; }
}
/// <summary>
/// Write options that will be used for this call.
/// </summary>
public WriteOptions WriteOptions
{
get
{
return this.writeOptions;
}
}
/// <summary>
/// Token for propagating parent call context.
/// </summary>
public ContextPropagationToken PropagationToken
{
get
{
return this.propagationToken;
}
}
}
}

@ -0,0 +1,63 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
namespace Grpc.Core
{
/// <summary>
/// Compression level based on grpc_compression_level from grpc/compression.h
/// </summary>
public enum CompressionLevel
{
/// <summary>
/// No compression.
/// </summary>
None = 0,
/// <summary>
/// Low compression.
/// </summary>
Low,
/// <summary>
/// Medium compression.
/// </summary>
Medium,
/// <summary>
/// High compression.
/// </summary>
High,
}
}

@ -0,0 +1,139 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Threading;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core
{
/// <summary>
/// Token for propagating context of server side handlers to child calls.
/// In situations when a backend is making calls to another backend,
/// it makes sense to propagate properties like deadline and cancellation
/// token of the server call to the child call.
/// C core provides some other contexts (like tracing context) that
/// are not accessible to C# layer, but this token still allows propagating them.
/// </summary>
public class ContextPropagationToken
{
/// <summary>
/// Default propagation mask used by C core.
/// </summary>
const ContextPropagationFlags DefaultCoreMask = (ContextPropagationFlags)0xffff;
/// <summary>
/// Default propagation mask used by C# - we want to propagate deadline
/// and cancellation token by our own means.
/// </summary>
internal const ContextPropagationFlags DefaultMask = DefaultCoreMask
& ~ContextPropagationFlags.Deadline & ~ContextPropagationFlags.Cancellation;
readonly CallSafeHandle parentCall;
readonly DateTime deadline;
readonly CancellationToken cancellationToken;
readonly ContextPropagationOptions options;
internal ContextPropagationToken(CallSafeHandle parentCall, DateTime deadline, CancellationToken cancellationToken, ContextPropagationOptions options)
{
this.parentCall = Preconditions.CheckNotNull(parentCall);
this.deadline = deadline;
this.cancellationToken = cancellationToken;
this.options = options ?? ContextPropagationOptions.Default;
}
internal CallSafeHandle ParentCall
{
get
{
return this.parentCall;
}
}
internal DateTime Deadline
{
get
{
return this.deadline;
}
}
internal CancellationToken CancellationToken
{
get
{
return this.cancellationToken;
}
}
internal ContextPropagationOptions Options
{
get
{
return this.options;
}
}
internal bool IsPropagateDeadline
{
get { return false; }
}
internal bool IsPropagateCancellation
{
get { return false; }
}
}
/// <summary>
/// Options for <see cref="ContextPropagationToken"/>.
/// </summary>
public class ContextPropagationOptions
{
public static readonly ContextPropagationOptions Default = new ContextPropagationOptions();
}
/// <summary>
/// Context propagation flags from grpc/grpc.h.
/// </summary>
[Flags]
internal enum ContextPropagationFlags
{
Deadline = 1,
CensusStatsContext = 2,
CensusTracingContext = 4,
Cancellation = 8
}
}

@ -115,6 +115,9 @@
<Compile Include="ChannelState.cs" />
<Compile Include="CallInvocationDetails.cs" />
<Compile Include="CallOptions.cs" />
<Compile Include="CompressionLevel.cs" />
<Compile Include="WriteOptions.cs" />
<Compile Include="ContextPropagationToken.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Grpc.Core.nuspec" />

@ -50,5 +50,13 @@ namespace Grpc.Core
/// </summary>
/// <param name="message">the message to be written. Cannot be null.</param>
Task WriteAsync(T message);
/// <summary>
/// Write options that will be used for the next write.
/// If null, default options will be used.
/// Once set, this property maintains its value across subsequent
/// writes.
/// <value>The write options.</value>
WriteOptions WriteOptions { get; set; }
}
}

@ -50,7 +50,7 @@ namespace Grpc.Core.Internal
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
readonly CallInvocationDetails<TRequest, TResponse> callDetails;
readonly CallInvocationDetails<TRequest, TResponse> details;
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@ -63,7 +63,8 @@ namespace Grpc.Core.Internal
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
this.callDetails = callDetails;
this.details = callDetails;
this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
}
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
@ -89,11 +90,11 @@ namespace Grpc.Core.Internal
readingDone = true;
}
using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
using (var ctx = BatchContextSafeHandle.Create())
{
call.StartUnary(payload, ctx, metadataArray);
call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall());
var ev = cq.Pluck(ctx.Handle);
bool success = (ev.success != 0);
@ -130,7 +131,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
Initialize(callDetails.Channel.Environment.CompletionQueue);
Initialize(details.Channel.Environment.CompletionQueue);
halfcloseRequested = true;
readingDone = true;
@ -138,9 +139,9 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartUnary(payload, HandleUnaryResponse, metadataArray);
call.StartUnary(HandleUnaryResponse, payload, metadataArray, GetWriteFlagsForCall());
}
return unaryResponseTcs.Task;
}
@ -157,12 +158,12 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
Initialize(callDetails.Channel.Environment.CompletionQueue);
Initialize(details.Channel.Environment.CompletionQueue);
readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartClientStreaming(HandleUnaryResponse, metadataArray);
}
@ -181,16 +182,16 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
Initialize(callDetails.Channel.Environment.CompletionQueue);
Initialize(details.Channel.Environment.CompletionQueue);
halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
byte[] payload = UnsafeSerialize(msg);
using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartServerStreaming(payload, HandleFinished, metadataArray);
call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
}
}
}
@ -206,9 +207,9 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
Initialize(callDetails.Channel.Environment.CompletionQueue);
Initialize(details.Channel.Environment.CompletionQueue);
using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartDuplexStreaming(HandleFinished, metadataArray);
}
@ -219,9 +220,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming request. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendMessage(TRequest msg, AsyncCompletionDelegate<object> completionDelegate)
public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
StartSendMessageInternal(msg, completionDelegate);
StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@ -278,6 +279,14 @@ namespace Grpc.Core.Internal
}
}
public CallInvocationDetails<TRequest, TResponse> Details
{
get
{
return this.details;
}
}
/// <summary>
/// On client-side, we only fire readCompletionDelegate once all messages have been read
/// and status has been received.
@ -310,14 +319,18 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
}
private void Initialize(CompletionQueueSafeHandle cq)
{
var call = callDetails.Channel.Handle.CreateCall(callDetails.Channel.Environment.CompletionRegistry, cq,
callDetails.Method, callDetails.Host, Timespec.FromDateTime(callDetails.Options.Deadline));
callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
var propagationToken = details.Options.PropagationToken;
var parentCall = propagationToken != null ? propagationToken.ParentCall : CallSafeHandle.NullInstance;
var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline));
details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
RegisterCancellationCallback();
}
@ -325,13 +338,22 @@ namespace Grpc.Core.Internal
// Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
private void RegisterCancellationCallback()
{
var token = callDetails.Options.CancellationToken;
var token = details.Options.CancellationToken;
if (token.CanBeCanceled)
{
token.Register(() => this.Cancel());
}
}
/// <summary>
/// Gets WriteFlags set in callDetails.Options.WriteOptions
/// </summary>
private WriteFlags GetWriteFlagsForCall()
{
var writeOptions = details.Options.WriteOptions;
return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
}
/// <summary>
/// Handler for unary response completion.
/// </summary>

@ -71,6 +71,9 @@ namespace Grpc.Core.Internal
protected bool halfclosed;
protected bool finished; // True if close has been received from the peer.
protected bool initialMetadataSent;
protected long streamingWritesCounter;
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = Preconditions.CheckNotNull(serializer);
@ -123,7 +126,7 @@ namespace Grpc.Core.Internal
/// Initiates sending a message. Only one send operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate<object> completionDelegate)
protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
byte[] payload = UnsafeSerialize(msg);
@ -132,8 +135,11 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
call.StartSendMessage(payload, HandleSendFinished);
call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
sendCompletionDelegate = completionDelegate;
initialMetadataSent = true;
streamingWritesCounter++;
}
}

@ -83,9 +83,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming response. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendMessage(TResponse msg, AsyncCompletionDelegate<object> completionDelegate)
public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
StartSendMessageInternal(msg, completionDelegate);
StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@ -97,6 +97,35 @@ namespace Grpc.Core.Internal
StartReadMessageInternal(completionDelegate);
}
/// <summary>
/// Initiates sending a initial metadata.
/// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
/// to make things simpler.
/// completionDelegate is invoked upon completion.
/// </summary>
public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate)
{
lock (myLock)
{
Preconditions.CheckNotNull(headers, "metadata");
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
Preconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
Preconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
CheckSendingAllowed();
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartSendInitialMetadata(HandleSendFinished, metadataArray);
}
this.initialMetadataSent = true;
sendCompletionDelegate = completionDelegate;
}
}
/// <summary>
/// Sends call result status, also indicating server is done with streaming responses.
/// Only one pending send action is allowed at any given time.
@ -111,7 +140,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray);
call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, !initialMetadataSent);
}
halfcloseRequested = true;
readingDone = true;

@ -42,6 +42,8 @@ namespace Grpc.Core.Internal
/// </summary>
internal class CallSafeHandle : SafeHandleZeroIsInvalid
{
public static readonly CallSafeHandle NullInstance = new CallSafeHandle();
const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionRegistry completionRegistry;
@ -53,7 +55,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
@ -62,7 +64,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
MetadataArraySafeHandle metadataArray);
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
@ -70,7 +72,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len);
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
@ -78,7 +80,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray);
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
@ -88,6 +90,10 @@ namespace Grpc.Core.Internal
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call);
@ -103,17 +109,17 @@ namespace Grpc.Core.Internal
this.completionRegistry = completionRegistry;
}
public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartUnary(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray)
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
@ -124,11 +130,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
@ -138,11 +144,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
public void StartSendCloseFromClient(BatchCompletionDelegate callback)
@ -152,11 +158,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk();
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)
@ -173,6 +179,13 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
}
public void Cancel()
{
grpcsharp_call_cancel(this).CheckOk();

@ -47,7 +47,7 @@ namespace Grpc.Core.Internal
static extern ChannelSafeHandle grpcsharp_secure_channel_create(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs);
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
static extern ChannelState grpcsharp_channel_check_connectivity_state(ChannelSafeHandle channel, int tryToConnect);
@ -76,9 +76,9 @@ namespace Grpc.Core.Internal
return grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
public CallSafeHandle CreateCall(CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
var result = grpcsharp_channel_create_call(this, cq, method, host, deadline);
var result = grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline);
result.SetCompletionRegistry(registry);
return result;
}

@ -40,16 +40,18 @@ namespace Grpc.Core.Internal
internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest>
{
readonly AsyncCall<TRequest, TResponse> call;
WriteOptions writeOptions;
public ClientRequestStream(AsyncCall<TRequest, TResponse> call)
{
this.call = call;
this.writeOptions = call.Details.Options.WriteOptions;
}
public Task WriteAsync(TRequest message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendMessage(message, taskSource.CompletionDelegate);
call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
@ -59,5 +61,24 @@ namespace Grpc.Core.Internal
call.StartSendCloseFromClient(taskSource.CompletionDelegate);
return taskSource.Task;
}
public WriteOptions WriteOptions
{
get
{
return this.writeOptions;
}
set
{
writeOptions = value;
}
}
private WriteFlags GetWriteFlags()
{
var options = writeOptions;
return options != null ? options.Flags : default(WriteFlags);
}
}
}

@ -75,7 +75,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@ -131,7 +131,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@ -187,7 +187,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
var result = await handler(requestStream, context);
@ -247,7 +247,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
await handler(requestStream, responseStream, context);
@ -304,13 +304,14 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, CancellationToken cancellationToken)
public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
where TRequest : class
where TResponse : class
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
return new ServerCallContext(
newRpc.Method, newRpc.Host, peer, realtimeDeadline,
newRpc.RequestMetadata, cancellationToken);
return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, peer, realtimeDeadline,
newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
}
}
}

@ -38,11 +38,12 @@ namespace Grpc.Core.Internal
/// <summary>
/// Writes responses asynchronously to an underlying AsyncCallServer object.
/// </summary>
internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>
internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>, IHasWriteOptions
where TRequest : class
where TResponse : class
{
readonly AsyncCallServer<TRequest, TResponse> call;
WriteOptions writeOptions;
public ServerResponseStream(AsyncCallServer<TRequest, TResponse> call)
{
@ -52,7 +53,7 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TResponse message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendMessage(message, taskSource.CompletionDelegate);
call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
@ -62,5 +63,31 @@ namespace Grpc.Core.Internal
call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
return taskSource.Task;
}
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate);
return taskSource.Task;
}
public WriteOptions WriteOptions
{
get
{
return writeOptions;
}
set
{
writeOptions = value;
}
}
private WriteFlags GetWriteFlags()
{
var options = writeOptions;
return options != null ? options.Flags : default(WriteFlags);
}
}
}

@ -114,6 +114,16 @@ namespace Grpc.Core
entries.Add(item);
}
public void Add(string key, string value)
{
Add(new Entry(key, value));
}
public void Add(string key, byte[] valueBytes)
{
Add(new Entry(key, valueBytes));
}
public void Clear()
{
CheckWriteable();

@ -36,15 +36,16 @@ using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
namespace Grpc.Core
{
/// <summary>
/// Context for a server-side call.
/// </summary>
public sealed class ServerCallContext
public class ServerCallContext
{
// TODO(jtattermusch): expose method to send initial metadata back to client
private readonly CallSafeHandle callHandle;
private readonly string method;
private readonly string host;
private readonly string peer;
@ -54,15 +55,34 @@ namespace Grpc.Core
private readonly Metadata responseTrailers = new Metadata();
private Status status = Status.DefaultSuccess;
private Func<Metadata, Task> writeHeadersFunc;
private IHasWriteOptions writeOptionsHolder;
public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken)
internal ServerCallContext(CallSafeHandle callHandle, string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken,
Func<Metadata, Task> writeHeadersFunc, IHasWriteOptions writeOptionsHolder)
{
this.callHandle = callHandle;
this.method = method;
this.host = host;
this.peer = peer;
this.deadline = deadline;
this.requestHeaders = requestHeaders;
this.cancellationToken = cancellationToken;
this.writeHeadersFunc = writeHeadersFunc;
this.writeOptionsHolder = writeOptionsHolder;
}
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
return writeHeadersFunc(responseHeaders);
}
/// <summary>
/// Creates a propagation token to be used to propagate call context to a child call.
/// </summary>
public ContextPropagationToken CreatePropagationToken(ContextPropagationOptions options = null)
{
return new ContextPropagationToken(callHandle, deadline, cancellationToken, options);
}
/// <summary>Name of method called in this RPC.</summary>
@ -110,7 +130,7 @@ namespace Grpc.Core
}
}
///<summary>Cancellation token signals when call is cancelled.</summary>
/// <summary>Cancellation token signals when call is cancelled.</summary>
public CancellationToken CancellationToken
{
get
@ -141,5 +161,31 @@ namespace Grpc.Core
status = value;
}
}
/// <summary>
/// Allows setting write options for the following write.
/// For streaming response calls, this property is also exposed as on IServerStreamWriter for convenience.
/// Both properties are backed by the same underlying value.
/// </summary>
public WriteOptions WriteOptions
{
get
{
return writeOptionsHolder.WriteOptions;
}
set
{
writeOptionsHolder.WriteOptions = value;
}
}
}
/// <summary>
/// Allows sharing write options between ServerCallContext and other objects.
/// </summary>
public interface IHasWriteOptions
{
WriteOptions WriteOptions { get; set; }
}
}

@ -0,0 +1,82 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
namespace Grpc.Core
{
/// <summary>
/// Flags for write operations.
/// </summary>
[Flags]
public enum WriteFlags
{
/// <summary>
/// Hint that the write may be buffered and need not go out on the wire immediately.
/// gRPC is free to buffer the message until the next non-buffered
/// write, or until write stream completion, but it need not buffer completely or at all.
/// </summary>
BufferHint = 0x1,
/// <summary>
/// Force compression to be disabled for a particular write.
/// </summary>
NoCompress = 0x2
}
/// <summary>
/// Options for write operations.
/// </summary>
public class WriteOptions
{
/// <summary>
/// Default write options.
/// </summary>
public static readonly WriteOptions Default = new WriteOptions();
private WriteFlags flags;
public WriteOptions(WriteFlags flags = default(WriteFlags))
{
this.flags = flags;
}
public WriteFlags Flags
{
get
{
return this.flags;
}
}
}
}

@ -92,15 +92,8 @@ namespace math.Tests
[Test]
public void DivByZero()
{
try
{
DivReply response = client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 0 }.Build());
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
}
var ex = Assert.Throws<RpcException>(() => client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 0 }.Build()));
Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode);
}
[Test]
@ -158,15 +151,10 @@ namespace math.Tests
using (var call = client.Fib(new FibArgs.Builder { Limit = 0 }.Build(),
deadline: DateTime.UtcNow.AddMilliseconds(500)))
{
try
{
await call.ResponseStream.ToList();
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.DeadlineExceeded, e.Status.StatusCode);
}
var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.ToList());
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.
Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
}

@ -404,15 +404,8 @@ namespace Grpc.IntegrationTesting
await Task.Delay(1000);
cts.Cancel();
try
{
var response = await call.ResponseAsync;
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
}
var ex = Assert.Throws<RpcException>(async () => await call.ResponseAsync);
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
Console.WriteLine("Passed!");
}
@ -435,15 +428,8 @@ namespace Grpc.IntegrationTesting
cts.Cancel();
try
{
await call.ResponseStream.MoveNext();
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
}
var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.MoveNext());
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
Console.WriteLine("Passed!");
}

@ -376,10 +376,12 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_destroy(grpc_channel *channel) {
}
GPR_EXPORT grpc_call *GPR_CALLTYPE
grpcsharp_channel_create_call(grpc_channel *channel, grpc_completion_queue *cq,
grpcsharp_channel_create_call(grpc_channel *channel, grpc_call *parent_call,
gpr_uint32 propagation_mask,
grpc_completion_queue *cq,
const char *method, const char *host,
gpr_timespec deadline) {
return grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq,
return grpc_channel_create_call(channel, parent_call, propagation_mask, cq,
method, host, deadline);
}
@ -497,7 +499,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len,
grpc_metadata_array *initial_metadata) {
grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[6];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@ -511,7 +513,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
ops[1].flags = 0;
ops[1].flags = write_flags;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[2].flags = 0;
@ -578,7 +580,7 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer,
size_t send_buffer_len, grpc_metadata_array *initial_metadata) {
size_t send_buffer_len, grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[5];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@ -592,7 +594,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
ops[1].flags = 0;
ops[1].flags = write_flags;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[2].flags = 0;
@ -651,15 +653,22 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len) {
const char *send_buffer, size_t send_buffer_len,
gpr_uint32 write_flags,
gpr_int32 send_empty_initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpc_op ops[2];
size_t nops = send_empty_initial_metadata ? 2 : 1;
ops[0].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
ops[0].flags = 0;
ops[0].flags = write_flags;
ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[1].data.send_initial_metadata.count = 0;
ops[1].data.send_initial_metadata.metadata = NULL;
ops[1].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
return grpc_call_start_batch(call, ops, nops, ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@ -675,9 +684,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code,
const char *status_details, grpc_metadata_array *trailing_metadata) {
const char *status_details, grpc_metadata_array *trailing_metadata,
gpr_int32 send_empty_initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpc_op ops[2];
size_t nops = send_empty_initial_metadata ? 2 : 1;
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
@ -689,8 +700,12 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ops[0].data.send_status_from_server.trailing_metadata =
ctx->send_status_from_server.trailing_metadata.metadata;
ops[0].flags = 0;
ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[1].data.send_initial_metadata.count = 0;
ops[1].data.send_initial_metadata.metadata = NULL;
ops[1].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
return grpc_call_start_batch(call, ops, nops, ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@ -706,16 +721,28 @@ grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[2];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
grpc_op ops[1];
ops[0].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops[0].data.recv_close_on_server.cancelled =
(&ctx->recv_close_on_server_cancelled);
ops[0].flags = 0;
ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops[1].data.recv_close_on_server.cancelled =
(&ctx->recv_close_on_server_cancelled);
ops[1].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_initial_metadata(grpc_call *call,
grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[1];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}

@ -74,11 +74,20 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// all. This wrapper over our actual writeable ensures thread-safety and
// correct ordering.
GRXConcurrentWriteable *_responseWriteable;
// The network thread wants the requestWriter to resume (when the server is ready for more input),
// or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
// it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
// We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
// writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
// pause the writer immediately on writeValue:, so we need our locking to be recursive.
GRXWriter *_requestWriter;
// To create a retain cycle when a call is started, up until it finishes. See
// |startWithWriteable:| and |finishWithError:|.
GRPCCall *_self;
// |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
// reference to the call object if all they're interested in is the handler being executed when
// the response arrives.
GRPCCall *_retainSelf;
NSMutableDictionary *_requestMetadata;
NSMutableDictionary *_responseMetadata;
@ -136,11 +145,12 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
- (void)finishWithError:(NSError *)errorOrNil {
// If the call isn't retained anywhere else, it can be deallocated now.
_self = nil;
_retainSelf = nil;
// If there were still request messages coming, stop them.
_requestWriter.state = GRXWriterStateFinished;
_requestWriter = nil;
@synchronized(_requestWriter) {
_requestWriter.state = GRXWriterStateFinished;
}
if (errorOrNil) {
[_responseWriteable cancelWithError:errorOrNil];
@ -240,12 +250,14 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Resume the request writer.
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf->_requestWriter.state = GRXWriterStateStarted;
@synchronized(strongSelf->_requestWriter) {
strongSelf->_requestWriter.state = GRXWriterStateStarted;
}
}
};
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc]
initWithMessage:message
handler:resumingHandler]] errorHandler:errorHandler];
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message
handler:resumingHandler]]
errorHandler:errorHandler];
}
- (void)writeValue:(id)value {
@ -253,7 +265,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Pause the input and only resume it when the C layer notifies us that writes
// can proceed.
_requestWriter.state = GRXWriterStatePaused;
@synchronized(_requestWriter) {
_requestWriter.state = GRXWriterStatePaused;
}
__weak GRPCCall *weakSelf = self;
dispatch_async(_callQueue, ^{
@ -273,7 +287,6 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
_requestWriter = nil;
if (errorOrNil) {
[self cancel];
} else {
@ -327,7 +340,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
}
}];
// Now that the RPC has been initiated, request writes can start.
[_requestWriter startWithWriteable:self];
@synchronized(_requestWriter) {
[_requestWriter startWithWriteable:self];
}
}
#pragma mark GRXWriter implementation
@ -338,7 +353,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// before being autoreleased).
// Care is taken not to retain self strongly in any of the blocks used in this implementation, so
// that the life of the instance is determined by this retain cycle.
_self = self;
_retainSelf = self;
_responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable];
[self sendHeaders:_requestMetadata];

@ -38,15 +38,18 @@
// Returns NULL if the file at path couldn't be read. In that case, if errorPtr isn't NULL,
// *errorPtr will be an object describing what went wrong.
static grpc_credentials *CertificatesAtPath(NSString *path, NSError **errorPtr) {
NSString *certsContent = [NSString stringWithContentsOfFile:path
encoding:NSASCIIStringEncoding
// Files in PEM format can have non-ASCII characters in their comments (e.g. for the name of the
// issuer). Load them as UTF8 and produce an ASCII equivalent.
NSString *contentInUTF8 = [NSString stringWithContentsOfFile:path
encoding:NSUTF8StringEncoding
error:errorPtr];
if (!certsContent) {
NSData *contentInASCII = [contentInUTF8 dataUsingEncoding:NSASCIIStringEncoding
allowLossyConversion:YES];
if (!contentInASCII.bytes) {
// Passing NULL to grpc_ssl_credentials_create produces behavior we don't want, so return.
return NULL;
}
const char * asCString = [certsContent cStringUsingEncoding:NSASCIIStringEncoding];
return grpc_ssl_credentials_create(asCString, NULL);
return grpc_ssl_credentials_create(contentInASCII.bytes, NULL);
}
@implementation GRPCSecureChannel

@ -36,13 +36,11 @@
#import "GRXWriteable.h"
#import "GRXWriter.h"
// A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed
// to -startWithWriteable:).
// A buffered pipe is a Writer that also acts as a Writeable.
// Once it is started, whatever values are written into it (via -writeValue:) will be propagated
// immediately, unless flow control prevents it.
// If it is throttled and keeps receiving values, as well as if it receives values before being
// started, it will buffer them and propagate them in order as soon as its state becomes
// GRXWriterStateStarted.
// started, it will buffer them and propagate them in order as soon as its state becomes Started.
// If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and
// propagate the error immediately.
//
@ -51,6 +49,9 @@
// pipe will keep buffering all data written to it, your application could run out of memory and
// crash. If you want to react to flow control signals to prevent that, instead of using this class
// you can implement an object that conforms to GRXWriter.
//
// Thread-safety:
// The methods of an object of this class should not be called concurrently from different threads.
@interface GRXBufferedPipe : GRXWriter<GRXWriteable>
// Convenience constructor.

@ -33,11 +33,17 @@
#import "GRXWriter.h"
// A "proxy" class that simply forwards values, completion, and errors from its
// input writer to its writeable.
// A "proxy" class that simply forwards values, completion, and errors from its input writer to its
// writeable.
// It is useful as a superclass for pipes that act as a transformation of their
// input writer, and for classes that represent objects with input and
// output sequences of values, like an RPC.
//
// Thread-safety:
// All messages sent to this object need to be serialized. When it is started, the writer it wraps
// is started in the same thread. Manual state changes are propagated to the wrapped writer in the
// same thread too. Importantly, all messages the wrapped writer sends to its writeable need to be
// serialized with any message sent to this object.
@interface GRXForwardingWriter : GRXWriter
- (instancetype)initWithWriter:(GRXWriter *)writer NS_DESIGNATED_INITIALIZER;
@end

@ -48,7 +48,11 @@
// Designated initializer
- (instancetype)initWithWriter:(GRXWriter *)writer {
if (!writer) {
[NSException raise:NSInvalidArgumentException format:@"writer can't be nil."];
return nil;
}
if (writer.state != GRXWriterStateNotStarted) {
[NSException raise:NSInvalidArgumentException
format:@"The writer argument must not have already started."];
}
if ((self = [super init])) {
_writer = writer;

@ -36,10 +36,17 @@
#import "GRXWriter.h"
// Utility to construct GRXWriter instances from values that are immediately available when
// required. The returned writers all support pausing and early termination.
// required.
//
// Unless the writeable callback pauses them or stops them early, these writers will do all their
// interactions with the writeable before the start method returns.
// Thread-safety:
//
// An object of this class shouldn't be messaged concurrently by more than one thread. It will start
// messaging the writeable before |startWithWriteable:| returns, in the same thread. That is the
// only place where the writer can be paused or stopped prematurely.
//
// If a paused writer of this class is resumed, it will start messaging the writeable, in the same
// thread, before |setState:| returns. Because the object can't be legally accessed concurrently,
// that's the only place where it can be paused again (or stopped).
@interface GRXImmediateWriter : GRXWriter
// Returns a writer that pulls values from the passed NSEnumerator instance and pushes them to

@ -35,84 +35,73 @@
#import "GRXWriteable.h"
// States of a writer.
typedef NS_ENUM(NSInteger, GRXWriterState) {
// The writer has not yet been given a writeable to which it can push its
// values. To have an writer transition to the Started state, send it a
// startWithWriteable: message.
// The writer has not yet been given a writeable to which it can push its values. To have a writer
// transition to the Started state, send it a startWithWriteable: message.
//
// An writer's state cannot be manually set to this value.
// A writer's state cannot be manually set to this value.
GRXWriterStateNotStarted,
// The writer might push values to the writeable at any moment.
GRXWriterStateStarted,
// The writer is temporarily paused, and won't send any more values to the
// writeable unless its state is set back to Started. The writer might still
// transition to the Finished state at any moment, and is allowed to send
// writesFinishedWithError: to its writeable.
//
// Not all implementations of writer have to support pausing, and thus
// trying to set an writer's state to this value might have no effect.
// The writer is temporarily paused, and won't send any more values to the writeable unless its
// state is set back to Started. The writer might still transition to the Finished state at any
// moment, and is allowed to send writesFinishedWithError: to its writeable.
GRXWriterStatePaused,
// The writer has released its writeable and won't interact with it anymore.
//
// One seldomly wants to set an writer's state to this value, as its
// writeable isn't notified with a writesFinishedWithError: message. Instead, sending
// finishWithError: to the writer will make it notify the writeable and then
// transition to this state.
// One seldomly wants to set a writer's state to this value, as its writeable isn't notified with
// a writesFinishedWithError: message. Instead, sending finishWithError: to the writer will make
// it notify the writeable and then transition to this state.
GRXWriterStateFinished
};
// An object that conforms to this protocol can produce, on demand, a sequence
// of values. The sequence may be produced asynchronously, and it may consist of
// any number of elements, including none or an infinite number.
// An GRXWriter object can produce, on demand, a sequence of values. The sequence may be produced
// asynchronously, and it may consist of any number of elements, including none or an infinite
// number.
//
// GRXWriter is the active dual of NSEnumerator. The difference between them is thus whether the
// object plays an active or passive role during usage: A user of NSEnumerator pulls values off it,
// and passes the values to a writeable. A user of GRXWriter, though, just gives it a writeable, and
// the GRXWriter instance pushes values to the writeable. This makes this protocol suitable to
// represent a sequence of future values, as well as collections with internal iteration.
//
// GRXWriter is the active dual of NSEnumerator. The difference between them
// is thus whether the object plays an active or passive role during usage: A
// user of NSEnumerator pulls values off it, and passes the values to a writeable.
// A user of GRXWriter, though, just gives it a writeable, and the
// GRXWriter instance pushes values to the writeable. This makes this protocol
// suitable to represent a sequence of future values, as well as collections
// with internal iteration.
// An instance of GRXWriter can start producing values after a writeable is passed to it. It can
// also be commanded to finish the sequence immediately (with an optional error). Finally, it can be
// asked to pause, and resumed later. All GRXWriter objects support pausing and early termination.
//
// An instance of GRXWriter can start producing values after a writeable is
// passed to it. It can also be commanded to finish the sequence immediately
// (with an optional error). Finally, it can be asked to pause, but the
// conforming instance is not required to oblige.
// Thread-safety:
//
// Unless otherwise indicated by a conforming class, no messages should be sent
// concurrently to a GRXWriter. I.e., conforming classes aren't required to
// be thread-safe.
// State transitions take immediate effect if the object is used from a single thread. Subclasses
// might offer stronger guarantees.
//
// Unless otherwise indicated by a conforming subclass, no messages should be sent concurrently to a
// GRXWriter. I.e., conforming classes aren't required to be thread-safe.
@interface GRXWriter : NSObject
// This property can be used to query the current state of the writer, which
// determines how it might currently use its writeable. Some state transitions can
// be triggered by setting this property to the corresponding value, and that's
// useful for advanced use cases like pausing an writer. For more details,
// see the documentation of the enum.
// This property can be used to query the current state of the writer, which determines how it might
// currently use its writeable. Some state transitions can be triggered by setting this property to
// the corresponding value, and that's useful for advanced use cases like pausing an writer. For
// more details, see the documentation of the enum further down.
@property(nonatomic) GRXWriterState state;
// Start sending messages to the writeable. Messages may be sent before the method
// returns, or they may be sent later in the future. See GRXWriteable.h for the
// different messages a writeable can receive.
// Transition to the Started state, and start sending messages to the writeable (a reference to it
// is retained). Messages to the writeable may be sent before the method returns, or they may be
// sent later in the future. See GRXWriteable.h for the different messages a writeable can receive.
//
// If this writer draws its values from an external source (e.g. from the
// filesystem or from a server), calling this method will commonly trigger side
// effects (like network connections).
// If this writer draws its values from an external source (e.g. from the filesystem or from a
// server), calling this method will commonly trigger side effects (like network connections).
//
// This method might only be called on writers in the NotStarted state.
- (void)startWithWriteable:(id<GRXWriteable>)writeable;
// Send writesFinishedWithError:errorOrNil immediately to the writeable, and don't send
// any more messages to it.
//
// This method might only be called on writers in the Started or Paused
// state.
// Send writesFinishedWithError:errorOrNil to the writeable. Then release the reference to it and
// transition to the Finished state.
//
// TODO(jcanizales): Consider adding some guarantee about the immediacy of that
// stopping. I know I've relied on it in part of the code that uses this, but
// can't remember the details in the presence of concurrency.
// This method might only be called on writers in the Started or Paused state.
- (void)finishWithError:(NSError *)errorOrNil;
@end

@ -114,7 +114,7 @@ static ProtoMethod *kUnaryCallMethod;
[call startWithWriteable:responsesWriteable];
[self waitForExpectationsWithTimeout:4 handler:nil];
[self waitForExpectationsWithTimeout:8 handler:nil];
}
- (void)testSimpleProtoRPC {
@ -146,7 +146,7 @@ static ProtoMethod *kUnaryCallMethod;
[call startWithWriteable:responsesWriteable];
[self waitForExpectationsWithTimeout:4 handler:nil];
[self waitForExpectationsWithTimeout:8 handler:nil];
}
- (void)testMetadata {

@ -37,8 +37,7 @@
// https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md
@interface InteropTests : XCTestCase
// Returns @"localhost:5050".
// Returns @"grpc-test.sandbox.google.com".
// Override in a subclass to perform the same tests against a different address.
// For interop tests, use @"grpc-test.sandbox.google.com".
+ (NSString *)host;
@end

@ -78,20 +78,17 @@
#pragma mark Tests
static NSString * const kLocalCleartextHost = @"localhost:5050";
static NSString * const kRemoteSSLHost = @"grpc-test.sandbox.google.com";
@implementation InteropTests {
RMTTestService *_service;
}
+ (NSString *)host {
return kLocalCleartextHost;
return kRemoteSSLHost;
}
- (void)setUp {
// Register test server as non-SSL.
[GRPCCall useInsecureConnectionsForHost:kLocalCleartextHost];
_service = [[RMTTestService alloc] initWithHost:self.class.host];
}
@ -131,7 +128,7 @@ static NSString * const kLocalCleartextHost = @"localhost:5050";
[expectation fulfill];
}];
[self waitForExpectationsWithTimeout:8 handler:nil];
[self waitForExpectationsWithTimeout:16 handler:nil];
}
- (void)testClientStreamingRPC {

@ -0,0 +1,59 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
// Repeat of the tests in InteropTests.m, but sending the RPCs to a local cleartext server instead
// of the remote SSL one.
#import <GRPCClient/GRPCCall+Tests.h>
#import "InteropTests.h"
static NSString * const kLocalCleartextHost = @"localhost:5050";
@interface InteropTestsLocalCleartext : InteropTests
@end
@implementation InteropTestsLocalCleartext
+ (NSString *)host {
return kLocalCleartextHost;
}
- (void)setUp {
// Register test server as non-SSL.
[GRPCCall useInsecureConnectionsForHost:kLocalCleartextHost];
[super setUp];
}
@end

@ -31,8 +31,8 @@
*
*/
// Repeat of the tests in InteropTests.m, but using SSL to communicate with the local server instead
// of cleartext.
// Repeat of the tests in InteropTests.m, but sending the RPCs to a local SSL server instead of the
// remote one.
#import <GRPCClient/GRPCCall+Tests.h>

@ -13,6 +13,7 @@
63423F511B151B77006CF63C /* RxLibraryUnitTests.m in Sources */ = {isa = PBXBuildFile; fileRef = 63423F501B151B77006CF63C /* RxLibraryUnitTests.m */; };
635697CD1B14FC11007A7283 /* Tests.m in Sources */ = {isa = PBXBuildFile; fileRef = 635697CC1B14FC11007A7283 /* Tests.m */; };
635ED2EC1B1A3BC400FDE5C3 /* InteropTests.m in Sources */ = {isa = PBXBuildFile; fileRef = 635ED2EB1B1A3BC400FDE5C3 /* InteropTests.m */; };
63715F561B780C020029CB0B /* InteropTestsLocalCleartext.m in Sources */ = {isa = PBXBuildFile; fileRef = 63715F551B780C020029CB0B /* InteropTestsLocalCleartext.m */; };
63E240CE1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m in Sources */ = {isa = PBXBuildFile; fileRef = 63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */; };
63E240D01B6C63DC005F3B0E /* TestCertificates.bundle in Resources */ = {isa = PBXBuildFile; fileRef = 63E240CF1B6C63DC005F3B0E /* TestCertificates.bundle */; };
7D8A186224D39101F90230F6 /* libPods.a in Frameworks */ = {isa = PBXBuildFile; fileRef = 35F2B6BF3BAE8F0DC4AFD76E /* libPods.a */; };
@ -51,6 +52,7 @@
635697CC1B14FC11007A7283 /* Tests.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = Tests.m; sourceTree = "<group>"; };
635697D81B14FC11007A7283 /* Info.plist */ = {isa = PBXFileReference; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; };
635ED2EB1B1A3BC400FDE5C3 /* InteropTests.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = InteropTests.m; sourceTree = "<group>"; };
63715F551B780C020029CB0B /* InteropTestsLocalCleartext.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = InteropTestsLocalCleartext.m; sourceTree = "<group>"; };
63E240CC1B6C4D3A005F3B0E /* InteropTests.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = InteropTests.h; sourceTree = "<group>"; };
63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = InteropTestsLocalSSL.m; sourceTree = "<group>"; };
63E240CF1B6C63DC005F3B0E /* TestCertificates.bundle */ = {isa = PBXFileReference; lastKnownFileType = "wrapper.plug-in"; path = TestCertificates.bundle; sourceTree = "<group>"; };
@ -117,14 +119,15 @@
635697C91B14FC11007A7283 /* Tests */ = {
isa = PBXGroup;
children = (
63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */,
6312AE4D1B1BF49B00341DEE /* GRPCClientTests.m */,
63175DFE1B1B9FAF00027841 /* LocalClearTextTests.m */,
63E240CC1B6C4D3A005F3B0E /* InteropTests.h */,
635ED2EB1B1A3BC400FDE5C3 /* InteropTests.m */,
63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */,
63715F551B780C020029CB0B /* InteropTestsLocalCleartext.m */,
63423F501B151B77006CF63C /* RxLibraryUnitTests.m */,
63175DFE1B1B9FAF00027841 /* LocalClearTextTests.m */,
635697CC1B14FC11007A7283 /* Tests.m */,
635697D71B14FC11007A7283 /* Supporting Files */,
63E240CC1B6C4D3A005F3B0E /* InteropTests.h */,
);
name = Tests;
sourceTree = SOURCE_ROOT;
@ -261,6 +264,7 @@
isa = PBXSourcesBuildPhase;
buildActionMask = 2147483647;
files = (
63715F561B780C020029CB0B /* InteropTestsLocalCleartext.m in Sources */,
63175DFF1B1B9FAF00027841 /* LocalClearTextTests.m in Sources */,
63423F511B151B77006CF63C /* RxLibraryUnitTests.m in Sources */,
63E240CE1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m in Sources */,

Loading…
Cancel
Save