Merge pull request #21365 from JamesNK/mgravell/avoid-delegate-spawn

c# - reduce delegate allocations in the Async*Call API
pull/21388/head
Jan Tattermusch 5 years ago committed by GitHub
commit f818a0356e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 98
      src/csharp/Grpc.Core.Api/AsyncCallState.cs
  2. 41
      src/csharp/Grpc.Core.Api/AsyncClientStreamingCall.cs
  3. 41
      src/csharp/Grpc.Core.Api/AsyncDuplexStreamingCall.cs
  4. 38
      src/csharp/Grpc.Core.Api/AsyncServerStreamingCall.cs
  5. 38
      src/csharp/Grpc.Core.Api/AsyncUnaryCall.cs
  6. 82
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallStateTest.cs
  7. 29
      src/csharp/Grpc.Core/Calls.cs
  8. 1
      src/csharp/tests.json

@ -0,0 +1,98 @@
#region Copyright notice and license
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Threading.Tasks;
namespace Grpc.Core
{
/// <summary>
/// Provides an abstraction over the callback providers
/// used by AsyncUnaryCall, AsyncDuplexStreamingCall, etc
/// </summary>
internal struct AsyncCallState
{
readonly object responseHeadersAsync; // Task<Metadata> or Func<object, Task<Metadata>>
readonly object getStatusFunc; // Func<Status> or Func<object, Status>
readonly object getTrailersFunc; // Func<Metadata> or Func<object, Metadata>
readonly object disposeAction; // Action or Action<object>
readonly object callbackState; // arg0 for the callbacks above, if needed
internal AsyncCallState(
Func<object, Task<Metadata>> responseHeadersAsync,
Func<object, Status> getStatusFunc,
Func<object, Metadata> getTrailersFunc,
Action<object> disposeAction,
object callbackState)
{
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callbackState = callbackState;
}
internal AsyncCallState(
Task<Metadata> responseHeadersAsync,
Func<Status> getStatusFunc,
Func<Metadata> getTrailersFunc,
Action disposeAction)
{
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callbackState = null;
}
internal Task<Metadata> ResponseHeadersAsync()
{
var withState = responseHeadersAsync as Func<object, Task<Metadata>>;
return withState != null ? withState(callbackState)
: (Task<Metadata>)responseHeadersAsync;
}
internal Status GetStatus()
{
var withState = getStatusFunc as Func<object, Status>;
return withState != null ? withState(callbackState)
: ((Func<Status>)getStatusFunc)();
}
internal Metadata GetTrailers()
{
var withState = getTrailersFunc as Func<object, Metadata>;
return withState != null ? withState(callbackState)
: ((Func<Metadata>)getTrailersFunc)();
}
internal void Dispose()
{
var withState = disposeAction as Action<object>;
if (withState != null)
{
withState(callbackState);
}
else
{
((Action)disposeAction)();
}
}
}
}

@ -31,10 +31,7 @@ namespace Grpc.Core
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly Task<TResponse> responseAsync;
readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
readonly AsyncCallState callState;
/// <summary>
/// Creates a new AsyncClientStreamingCall object with the specified properties.
@ -54,10 +51,30 @@ namespace Grpc.Core
{
this.requestStream = requestStream;
this.responseAsync = responseAsync;
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction);
}
/// <summary>
/// Creates a new AsyncClientStreamingCall object with the specified properties.
/// </summary>
/// <param name="requestStream">Stream of request values.</param>
/// <param name="responseAsync">The response of the asynchronous call.</param>
/// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
/// <param name="getStatusFunc">Delegate returning the status of the call.</param>
/// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
/// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
/// <param name="state">State object for use with the callback parameters.</param>
public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream,
Task<TResponse> responseAsync,
Func<object, Task<Metadata>> responseHeadersAsync,
Func<object, Status> getStatusFunc,
Func<object, Metadata> getTrailersFunc,
Action<object> disposeAction,
object state)
{
this.requestStream = requestStream;
this.responseAsync = responseAsync;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction, state);
}
/// <summary>
@ -78,7 +95,7 @@ namespace Grpc.Core
{
get
{
return this.responseHeadersAsync;
return callState.ResponseHeadersAsync();
}
}
@ -108,7 +125,7 @@ namespace Grpc.Core
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
return callState.GetStatus();
}
/// <summary>
@ -117,7 +134,7 @@ namespace Grpc.Core
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
return callState.GetTrailers();
}
/// <summary>
@ -132,7 +149,7 @@ namespace Grpc.Core
/// </remarks>
public void Dispose()
{
disposeAction.Invoke();
callState.Dispose();
}
}
}

@ -30,10 +30,7 @@ namespace Grpc.Core
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
readonly AsyncCallState callState;
/// <summary>
/// Creates a new AsyncDuplexStreamingCall object with the specified properties.
@ -53,10 +50,30 @@ namespace Grpc.Core
{
this.requestStream = requestStream;
this.responseStream = responseStream;
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction);
}
/// <summary>
/// Creates a new AsyncDuplexStreamingCall object with the specified properties.
/// </summary>
/// <param name="requestStream">Stream of request values.</param>
/// <param name="responseStream">Stream of response values.</param>
/// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
/// <param name="getStatusFunc">Delegate returning the status of the call.</param>
/// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
/// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
/// <param name="state">State object for use with the callback parameters.</param>
public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream,
IAsyncStreamReader<TResponse> responseStream,
Func<object, Task<Metadata>> responseHeadersAsync,
Func<object, Status> getStatusFunc,
Func<object, Metadata> getTrailersFunc,
Action<object> disposeAction,
object state)
{
this.requestStream = requestStream;
this.responseStream = responseStream;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction, state);
}
/// <summary>
@ -88,7 +105,7 @@ namespace Grpc.Core
{
get
{
return this.responseHeadersAsync;
return callState.ResponseHeadersAsync();
}
}
@ -98,7 +115,7 @@ namespace Grpc.Core
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
return callState.GetStatus();
}
/// <summary>
@ -107,7 +124,7 @@ namespace Grpc.Core
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
return callState.GetTrailers();
}
/// <summary>
@ -122,7 +139,7 @@ namespace Grpc.Core
/// </remarks>
public void Dispose()
{
disposeAction.Invoke();
callState.Dispose();
}
}
}

@ -28,10 +28,7 @@ namespace Grpc.Core
public sealed class AsyncServerStreamingCall<TResponse> : IDisposable
{
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
readonly AsyncCallState callState;
/// <summary>
/// Creates a new AsyncDuplexStreamingCall object with the specified properties.
@ -48,10 +45,27 @@ namespace Grpc.Core
Action disposeAction)
{
this.responseStream = responseStream;
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction);
}
/// <summary>
/// Creates a new AsyncDuplexStreamingCall object with the specified properties.
/// </summary>
/// <param name="responseStream">Stream of response values.</param>
/// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
/// <param name="getStatusFunc">Delegate returning the status of the call.</param>
/// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
/// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
/// <param name="state">State object for use with the callback parameters.</param>
public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream,
Func<object, Task<Metadata>> responseHeadersAsync,
Func<object, Status> getStatusFunc,
Func<object, Metadata> getTrailersFunc,
Action<object> disposeAction,
object state)
{
this.responseStream = responseStream;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction, state);
}
/// <summary>
@ -72,7 +86,7 @@ namespace Grpc.Core
{
get
{
return this.responseHeadersAsync;
return callState.ResponseHeadersAsync();
}
}
@ -82,7 +96,7 @@ namespace Grpc.Core
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
return callState.GetStatus();
}
/// <summary>
@ -91,7 +105,7 @@ namespace Grpc.Core
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
return callState.GetTrailers();
}
/// <summary>
@ -106,7 +120,7 @@ namespace Grpc.Core
/// </remarks>
public void Dispose()
{
disposeAction.Invoke();
callState.Dispose();
}
}
}

@ -29,10 +29,7 @@ namespace Grpc.Core
public sealed class AsyncUnaryCall<TResponse> : IDisposable
{
readonly Task<TResponse> responseAsync;
readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
readonly AsyncCallState callState;
/// <summary>
@ -50,10 +47,27 @@ namespace Grpc.Core
Action disposeAction)
{
this.responseAsync = responseAsync;
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction);
}
/// <summary>
/// Creates a new AsyncUnaryCall object with the specified properties.
/// </summary>
/// <param name="responseAsync">The response of the asynchronous call.</param>
/// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
/// <param name="getStatusFunc">Delegate returning the status of the call.</param>
/// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
/// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
/// <param name="state">State object for use with the callback parameters.</param>
public AsyncUnaryCall(Task<TResponse> responseAsync,
Func<object, Task<Metadata>> responseHeadersAsync,
Func<object, Status> getStatusFunc,
Func<object, Metadata> getTrailersFunc,
Action<object> disposeAction,
object state)
{
this.responseAsync = responseAsync;
callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction, state);
}
/// <summary>
@ -74,7 +88,7 @@ namespace Grpc.Core
{
get
{
return this.responseHeadersAsync;
return callState.ResponseHeadersAsync();
}
}
@ -92,7 +106,7 @@ namespace Grpc.Core
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
return callState.GetStatus();
}
/// <summary>
@ -101,7 +115,7 @@ namespace Grpc.Core
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
return callState.GetTrailers();
}
/// <summary>
@ -116,7 +130,7 @@ namespace Grpc.Core
/// </remarks>
public void Dispose()
{
disposeAction.Invoke();
callState.Dispose();
}
}
}

@ -0,0 +1,82 @@
#region Copyright notice and license
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System.Threading.Tasks;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
public class AsyncCallStateTest
{
[Test]
public void Stateless()
{
bool disposed = false;
Task<Metadata> responseHeaders = Task.FromResult(new Metadata());
Metadata trailers = new Metadata();
var state = new AsyncCallState(responseHeaders, () => new Status(StatusCode.DataLoss, "oops"),
() => trailers, () => disposed = true);
Assert.AreSame(responseHeaders, state.ResponseHeadersAsync());
var status = state.GetStatus();
Assert.AreEqual(StatusCode.DataLoss, status.StatusCode);
Assert.AreEqual("oops", status.Detail);
Assert.AreSame(trailers, state.GetTrailers());
Assert.False(disposed);
state.Dispose();
Assert.True(disposed);
}
class State
{
public bool disposed = false;
public Task<Metadata> responseHeaders = Task.FromResult(new Metadata());
public Metadata trailers = new Metadata();
public Status status = new Status(StatusCode.DataLoss, "oops");
public void Dispose() { disposed = true; }
}
[Test]
public void WithState()
{
var callbackState = new State();
var state = new AsyncCallState(
obj => ((State)obj).responseHeaders,
obj => ((State)obj).status,
obj => ((State)obj).trailers,
obj => ((State)obj).Dispose(),
callbackState);
Assert.AreSame(callbackState.responseHeaders, state.ResponseHeadersAsync());
var status = state.GetStatus();
Assert.AreEqual(StatusCode.DataLoss, status.StatusCode);
Assert.AreEqual("oops", status.Detail);
Assert.AreSame(callbackState.trailers, state.GetTrailers());
Assert.False(callbackState.disposed);
state.Dispose();
Assert.True(callbackState.disposed);
}
}
}

@ -16,6 +16,7 @@
#endregion
using System;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@ -59,7 +60,10 @@ namespace Grpc.Core
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call);
var asyncResult = asyncCall.UnaryCallAsync(req);
return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
return new AsyncUnaryCall<TResponse>(asyncResult,
Callbacks<TRequest, TResponse>.GetHeaders, Callbacks<TRequest, TResponse>.GetStatus,
Callbacks<TRequest, TResponse>.GetTrailers, Callbacks<TRequest, TResponse>.Cancel,
asyncCall);
}
/// <summary>
@ -78,7 +82,10 @@ namespace Grpc.Core
var asyncCall = new AsyncCall<TRequest, TResponse>(call);
asyncCall.StartServerStreamingCall(req);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
return new AsyncServerStreamingCall<TResponse>(responseStream,
Callbacks<TRequest, TResponse>.GetHeaders, Callbacks<TRequest, TResponse>.GetStatus,
Callbacks<TRequest, TResponse>.GetTrailers, Callbacks<TRequest, TResponse>.Cancel,
asyncCall);
}
/// <summary>
@ -96,7 +103,10 @@ namespace Grpc.Core
var asyncCall = new AsyncCall<TRequest, TResponse>(call);
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask,
Callbacks<TRequest, TResponse>.GetHeaders, Callbacks<TRequest, TResponse>.GetStatus,
Callbacks<TRequest, TResponse>.GetTrailers, Callbacks<TRequest, TResponse>.Cancel,
asyncCall);
}
/// <summary>
@ -116,7 +126,18 @@ namespace Grpc.Core
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream,
Callbacks<TRequest, TResponse>.GetHeaders, Callbacks<TRequest, TResponse>.GetStatus,
Callbacks<TRequest, TResponse>.GetTrailers, Callbacks<TRequest, TResponse>.Cancel,
asyncCall);
}
private static class Callbacks<TRequest, TResponse>
{
internal static readonly Func<object, Task<Metadata>> GetHeaders = state => ((AsyncCall<TRequest, TResponse>)state).ResponseHeadersAsync;
internal static readonly Func<object, Status> GetStatus = state => ((AsyncCall<TRequest, TResponse>)state).GetStatus();
internal static readonly Func<object, Metadata> GetTrailers = state => ((AsyncCall<TRequest, TResponse>)state).GetTrailers();
internal static readonly Action<object> Cancel = state => ((AsyncCall<TRequest, TResponse>)state).Cancel();
}
}
}

@ -3,6 +3,7 @@
"Grpc.Core.Interceptors.Tests.ClientInterceptorTest",
"Grpc.Core.Interceptors.Tests.ServerInterceptorTest",
"Grpc.Core.Internal.Tests.AsyncCallServerTest",
"Grpc.Core.Internal.Tests.AsyncCallStateTest",
"Grpc.Core.Internal.Tests.AsyncCallTest",
"Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest",
"Grpc.Core.Internal.Tests.CompletionQueueEventTest",

Loading…
Cancel
Save