Merge pull request #2866 from jtattermusch/context_api

C# Context API
pull/2574/head
Jan Tattermusch 9 years ago
commit 295224605f
  1. 4
      src/csharp/Grpc.Core.Tests/ClientServerTest.cs
  2. 2
      src/csharp/Grpc.Core.Tests/CompressionTest.cs
  3. 122
      src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
  4. 1
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  5. 8
      src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs
  6. 5
      src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
  7. 24
      src/csharp/Grpc.Core/CallOptions.cs
  8. 139
      src/csharp/Grpc.Core/ContextPropagationToken.cs
  9. 1
      src/csharp/Grpc.Core/Grpc.Core.csproj
  10. 6
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  11. 2
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  12. 6
      src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
  13. 3
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  14. 10
      src/csharp/Grpc.Core/Metadata.cs
  15. 12
      src/csharp/Grpc.Core/ServerCallContext.cs
  16. 6
      src/csharp/ext/grpc_csharp_ext.c

@ -190,8 +190,8 @@ namespace Grpc.Core.Tests
var headers = new Metadata
{
new Metadata.Entry("ascii-header", "abcdefg"),
new Metadata.Entry("binary-header-bin", new byte[] { 1, 2, 3, 0, 0xff }),
{ "ascii-header", "abcdefg" },
{ "binary-header-bin", new byte[] { 1, 2, 3, 0, 0xff } }
};
var call = Calls.AsyncUnaryCall(helper.CreateUnaryCall(new CallOptions(headers: headers)), "ABC");
await call;

@ -94,7 +94,7 @@ namespace Grpc.Core.Tests
context.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
await context.WriteResponseHeadersAsync(new Metadata { new Metadata.Entry("ascii-header", "abcdefg") });
await context.WriteResponseHeadersAsync(new Metadata { { "ascii-header", "abcdefg" } });
await responseStream.WriteAsync("X");

@ -0,0 +1,122 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class ContextPropagationTest
{
MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
helper = new MockServiceHelper();
server = helper.GetServer();
server.Start();
channel = helper.GetChannel();
}
[TearDown]
public void Cleanup()
{
channel.Dispose();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public async Task PropagateCancellation()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
// check that we didn't obtain the default cancellation token.
Assert.IsTrue(context.CancellationToken.CanBeCanceled);
return "PASS";
});
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
var propagationToken = context.CreatePropagationToken();
Assert.IsNotNull(propagationToken.ParentCall);
var callOptions = new CallOptions(propagationToken: propagationToken);
return await Calls.AsyncUnaryCall(helper.CreateUnaryCall(callOptions), "xyz");
});
var cts = new CancellationTokenSource();
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
await call.RequestStream.CompleteAsync();
Assert.AreEqual("PASS", await call);
}
[Test]
public async Task PropagateDeadline()
{
var deadline = DateTime.UtcNow.AddDays(7);
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
Assert.IsTrue(context.Deadline < deadline.AddMinutes(1));
Assert.IsTrue(context.Deadline > deadline.AddMinutes(-1));
return "PASS";
});
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
var callOptions = new CallOptions(propagationToken: context.CreatePropagationToken());
return await Calls.AsyncUnaryCall(helper.CreateUnaryCall(callOptions), "xyz");
});
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(deadline: deadline)));
await call.RequestStream.CompleteAsync();
Assert.AreEqual("PASS", await call);
}
}
}

@ -80,6 +80,7 @@
<Compile Include="MockServiceHelper.cs" />
<Compile Include="ResponseHeadersTest.cs" />
<Compile Include="CompressionTest.cs" />
<Compile Include="ContextPropagationTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -53,8 +53,8 @@ namespace Grpc.Core.Internal.Tests
{
var metadata = new Metadata
{
new Metadata.Entry("host", "somehost"),
new Metadata.Entry("header2", "header value"),
{ "host", "somehost" },
{ "header2", "header value" },
};
var nativeMetadata = MetadataArraySafeHandle.Create(metadata);
nativeMetadata.Dispose();
@ -65,8 +65,8 @@ namespace Grpc.Core.Internal.Tests
{
var metadata = new Metadata
{
new Metadata.Entry("host", "somehost"),
new Metadata.Entry("header2", "header value"),
{ "host", "somehost" },
{ "header2", "header value" }
};
var nativeMetadata = MetadataArraySafeHandle.Create(metadata);

@ -63,10 +63,7 @@ namespace Grpc.Core.Tests
server.Start();
channel = helper.GetChannel();
headers = new Metadata
{
new Metadata.Entry("ascii-header", "abcdefg"),
};
headers = new Metadata { { "ascii-header", "abcdefg" } };
}
[TearDown]

@ -48,6 +48,7 @@ namespace Grpc.Core
readonly DateTime deadline;
readonly CancellationToken cancellationToken;
readonly WriteOptions writeOptions;
readonly ContextPropagationToken propagationToken;
/// <summary>
/// Creates a new instance of <c>CallOptions</c>.
@ -56,14 +57,16 @@ namespace Grpc.Core
/// <param name="deadline">Deadline for the call to finish. null means no deadline.</param>
/// <param name="cancellationToken">Can be used to request cancellation of the call.</param>
/// <param name="writeOptions">Write options that will be used for this call.</param>
public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken), WriteOptions writeOptions = null)
/// <param name="propagationToken">Context propagation token obtained from <see cref="ServerCallContext"/>.</param>
public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken? cancellationToken = null,
WriteOptions writeOptions = null, ContextPropagationToken propagationToken = null)
{
// TODO(jtattermusch): consider only creating metadata object once it's really needed.
this.headers = headers != null ? headers : new Metadata();
// TODO(jtattermusch): allow null value of deadline?
this.deadline = deadline.HasValue ? deadline.Value : DateTime.MaxValue;
this.cancellationToken = cancellationToken;
this.headers = headers ?? new Metadata();
this.deadline = deadline ?? (propagationToken != null ? propagationToken.Deadline : DateTime.MaxValue);
this.cancellationToken = cancellationToken ?? (propagationToken != null ? propagationToken.CancellationToken : CancellationToken.None);
this.writeOptions = writeOptions;
this.propagationToken = propagationToken;
}
/// <summary>
@ -100,5 +103,16 @@ namespace Grpc.Core
return this.writeOptions;
}
}
/// <summary>
/// Token for propagating parent call context.
/// </summary>
public ContextPropagationToken PropagationToken
{
get
{
return this.propagationToken;
}
}
}
}

@ -0,0 +1,139 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Threading;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core
{
/// <summary>
/// Token for propagating context of server side handlers to child calls.
/// In situations when a backend is making calls to another backend,
/// it makes sense to propagate properties like deadline and cancellation
/// token of the server call to the child call.
/// C core provides some other contexts (like tracing context) that
/// are not accessible to C# layer, but this token still allows propagating them.
/// </summary>
public class ContextPropagationToken
{
/// <summary>
/// Default propagation mask used by C core.
/// </summary>
const ContextPropagationFlags DefaultCoreMask = (ContextPropagationFlags)0xffff;
/// <summary>
/// Default propagation mask used by C# - we want to propagate deadline
/// and cancellation token by our own means.
/// </summary>
internal const ContextPropagationFlags DefaultMask = DefaultCoreMask
& ~ContextPropagationFlags.Deadline & ~ContextPropagationFlags.Cancellation;
readonly CallSafeHandle parentCall;
readonly DateTime deadline;
readonly CancellationToken cancellationToken;
readonly ContextPropagationOptions options;
internal ContextPropagationToken(CallSafeHandle parentCall, DateTime deadline, CancellationToken cancellationToken, ContextPropagationOptions options)
{
this.parentCall = Preconditions.CheckNotNull(parentCall);
this.deadline = deadline;
this.cancellationToken = cancellationToken;
this.options = options ?? ContextPropagationOptions.Default;
}
internal CallSafeHandle ParentCall
{
get
{
return this.parentCall;
}
}
internal DateTime Deadline
{
get
{
return this.deadline;
}
}
internal CancellationToken CancellationToken
{
get
{
return this.cancellationToken;
}
}
internal ContextPropagationOptions Options
{
get
{
return this.options;
}
}
internal bool IsPropagateDeadline
{
get { return false; }
}
internal bool IsPropagateCancellation
{
get { return false; }
}
}
/// <summary>
/// Options for <see cref="ContextPropagationToken"/>.
/// </summary>
public class ContextPropagationOptions
{
public static readonly ContextPropagationOptions Default = new ContextPropagationOptions();
}
/// <summary>
/// Context propagation flags from grpc/grpc.h.
/// </summary>
[Flags]
internal enum ContextPropagationFlags
{
Deadline = 1,
CensusStatsContext = 2,
CensusTracingContext = 4,
Cancellation = 8
}
}

@ -117,6 +117,7 @@
<Compile Include="CallOptions.cs" />
<Compile Include="CompressionLevel.cs" />
<Compile Include="WriteOptions.cs" />
<Compile Include="ContextPropagationToken.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Grpc.Core.nuspec" />

@ -324,7 +324,11 @@ namespace Grpc.Core.Internal
private void Initialize(CompletionQueueSafeHandle cq)
{
var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry, cq,
var propagationToken = details.Options.PropagationToken;
var parentCall = propagationToken != null ? propagationToken.ParentCall : CallSafeHandle.NullInstance;
var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline));
details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);

@ -42,6 +42,8 @@ namespace Grpc.Core.Internal
/// </summary>
internal class CallSafeHandle : SafeHandleZeroIsInvalid
{
public static readonly CallSafeHandle NullInstance = new CallSafeHandle();
const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionRegistry completionRegistry;

@ -47,7 +47,7 @@ namespace Grpc.Core.Internal
static extern ChannelSafeHandle grpcsharp_secure_channel_create(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs);
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
static extern ChannelState grpcsharp_channel_check_connectivity_state(ChannelSafeHandle channel, int tryToConnect);
@ -76,9 +76,9 @@ namespace Grpc.Core.Internal
return grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
public CallSafeHandle CreateCall(CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
var result = grpcsharp_channel_create_call(this, cq, method, host, deadline);
var result = grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline);
result.SetCompletionRegistry(registry);
return result;
}

@ -310,8 +310,7 @@ namespace Grpc.Core.Internal
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
return new ServerCallContext(
newRpc.Method, newRpc.Host, peer, realtimeDeadline,
return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, peer, realtimeDeadline,
newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
}
}

@ -114,6 +114,16 @@ namespace Grpc.Core
entries.Add(item);
}
public void Add(string key, string value)
{
Add(new Entry(key, value));
}
public void Add(string key, byte[] valueBytes)
{
Add(new Entry(key, valueBytes));
}
public void Clear()
{
CheckWriteable();

@ -45,6 +45,7 @@ namespace Grpc.Core
/// </summary>
public class ServerCallContext
{
private readonly CallSafeHandle callHandle;
private readonly string method;
private readonly string host;
private readonly string peer;
@ -57,9 +58,10 @@ namespace Grpc.Core
private Func<Metadata, Task> writeHeadersFunc;
private IHasWriteOptions writeOptionsHolder;
public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken,
internal ServerCallContext(CallSafeHandle callHandle, string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken,
Func<Metadata, Task> writeHeadersFunc, IHasWriteOptions writeOptionsHolder)
{
this.callHandle = callHandle;
this.method = method;
this.host = host;
this.peer = peer;
@ -75,6 +77,14 @@ namespace Grpc.Core
return writeHeadersFunc(responseHeaders);
}
/// <summary>
/// Creates a propagation token to be used to propagate call context to a child call.
/// </summary>
public ContextPropagationToken CreatePropagationToken(ContextPropagationOptions options = null)
{
return new ContextPropagationToken(callHandle, deadline, cancellationToken, options);
}
/// <summary>Name of method called in this RPC.</summary>
public string Method
{

@ -376,10 +376,12 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_destroy(grpc_channel *channel) {
}
GPR_EXPORT grpc_call *GPR_CALLTYPE
grpcsharp_channel_create_call(grpc_channel *channel, grpc_completion_queue *cq,
grpcsharp_channel_create_call(grpc_channel *channel, grpc_call *parent_call,
gpr_uint32 propagation_mask,
grpc_completion_queue *cq,
const char *method, const char *host,
gpr_timespec deadline) {
return grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq,
return grpc_channel_create_call(channel, parent_call, propagation_mask, cq,
method, host, deadline);
}

Loading…
Cancel
Save