polishing CallOptions

pull/2872/head
Jan Tattermusch 9 years ago
parent 2527967e36
commit 9698b5b29f
  1. 31
      src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
  2. 12
      src/csharp/Grpc.Core.Tests/MockServiceHelper.cs
  3. 24
      src/csharp/Grpc.Core/CallInvocationDetails.cs
  4. 86
      src/csharp/Grpc.Core/CallOptions.cs
  5. 18
      src/csharp/Grpc.Core/ClientBase.cs
  6. 58
      src/csharp/Grpc.Core/ContextPropagationToken.cs
  7. 7
      src/csharp/Grpc.Core/Internal/AsyncCall.cs

@ -110,6 +110,14 @@ namespace Grpc.Core.Tests
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
Assert.Throws(typeof(ArgumentException), () =>
{
// Trying to override deadline while propagating deadline from parent call will throw.
Calls.BlockingUnaryCall(helper.CreateUnaryCall(
new CallOptions(deadline: DateTime.UtcNow.AddDays(8),
propagationToken: context.CreatePropagationToken())), "");
});
var callOptions = new CallOptions(propagationToken: context.CreatePropagationToken());
return await Calls.AsyncUnaryCall(helper.CreateUnaryCall(callOptions), "xyz");
});
@ -118,5 +126,28 @@ namespace Grpc.Core.Tests
await call.RequestStream.CompleteAsync();
Assert.AreEqual("PASS", await call);
}
[Test]
public async Task SuppressDeadlinePropagation()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
Assert.AreEqual(DateTime.MaxValue, context.Deadline);
return "PASS";
});
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
Assert.IsTrue(context.CancellationToken.CanBeCanceled);
var callOptions = new CallOptions(propagationToken: context.CreatePropagationToken(new ContextPropagationOptions(propagateDeadline: false)));
return await Calls.AsyncUnaryCall(helper.CreateUnaryCall(callOptions), "xyz");
});
var cts = new CancellationTokenSource();
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(deadline: DateTime.UtcNow.AddDays(7))));
await call.RequestStream.CompleteAsync();
Assert.AreEqual("PASS", await call);
}
}
}

@ -153,27 +153,23 @@ namespace Grpc.Core.Tests
return channel;
}
public CallInvocationDetails<string, string> CreateUnaryCall(CallOptions options = null)
public CallInvocationDetails<string, string> CreateUnaryCall(CallOptions options = default(CallOptions))
{
options = options ?? new CallOptions();
return new CallInvocationDetails<string, string>(channel, UnaryMethod, options);
}
public CallInvocationDetails<string, string> CreateClientStreamingCall(CallOptions options = null)
public CallInvocationDetails<string, string> CreateClientStreamingCall(CallOptions options = default(CallOptions))
{
options = options ?? new CallOptions();
return new CallInvocationDetails<string, string>(channel, ClientStreamingMethod, options);
}
public CallInvocationDetails<string, string> CreateServerStreamingCall(CallOptions options = null)
public CallInvocationDetails<string, string> CreateServerStreamingCall(CallOptions options = default(CallOptions))
{
options = options ?? new CallOptions();
return new CallInvocationDetails<string, string>(channel, ServerStreamingMethod, options);
}
public CallInvocationDetails<string, string> CreateDuplexStreamingCall(CallOptions options = null)
public CallInvocationDetails<string, string> CreateDuplexStreamingCall(CallOptions options = default(CallOptions))
{
options = options ?? new CallOptions();
return new CallInvocationDetails<string, string>(channel, DuplexStreamingMethod, options);
}

@ -40,17 +40,22 @@ namespace Grpc.Core
/// <summary>
/// Details about a client-side call to be invoked.
/// </summary>
public class CallInvocationDetails<TRequest, TResponse>
public struct CallInvocationDetails<TRequest, TResponse>
{
readonly Channel channel;
readonly string method;
readonly string host;
readonly Marshaller<TRequest> requestMarshaller;
readonly Marshaller<TResponse> responseMarshaller;
readonly CallOptions options;
CallOptions options;
public CallInvocationDetails(Channel channel, Method<TRequest, TResponse> method, CallOptions options) :
this(channel, method.FullName, null, method.RequestMarshaller, method.ResponseMarshaller, options)
this(channel, method, null, options)
{
}
public CallInvocationDetails(Channel channel, Method<TRequest, TResponse> method, string host, CallOptions options) :
this(channel, method.FullName, host, method.RequestMarshaller, method.ResponseMarshaller, options)
{
}
@ -61,7 +66,7 @@ namespace Grpc.Core
this.host = host;
this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller, "requestMarshaller");
this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller, "responseMarshaller");
this.options = Preconditions.CheckNotNull(options, "options");
this.options = options;
}
public Channel Channel
@ -111,5 +116,16 @@ namespace Grpc.Core
return options;
}
}
/// <summary>
/// Returns new instance of <see cref="CallInvocationDetails"/> with
/// <c>Options</c> set to the value provided. Values of all other fields are preserved.
/// </summary>
public CallInvocationDetails<TRequest, TResponse> WithOptions(CallOptions options)
{
var newDetails = this;
newDetails.options = options;
return newDetails;
}
}
}

@ -42,29 +42,28 @@ namespace Grpc.Core
/// <summary>
/// Options for calls made by client.
/// </summary>
public class CallOptions
public struct CallOptions
{
readonly Metadata headers;
readonly DateTime deadline;
readonly CancellationToken cancellationToken;
readonly WriteOptions writeOptions;
readonly ContextPropagationToken propagationToken;
Metadata headers;
DateTime? deadline;
CancellationToken cancellationToken;
WriteOptions writeOptions;
ContextPropagationToken propagationToken;
/// <summary>
/// Creates a new instance of <c>CallOptions</c>.
/// Creates a new instance of <c>CallOptions</c> struct.
/// </summary>
/// <param name="headers">Headers to be sent with the call.</param>
/// <param name="deadline">Deadline for the call to finish. null means no deadline.</param>
/// <param name="cancellationToken">Can be used to request cancellation of the call.</param>
/// <param name="writeOptions">Write options that will be used for this call.</param>
/// <param name="propagationToken">Context propagation token obtained from <see cref="ServerCallContext"/>.</param>
public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken? cancellationToken = null,
public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken),
WriteOptions writeOptions = null, ContextPropagationToken propagationToken = null)
{
// TODO(jtattermusch): consider only creating metadata object once it's really needed.
this.headers = headers ?? new Metadata();
this.deadline = deadline ?? (propagationToken != null ? propagationToken.Deadline : DateTime.MaxValue);
this.cancellationToken = cancellationToken ?? (propagationToken != null ? propagationToken.CancellationToken : CancellationToken.None);
this.headers = headers;
this.deadline = deadline;
this.cancellationToken = cancellationToken;
this.writeOptions = writeOptions;
this.propagationToken = propagationToken;
}
@ -80,7 +79,7 @@ namespace Grpc.Core
/// <summary>
/// Call deadline.
/// </summary>
public DateTime Deadline
public DateTime? Deadline
{
get { return deadline; }
}
@ -114,5 +113,66 @@ namespace Grpc.Core
return this.propagationToken;
}
}
/// <summary>
/// Returns new instance of <see cref="CallOptions"/> with
/// <c>Headers</c> set to the value provided. Values of all other fields are preserved.
/// </summary>
public CallOptions WithHeaders(Metadata headers)
{
var newOptions = this;
newOptions.headers = headers;
return newOptions;
}
/// <summary>
/// Returns new instance of <see cref="CallOptions"/> with
/// <c>Deadline</c> set to the value provided. Values of all other fields are preserved.
/// </summary>
public CallOptions WithDeadline(DateTime deadline)
{
var newOptions = this;
newOptions.deadline = deadline;
return newOptions;
}
/// <summary>
/// Returns new instance of <see cref="CallOptions"/> with
/// <c>CancellationToken</c> set to the value provided. Values of all other fields are preserved.
/// </summary>
public CallOptions WithCancellationToken(CancellationToken cancellationToken)
{
var newOptions = this;
newOptions.cancellationToken = cancellationToken;
return newOptions;
}
/// <summary>
/// Returns a new instance of <see cref="CallOptions"/> with
/// all previously unset values set to their defaults and deadline and cancellation
/// token propagated when appropriate.
/// </summary>
internal CallOptions Normalize()
{
var newOptions = this;
if (propagationToken != null)
{
if (propagationToken.Options.IsPropagateDeadline)
{
Preconditions.CheckArgument(!newOptions.deadline.HasValue,
"Cannot propagate deadline from parent call. The deadline has already been set explicitly.");
newOptions.deadline = propagationToken.ParentDeadline;
}
if (propagationToken.Options.IsPropagateCancellation)
{
Preconditions.CheckArgument(!newOptions.cancellationToken.CanBeCanceled,
"Cannot propagate cancellation token from parent call. The cancellation token has already been set to a non-default value.");
}
}
newOptions.headers = newOptions.headers ?? Metadata.Empty;
newOptions.deadline = newOptions.deadline ?? DateTime.MaxValue;
return newOptions;
}
}
}

@ -62,6 +62,18 @@ namespace Grpc.Core
set;
}
/// <summary>
/// gRPC supports multiple "hosts" being served by a single server.
/// This property can be used to set the target host explicitly.
/// By default, this will be set to <c>null</c> with the meaning
/// "use default host".
/// </summary>
public string Host
{
get;
set;
}
/// <summary>
/// Channel associated with this client.
/// </summary>
@ -83,10 +95,14 @@ namespace Grpc.Core
var interceptor = HeaderInterceptor;
if (interceptor != null)
{
if (options.Headers == null)
{
options = options.WithHeaders(new Metadata());
}
interceptor(options.Headers);
options.Headers.Freeze();
}
return new CallInvocationDetails<TRequest, TResponse>(channel, method, options);
return new CallInvocationDetails<TRequest, TResponse>(channel, method, Host, options);
}
}
}

@ -52,7 +52,7 @@ namespace Grpc.Core
/// <summary>
/// Default propagation mask used by C core.
/// </summary>
const ContextPropagationFlags DefaultCoreMask = (ContextPropagationFlags)0xffff;
private const ContextPropagationFlags DefaultCoreMask = (ContextPropagationFlags)0xffff;
/// <summary>
/// Default propagation mask used by C# - we want to propagate deadline
@ -74,6 +74,9 @@ namespace Grpc.Core
this.options = options ?? ContextPropagationOptions.Default;
}
/// <summary>
/// Gets the native handle of the parent call.
/// </summary>
internal CallSafeHandle ParentCall
{
get
@ -82,7 +85,10 @@ namespace Grpc.Core
}
}
internal DateTime Deadline
/// <summary>
/// Gets the parent call's deadline.
/// </summary>
internal DateTime ParentDeadline
{
get
{
@ -90,7 +96,10 @@ namespace Grpc.Core
}
}
internal CancellationToken CancellationToken
/// <summary>
/// Gets the parent call's cancellation token.
/// </summary>
internal CancellationToken ParentCancellationToken
{
get
{
@ -98,6 +107,9 @@ namespace Grpc.Core
}
}
/// <summary>
/// Get the context propagation options.
/// </summary>
internal ContextPropagationOptions Options
{
get
@ -105,16 +117,6 @@ namespace Grpc.Core
return this.options;
}
}
internal bool IsPropagateDeadline
{
get { return false; }
}
internal bool IsPropagateCancellation
{
get { return false; }
}
}
/// <summary>
@ -122,7 +124,37 @@ namespace Grpc.Core
/// </summary>
public class ContextPropagationOptions
{
/// <summary>
/// The context propagation options that will be used by default.
/// </summary>
public static readonly ContextPropagationOptions Default = new ContextPropagationOptions();
bool propagateDeadline;
bool propagateCancellation;
/// <summary>
/// Creates new context propagation options.
/// </summary>
/// <param name="propagateDeadline">If set to <c>true</c> parent call's deadline will be propagated to the child call.</param>
/// <param name="propagateCancellation">If set to <c>true</c> parent call's cancellation token will be propagated to the child call.</param>
public ContextPropagationOptions(bool propagateDeadline = true, bool propagateCancellation = true)
{
this.propagateDeadline = propagateDeadline;
this.propagateCancellation = propagateCancellation;
}
/// <value><c>true</c> if parent call's deadline should be propagated to the child call.</value>
public bool IsPropagateDeadline
{
get { return this.propagateDeadline; }
}
/// <value><c>true</c> if parent call's cancellation token should be propagated to the child call.</value>
public bool IsPropagateCancellation
{
get { return this.propagateCancellation; }
}
}
/// <summary>

@ -63,7 +63,7 @@ namespace Grpc.Core.Internal
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
this.details = callDetails;
this.details = callDetails.WithOptions(callDetails.Options.Normalize());
this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
}
@ -318,12 +318,11 @@ namespace Grpc.Core.Internal
private void Initialize(CompletionQueueSafeHandle cq)
{
var propagationToken = details.Options.PropagationToken;
var parentCall = propagationToken != null ? propagationToken.ParentCall : CallSafeHandle.NullInstance;
var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline));
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value));
details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
RegisterCancellationCallback();

Loading…
Cancel
Save