Make C# ServerCallContext implementation agnostic

pull/17733/head
Jan Tattermusch 6 years ago
parent 40b35dec12
commit 9c51ff9b33
  1. 21
      src/csharp/Grpc.Core.Testing/TestServerCallContext.cs
  2. 1
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  3. 38
      src/csharp/Grpc.Core/Internal/IServerResponseStream.cs
  4. 97
      src/csharp/Grpc.Core/Internal/ServerCallContextExtraData.cs
  5. 10
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  6. 2
      src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
  7. 101
      src/csharp/Grpc.Core/ServerCallContext.cs

@ -37,22 +37,11 @@ namespace Grpc.Core.Testing
Func<Metadata, Task> writeHeadersFunc, Func<WriteOptions> writeOptionsGetter, Action<WriteOptions> writeOptionsSetter)
{
return new ServerCallContext(null, method, host, deadline, requestHeaders, cancellationToken,
writeHeadersFunc, new WriteOptionsHolder(writeOptionsGetter, writeOptionsSetter),
() => peer, () => authContext, () => contextPropagationToken);
}
private class WriteOptionsHolder : IHasWriteOptions
{
Func<WriteOptions> writeOptionsGetter;
Action<WriteOptions> writeOptionsSetter;
public WriteOptionsHolder(Func<WriteOptions> writeOptionsGetter, Action<WriteOptions> writeOptionsSetter)
{
this.writeOptionsGetter = writeOptionsGetter;
this.writeOptionsSetter = writeOptionsSetter;
}
public WriteOptions WriteOptions { get => writeOptionsGetter(); set => writeOptionsSetter(value); }
(ctx, extraData, headers) => writeHeadersFunc(headers),
(ctx, extraData) => writeOptionsGetter(),
(ctx, extraData, options) => writeOptionsSetter(options),
(ctx, extraData) => peer, (ctx, callHandle) => authContext,
(ctx, callHandle, options) => contextPropagationToken);
}
}
}

@ -18,6 +18,7 @@ using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using Grpc.Core;
using Grpc.Core.Utils;
using Grpc.Core.Profiling;

@ -0,0 +1,38 @@
#region Copyright notice and license
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Threading.Tasks;
using Grpc.Core.Internal;
namespace Grpc.Core.Internal
{
/// <summary>
/// Exposes non-generic members of <c>ServerReponseStream</c>.
/// </summary>
internal interface IServerResponseStream
{
/// <summary>
/// Asynchronously sends response headers for the current call to the client. See <c>ServerCallContext.WriteResponseHeadersAsync</c> for exact semantics.
/// </summary>
Task WriteResponseHeadersAsync(Metadata responseHeaders);
/// <summary>
/// Gets or sets the write options.
/// </summary>
WriteOptions WriteOptions { get; set; }
}
}

@ -0,0 +1,97 @@
#region Copyright notice and license
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Core.Internal
{
/// <summary>
/// Additional state for <c>ServerCallContext</c>.
/// Storing the extra state outside of <c>ServerCallContext</c> allows it to be implementation-agnostic.
/// </summary>
internal class ServerCallContextExtraData
{
readonly CallSafeHandle callHandle;
readonly IServerResponseStream serverResponseStream;
readonly Lazy<AuthContext> cachedAuthContext;
public ServerCallContextExtraData(CallSafeHandle callHandle, IServerResponseStream serverResponseStream)
{
this.callHandle = callHandle;
this.serverResponseStream = serverResponseStream;
// TODO(jtattermusch): avoid unnecessary allocation of factory function and the lazy object.
this.cachedAuthContext = new Lazy<AuthContext>(GetAuthContextEager);
}
public ServerCallContext NewServerCallContext(ServerRpcNew newRpc, CancellationToken cancellationToken)
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime();
return new ServerCallContext(this, newRpc.Method, newRpc.Host, realtimeDeadline,
newRpc.RequestMetadata, cancellationToken,
ServerCallContext_WriteHeadersFunc, ServerCallContext_WriteOptionsGetter, ServerCallContext_WriteOptionsSetter,
ServerCallContext_PeerGetter, ServerCallContext_AuthContextGetter, ServerCallContext_ContextPropagationTokenFactory);
}
private AuthContext GetAuthContextEager()
{
using (var authContextNative = callHandle.GetAuthContext())
{
return authContextNative.ToAuthContext();
}
}
// Implementors of ServerCallContext's members are pre-allocated to avoid unneccessary delegate allocations.
readonly static Func<ServerCallContext, object, Metadata, Task> ServerCallContext_WriteHeadersFunc = (ctx, extraData, headers) =>
{
return ((ServerCallContextExtraData)extraData).serverResponseStream.WriteResponseHeadersAsync(headers);
};
readonly static Func<ServerCallContext, object, WriteOptions> ServerCallContext_WriteOptionsGetter = (ctx, extraData) =>
{
return ((ServerCallContextExtraData)extraData).serverResponseStream.WriteOptions;
};
readonly static Action<ServerCallContext, object, WriteOptions> ServerCallContext_WriteOptionsSetter = (ctx, extraData, options) =>
{
((ServerCallContextExtraData)extraData).serverResponseStream.WriteOptions = options;
};
readonly static Func<ServerCallContext, object, string> ServerCallContext_PeerGetter = (ctx, extraData) =>
{
// Getting the peer lazily is fine as the native call is guaranteed
// not to be disposed before user-supplied server side handler returns.
// Most users won't need to read this field anyway.
return ((ServerCallContextExtraData)extraData).callHandle.GetPeer();
};
readonly static Func<ServerCallContext, object, AuthContext> ServerCallContext_AuthContextGetter = (ctx, extraData) =>
{
return ((ServerCallContextExtraData)extraData).cachedAuthContext.Value;
};
readonly static Func<ServerCallContext, object, ContextPropagationOptions, ContextPropagationToken> ServerCallContext_ContextPropagationTokenFactory = (ctx, extraData, options) =>
{
var callHandle = ((ServerCallContextExtraData)extraData).callHandle;
return new ContextPropagationToken(callHandle, ctx.Deadline, ctx.CancellationToken, options);
};
}
}

@ -71,7 +71,7 @@ namespace Grpc.Core.Internal
var response = await handler(request, context).ConfigureAwait(false);
status = context.Status;
responseWithFlags = new AsyncCallServer<TRequest, TResponse>.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
}
}
catch (Exception e)
{
if (!(e is RpcException))
@ -345,14 +345,12 @@ namespace Grpc.Core.Internal
return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
}
public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
where TRequest : class
where TResponse : class
public static ServerCallContext NewContext(ServerRpcNew newRpc, IServerResponseStream serverResponseStream, CancellationToken cancellationToken)
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime();
return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, realtimeDeadline,
newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
var contextExtraData = new ServerCallContextExtraData(newRpc.Call, serverResponseStream);
return contextExtraData.NewServerCallContext(newRpc, cancellationToken);
}
}
}

@ -23,7 +23,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Writes responses asynchronously to an underlying AsyncCallServer object.
/// </summary>
internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>, IHasWriteOptions
internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>, IServerResponseStream
where TRequest : class
where TResponse : class
{

@ -21,6 +21,7 @@ using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core
{
@ -29,45 +30,49 @@ namespace Grpc.Core
/// </summary>
public class ServerCallContext
{
private readonly CallSafeHandle callHandle;
private readonly object extraData;
private readonly string method;
private readonly string host;
private readonly DateTime deadline;
private readonly Metadata requestHeaders;
private readonly CancellationToken cancellationToken;
private readonly Metadata responseTrailers = new Metadata();
private readonly Func<Metadata, Task> writeHeadersFunc;
private readonly IHasWriteOptions writeOptionsHolder;
private readonly Lazy<AuthContext> authContext;
private readonly Func<string> testingOnlyPeerGetter;
private readonly Func<AuthContext> testingOnlyAuthContextGetter;
private readonly Func<ContextPropagationToken> testingOnlyContextPropagationTokenFactory;
private readonly Func<ServerCallContext, object, Metadata, Task> writeHeadersFunc;
private readonly Func<ServerCallContext, object, WriteOptions> writeOptionsGetter;
private readonly Action<ServerCallContext, object, WriteOptions> writeOptionsSetter;
private Status status = Status.DefaultSuccess;
private readonly Func<ServerCallContext, object, string> peerGetter;
private readonly Func<ServerCallContext, object, AuthContext> authContextGetter;
private readonly Func<ServerCallContext, object, ContextPropagationOptions, ContextPropagationToken> contextPropagationTokenFactory;
internal ServerCallContext(CallSafeHandle callHandle, string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken,
Func<Metadata, Task> writeHeadersFunc, IHasWriteOptions writeOptionsHolder)
: this(callHandle, method, host, deadline, requestHeaders, cancellationToken, writeHeadersFunc, writeOptionsHolder, null, null, null)
{
}
private Status status = Status.DefaultSuccess;
// Additional constructor params should be used for testing only
internal ServerCallContext(CallSafeHandle callHandle, string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken,
Func<Metadata, Task> writeHeadersFunc, IHasWriteOptions writeOptionsHolder,
Func<string> testingOnlyPeerGetter, Func<AuthContext> testingOnlyAuthContextGetter, Func<ContextPropagationToken> testingOnlyContextPropagationTokenFactory)
/// <summary>
/// Creates a new instance of <c>ServerCallContext</c>.
/// To allow reuse of ServerCallContext API by different gRPC implementations, the implementation of some members is provided externally.
/// To provide state, this <c>ServerCallContext</c> instance and <c>extraData</c> will be passed to the member implementations.
/// </summary>
internal ServerCallContext(object extraData,
string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken,
Func<ServerCallContext, object, Metadata, Task> writeHeadersFunc,
Func<ServerCallContext, object, WriteOptions> writeOptionsGetter,
Action<ServerCallContext, object, WriteOptions> writeOptionsSetter,
Func<ServerCallContext, object, string> peerGetter,
Func<ServerCallContext, object, AuthContext> authContextGetter,
Func<ServerCallContext, object, ContextPropagationOptions, ContextPropagationToken> contextPropagationTokenFactory)
{
this.callHandle = callHandle;
this.extraData = extraData;
this.method = method;
this.host = host;
this.deadline = deadline;
this.requestHeaders = requestHeaders;
this.cancellationToken = cancellationToken;
this.writeHeadersFunc = writeHeadersFunc;
this.writeOptionsHolder = writeOptionsHolder;
this.authContext = new Lazy<AuthContext>(GetAuthContextEager);
this.testingOnlyPeerGetter = testingOnlyPeerGetter;
this.testingOnlyAuthContextGetter = testingOnlyAuthContextGetter;
this.testingOnlyContextPropagationTokenFactory = testingOnlyContextPropagationTokenFactory;
this.writeHeadersFunc = GrpcPreconditions.CheckNotNull(writeHeadersFunc);
this.writeOptionsGetter = GrpcPreconditions.CheckNotNull(writeOptionsGetter);
this.writeOptionsSetter = GrpcPreconditions.CheckNotNull(writeOptionsSetter);
this.peerGetter = GrpcPreconditions.CheckNotNull(peerGetter);
this.authContextGetter = GrpcPreconditions.CheckNotNull(authContextGetter);
this.contextPropagationTokenFactory = GrpcPreconditions.CheckNotNull(contextPropagationTokenFactory);
}
/// <summary>
@ -79,7 +84,7 @@ namespace Grpc.Core
/// <returns>The task that finished once response headers have been written.</returns>
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
return writeHeadersFunc(responseHeaders);
return writeHeadersFunc(this, extraData, responseHeaders);
}
/// <summary>
@ -87,13 +92,9 @@ namespace Grpc.Core
/// </summary>
public ContextPropagationToken CreatePropagationToken(ContextPropagationOptions options = null)
{
if (testingOnlyContextPropagationTokenFactory != null)
{
return testingOnlyContextPropagationTokenFactory();
}
return new ContextPropagationToken(callHandle, deadline, cancellationToken, options);
return contextPropagationTokenFactory(this, extraData, options);
}
/// <summary>Name of method called in this RPC.</summary>
public string Method
{
@ -117,14 +118,7 @@ namespace Grpc.Core
{
get
{
if (testingOnlyPeerGetter != null)
{
return testingOnlyPeerGetter();
}
// Getting the peer lazily is fine as the native call is guaranteed
// not to be disposed before user-supplied server side handler returns.
// Most users won't need to read this field anyway.
return this.callHandle.GetPeer();
return peerGetter(this, extraData);
}
}
@ -187,12 +181,12 @@ namespace Grpc.Core
{
get
{
return writeOptionsHolder.WriteOptions;
return writeOptionsGetter(this, extraData);
}
set
{
writeOptionsHolder.WriteOptions = value;
writeOptionsSetter(this, extraData, value);
}
}
@ -204,31 +198,8 @@ namespace Grpc.Core
{
get
{
if (testingOnlyAuthContextGetter != null)
{
return testingOnlyAuthContextGetter();
}
return authContext.Value;
return authContextGetter(this, extraData);
}
}
private AuthContext GetAuthContextEager()
{
using (var authContextNative = callHandle.GetAuthContext())
{
return authContextNative.ToAuthContext();
}
}
}
/// <summary>
/// Allows sharing write options between ServerCallContext and other objects.
/// </summary>
internal interface IHasWriteOptions
{
/// <summary>
/// Gets or sets the write options.
/// </summary>
WriteOptions WriteOptions { get; set; }
}
}

Loading…
Cancel
Save