Merge remote-tracking branch 'upstream/master' into resolver_channel_args

pull/8462/head
Mark D. Roth 8 years ago
commit 93ca8071f2
  1. 6
      Makefile
  2. 1
      build.yaml
  3. 4
      src/csharp/Grpc.Auth/GoogleAuthInterceptors.cs
  4. 1
      src/csharp/Grpc.Core/DefaultCallInvoker.cs
  5. 1
      src/csharp/Grpc.Core/Grpc.Core.csproj
  6. 1
      src/csharp/Grpc.Core/GrpcEnvironment.cs
  7. 15
      src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
  8. 29
      src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
  9. 2
      src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs
  10. 7
      src/csharp/Grpc.Core/Internal/NativeMetadataCredentialsPlugin.cs
  11. 41
      src/csharp/Grpc.Core/Internal/NativeMethods.cs
  12. 85
      src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs
  13. 6
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  14. 6
      src/csharp/Grpc.Core/Internal/UnmanagedLibrary.cs
  15. 70
      src/csharp/Grpc.Core/Metadata.cs
  16. 57
      src/csharp/Grpc.Core/Server.cs
  17. 5
      src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
  18. 64
      src/csharp/ext/grpc_csharp_ext.c
  19. 27
      test/cpp/interop/interop_test.cc
  20. 25
      tools/run_tests/run_performance_tests.py
  21. 1
      tools/run_tests/sources_and_headers.json

@ -12205,16 +12205,16 @@ $(BINDIR)/$(CONFIG)/interop_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/interop_test: $(PROTOBUF_DEP) $(INTEROP_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(BINDIR)/$(CONFIG)/interop_test: $(PROTOBUF_DEP) $(INTEROP_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(INTEROP_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/interop_test
$(Q) $(LDXX) $(LDFLAGS) $(INTEROP_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/interop_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/interop/interop_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/interop/interop_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
deps_interop_test: $(INTEROP_TEST_OBJS:.o=.dep)

@ -3039,6 +3039,7 @@ targets:
- grpc
- gpr_test_util
- gpr
- grpc++_test_config
platforms:
- mac
- linux

@ -33,6 +33,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Google.Apis.Auth.OAuth2;
using Grpc.Core;
@ -72,9 +73,10 @@ namespace Grpc.Auth
public static AsyncAuthInterceptor FromAccessToken(string accessToken)
{
GrpcPreconditions.CheckNotNull(accessToken);
return new AsyncAuthInterceptor(async (context, metadata) =>
return new AsyncAuthInterceptor((context, metadata) =>
{
metadata.Add(CreateBearerTokenHeader(accessToken));
return Task.FromResult<object>(null);
});
}

@ -102,6 +102,7 @@ namespace Grpc.Core
return Calls.AsyncDuplexStreamingCall(call);
}
/// <summary>Creates call invocation details for given method.</summary>
protected virtual CallInvocationDetails<TRequest, TResponse> CreateCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
where TRequest : class
where TResponse : class

@ -138,6 +138,7 @@
<Compile Include="Internal\CallError.cs" />
<Compile Include="Logging\LogLevel.cs" />
<Compile Include="Logging\LogLevelFilterLogger.cs" />
<Compile Include="Internal\RequestCallContextSafeHandle.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Grpc.Core.nuspec" />

@ -59,7 +59,6 @@ namespace Grpc.Core
static ILogger logger = new NullLogger();
readonly object myLock = new object();
readonly GrpcThreadPool threadPool;
readonly DebugStats debugStats = new DebugStats();
readonly AtomicCounter cqPickerCounter = new AtomicCounter();

@ -93,21 +93,6 @@ namespace Grpc.Core.Internal
return data;
}
// Gets data of server_rpc_new completion.
public ServerRpcNew GetServerRpcNew(Server server)
{
var call = Native.grpcsharp_batch_context_server_rpc_new_call(this);
var method = Marshal.PtrToStringAnsi(Native.grpcsharp_batch_context_server_rpc_new_method(this));
var host = Marshal.PtrToStringAnsi(Native.grpcsharp_batch_context_server_rpc_new_host(this));
var deadline = Native.grpcsharp_batch_context_server_rpc_new_deadline(this);
IntPtr metadataArrayPtr = Native.grpcsharp_batch_context_server_rpc_new_request_metadata(this);
var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr);
return new ServerRpcNew(server, call, method, host, deadline, metadata);
}
// Gets data of receive_close_on_server completion.
public bool GetReceivedCloseOnServerCancelled()
{

@ -44,6 +44,8 @@ namespace Grpc.Core.Internal
internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx);
internal delegate void RequestCallCompletionDelegate(bool success, RequestCallContextSafeHandle ctx);
internal class CompletionRegistry
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<CompletionRegistry>();
@ -68,6 +70,12 @@ namespace Grpc.Core.Internal
Register(ctx.Handle, opCallback);
}
public void RegisterRequestCallCompletion(RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback)
{
OpCompletionDelegate opCallback = ((success) => HandleRequestCallCompletion(success, ctx, callback));
Register(ctx.Handle, opCallback);
}
public OpCompletionDelegate Extract(IntPtr key)
{
OpCompletionDelegate value;
@ -84,7 +92,26 @@ namespace Grpc.Core.Internal
}
catch (Exception e)
{
Logger.Error(e, "Exception occured while invoking completion delegate.");
Logger.Error(e, "Exception occured while invoking batch completion delegate.");
}
finally
{
if (ctx != null)
{
ctx.Dispose();
}
}
}
private static void HandleRequestCallCompletion(bool success, RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback)
{
try
{
callback(success, ctx);
}
catch (Exception e)
{
Logger.Error(e, "Exception occured while invoking request call completion delegate.");
}
finally
{

@ -48,7 +48,7 @@ namespace Grpc.Core.Internal
readonly Func<CallOptions, CallOptions> callOptionsInterceptor;
/// <summary>
/// Initializes a new instance of the <see cref="Grpc.Core.InterceptingCallInvoker"/> class.
/// Initializes a new instance of the <see cref="Grpc.Core.Internal.InterceptingCallInvoker"/> class.
/// </summary>
public InterceptingCallInvoker(CallInvoker callInvoker,
Func<string, string> hostInterceptor = null,

@ -78,7 +78,10 @@ namespace Grpc.Core.Internal
{
var context = new AuthInterceptorContext(Marshal.PtrToStringAnsi(serviceUrlPtr),
Marshal.PtrToStringAnsi(methodNamePtr));
StartGetMetadata(context, callbackPtr, userDataPtr);
// Don't await, we are in a native callback and need to return.
#pragma warning disable 4014
GetMetadataAsync(context, callbackPtr, userDataPtr);
#pragma warning restore 4014
}
catch (Exception e)
{
@ -87,7 +90,7 @@ namespace Grpc.Core.Internal
}
}
private async Task StartGetMetadata(AuthInterceptorContext context, IntPtr callbackPtr, IntPtr userDataPtr)
private async Task GetMetadataAsync(AuthInterceptorContext context, IntPtr callbackPtr, IntPtr userDataPtr)
{
try
{

@ -64,14 +64,17 @@ namespace Grpc.Core.Internal
public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_status_delegate grpcsharp_batch_context_recv_status_on_client_status;
public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate grpcsharp_batch_context_recv_status_on_client_details;
public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate grpcsharp_batch_context_recv_status_on_client_trailing_metadata;
public readonly Delegates.grpcsharp_batch_context_server_rpc_new_call_delegate grpcsharp_batch_context_server_rpc_new_call;
public readonly Delegates.grpcsharp_batch_context_server_rpc_new_method_delegate grpcsharp_batch_context_server_rpc_new_method;
public readonly Delegates.grpcsharp_batch_context_server_rpc_new_host_delegate grpcsharp_batch_context_server_rpc_new_host;
public readonly Delegates.grpcsharp_batch_context_server_rpc_new_deadline_delegate grpcsharp_batch_context_server_rpc_new_deadline;
public readonly Delegates.grpcsharp_batch_context_server_rpc_new_request_metadata_delegate grpcsharp_batch_context_server_rpc_new_request_metadata;
public readonly Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate grpcsharp_batch_context_recv_close_on_server_cancelled;
public readonly Delegates.grpcsharp_batch_context_destroy_delegate grpcsharp_batch_context_destroy;
public readonly Delegates.grpcsharp_request_call_context_create_delegate grpcsharp_request_call_context_create;
public readonly Delegates.grpcsharp_request_call_context_call_delegate grpcsharp_request_call_context_call;
public readonly Delegates.grpcsharp_request_call_context_method_delegate grpcsharp_request_call_context_method;
public readonly Delegates.grpcsharp_request_call_context_host_delegate grpcsharp_request_call_context_host;
public readonly Delegates.grpcsharp_request_call_context_deadline_delegate grpcsharp_request_call_context_deadline;
public readonly Delegates.grpcsharp_request_call_context_request_metadata_delegate grpcsharp_request_call_context_request_metadata;
public readonly Delegates.grpcsharp_request_call_context_destroy_delegate grpcsharp_request_call_context_destroy;
public readonly Delegates.grpcsharp_composite_call_credentials_create_delegate grpcsharp_composite_call_credentials_create;
public readonly Delegates.grpcsharp_call_credentials_release_delegate grpcsharp_call_credentials_release;
@ -170,14 +173,17 @@ namespace Grpc.Core.Internal
this.grpcsharp_batch_context_recv_status_on_client_status = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_status_delegate>(library);
this.grpcsharp_batch_context_recv_status_on_client_details = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate>(library);
this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate>(library);
this.grpcsharp_batch_context_server_rpc_new_call = GetMethodDelegate<Delegates.grpcsharp_batch_context_server_rpc_new_call_delegate>(library);
this.grpcsharp_batch_context_server_rpc_new_method = GetMethodDelegate<Delegates.grpcsharp_batch_context_server_rpc_new_method_delegate>(library);
this.grpcsharp_batch_context_server_rpc_new_host = GetMethodDelegate<Delegates.grpcsharp_batch_context_server_rpc_new_host_delegate>(library);
this.grpcsharp_batch_context_server_rpc_new_deadline = GetMethodDelegate<Delegates.grpcsharp_batch_context_server_rpc_new_deadline_delegate>(library);
this.grpcsharp_batch_context_server_rpc_new_request_metadata = GetMethodDelegate<Delegates.grpcsharp_batch_context_server_rpc_new_request_metadata_delegate>(library);
this.grpcsharp_batch_context_recv_close_on_server_cancelled = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate>(library);
this.grpcsharp_batch_context_destroy = GetMethodDelegate<Delegates.grpcsharp_batch_context_destroy_delegate>(library);
this.grpcsharp_request_call_context_create = GetMethodDelegate<Delegates.grpcsharp_request_call_context_create_delegate>(library);
this.grpcsharp_request_call_context_call = GetMethodDelegate<Delegates.grpcsharp_request_call_context_call_delegate>(library);
this.grpcsharp_request_call_context_method = GetMethodDelegate<Delegates.grpcsharp_request_call_context_method_delegate>(library);
this.grpcsharp_request_call_context_host = GetMethodDelegate<Delegates.grpcsharp_request_call_context_host_delegate>(library);
this.grpcsharp_request_call_context_deadline = GetMethodDelegate<Delegates.grpcsharp_request_call_context_deadline_delegate>(library);
this.grpcsharp_request_call_context_request_metadata = GetMethodDelegate<Delegates.grpcsharp_request_call_context_request_metadata_delegate>(library);
this.grpcsharp_request_call_context_destroy = GetMethodDelegate<Delegates.grpcsharp_request_call_context_destroy_delegate>(library);
this.grpcsharp_composite_call_credentials_create = GetMethodDelegate<Delegates.grpcsharp_composite_call_credentials_create_delegate>(library);
this.grpcsharp_call_credentials_release = GetMethodDelegate<Delegates.grpcsharp_call_credentials_release_delegate>(library);
@ -302,14 +308,17 @@ namespace Grpc.Core.Internal
public delegate StatusCode grpcsharp_batch_context_recv_status_on_client_status_delegate(BatchContextSafeHandle ctx);
public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_details_delegate(BatchContextSafeHandle ctx); // returns const char*
public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate(BatchContextSafeHandle ctx);
public delegate CallSafeHandle grpcsharp_batch_context_server_rpc_new_call_delegate(BatchContextSafeHandle ctx);
public delegate IntPtr grpcsharp_batch_context_server_rpc_new_method_delegate(BatchContextSafeHandle ctx); // returns const char*
public delegate IntPtr grpcsharp_batch_context_server_rpc_new_host_delegate(BatchContextSafeHandle ctx); // returns const char*
public delegate Timespec grpcsharp_batch_context_server_rpc_new_deadline_delegate(BatchContextSafeHandle ctx);
public delegate IntPtr grpcsharp_batch_context_server_rpc_new_request_metadata_delegate(BatchContextSafeHandle ctx);
public delegate int grpcsharp_batch_context_recv_close_on_server_cancelled_delegate(BatchContextSafeHandle ctx);
public delegate void grpcsharp_batch_context_destroy_delegate(IntPtr ctx);
public delegate RequestCallContextSafeHandle grpcsharp_request_call_context_create_delegate();
public delegate CallSafeHandle grpcsharp_request_call_context_call_delegate(RequestCallContextSafeHandle ctx);
public delegate IntPtr grpcsharp_request_call_context_method_delegate(RequestCallContextSafeHandle ctx); // returns const char*
public delegate IntPtr grpcsharp_request_call_context_host_delegate(RequestCallContextSafeHandle ctx); // returns const char*
public delegate Timespec grpcsharp_request_call_context_deadline_delegate(RequestCallContextSafeHandle ctx);
public delegate IntPtr grpcsharp_request_call_context_request_metadata_delegate(RequestCallContextSafeHandle ctx);
public delegate void grpcsharp_request_call_context_destroy_delegate(IntPtr ctx);
public delegate CallCredentialsSafeHandle grpcsharp_composite_call_credentials_create_delegate(CallCredentialsSafeHandle creds1, CallCredentialsSafeHandle creds2);
public delegate void grpcsharp_call_credentials_release_delegate(IntPtr credentials);
@ -393,7 +402,7 @@ namespace Grpc.Core.Internal
public delegate int grpcsharp_server_add_insecure_http2_port_delegate(ServerSafeHandle server, string addr);
public delegate int grpcsharp_server_add_secure_http2_port_delegate(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds);
public delegate void grpcsharp_server_start_delegate(ServerSafeHandle server);
public delegate CallError grpcsharp_server_request_call_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
public delegate CallError grpcsharp_server_request_call_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, RequestCallContextSafeHandle ctx);
public delegate void grpcsharp_server_cancel_all_calls_delegate(ServerSafeHandle server);
public delegate void grpcsharp_server_shutdown_and_notify_callback_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
public delegate void grpcsharp_server_destroy_delegate(IntPtr server);

@ -0,0 +1,85 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using Grpc.Core;
namespace Grpc.Core.Internal
{
/// <summary>
/// grpcsharp_request_call_context
/// </summary>
internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid
{
static readonly NativeMethods Native = NativeMethods.Get();
private RequestCallContextSafeHandle()
{
}
public static RequestCallContextSafeHandle Create()
{
return Native.grpcsharp_request_call_context_create();
}
public IntPtr Handle
{
get
{
return handle;
}
}
// Gets data of server_rpc_new completion.
public ServerRpcNew GetServerRpcNew(Server server)
{
var call = Native.grpcsharp_request_call_context_call(this);
var method = Marshal.PtrToStringAnsi(Native.grpcsharp_request_call_context_method(this));
var host = Marshal.PtrToStringAnsi(Native.grpcsharp_request_call_context_host(this));
var deadline = Native.grpcsharp_request_call_context_deadline(this);
IntPtr metadataArrayPtr = Native.grpcsharp_request_call_context_request_metadata(this);
var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr);
return new ServerRpcNew(server, call, method, host, deadline, metadata);
}
protected override bool ReleaseHandle()
{
Native.grpcsharp_request_call_context_destroy(handle);
return true;
}
}
}

@ -85,12 +85,12 @@ namespace Grpc.Core.Internal
}
}
public void RequestCall(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
public void RequestCall(RequestCallCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
var ctx = RequestCallContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterRequestCallCompletion(ctx, callback);
Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk();
}
}

@ -134,7 +134,11 @@ namespace Grpc.Core.Internal
{
throw new MissingMethodException(string.Format("The native method \"{0}\" does not exist", methodName));
}
return Marshal.GetDelegateForFunctionPointer(ptr, typeof(T)) as T;
#if NETSTANDARD1_5
return Marshal.GetDelegateForFunctionPointer<T>(ptr); // non-generic version is obsolete
#else
return Marshal.GetDelegateForFunctionPointer(ptr, typeof(T)) as T; // generic version not available in .NET45
#endif
}
/// <summary>

@ -95,11 +95,18 @@ namespace Grpc.Core
#region IList members
/// <summary>
/// <see cref="T:IList`1"/>
/// </summary>
public int IndexOf(Metadata.Entry item)
{
return entries.IndexOf(item);
}
/// <summary>
/// <see cref="T:IList`1"/>
/// </summary>
public void Insert(int index, Metadata.Entry item)
{
GrpcPreconditions.CheckNotNull(item);
@ -107,12 +114,18 @@ namespace Grpc.Core
entries.Insert(index, item);
}
/// <summary>
/// <see cref="T:IList`1"/>
/// </summary>
public void RemoveAt(int index)
{
CheckWriteable();
entries.RemoveAt(index);
}
/// <summary>
/// <see cref="T:IList`1"/>
/// </summary>
public Metadata.Entry this[int index]
{
get
@ -128,6 +141,9 @@ namespace Grpc.Core
}
}
/// <summary>
/// <see cref="T:IList`1"/>
/// </summary>
public void Add(Metadata.Entry item)
{
GrpcPreconditions.CheckNotNull(item);
@ -135,48 +151,75 @@ namespace Grpc.Core
entries.Add(item);
}
/// <summary>
/// <see cref="T:IList`1"/>
/// </summary>
public void Add(string key, string value)
{
Add(new Entry(key, value));
}
/// <summary>
/// <see cref="T:IList`1"/>
/// </summary>
public void Add(string key, byte[] valueBytes)
{
Add(new Entry(key, valueBytes));
}
/// <summary>
/// <see cref="T:IList`1"/>
/// </summary>
public void Clear()
{
CheckWriteable();
entries.Clear();
}
/// <summary>
/// <see cref="T:IList`1"/>
/// </summary>
public bool Contains(Metadata.Entry item)
{
return entries.Contains(item);
}
/// <summary>
/// <see cref="T:IList`1"/>
/// </summary>
public void CopyTo(Metadata.Entry[] array, int arrayIndex)
{
entries.CopyTo(array, arrayIndex);
}
/// <summary>
/// <see cref="T:IList`1"/>
/// </summary>
public int Count
{
get { return entries.Count; }
}
/// <summary>
/// <see cref="T:IList`1"/>
/// </summary>
public bool IsReadOnly
{
get { return readOnly; }
}
/// <summary>
/// <see cref="T:IList`1"/>
/// </summary>
public bool Remove(Metadata.Entry item)
{
CheckWriteable();
return entries.Remove(item);
}
/// <summary>
/// <see cref="T:IList`1"/>
/// </summary>
public IEnumerator<Metadata.Entry> GetEnumerator()
{
return entries.GetEnumerator();
@ -221,7 +264,7 @@ namespace Grpc.Core
public Entry(string key, byte[] valueBytes)
{
this.key = NormalizeKey(key);
GrpcPreconditions.CheckArgument(this.key.EndsWith(BinaryHeaderSuffix),
GrpcPreconditions.CheckArgument(HasBinaryHeaderSuffix(this.key),
"Key for binary valued metadata entry needs to have suffix indicating binary value.");
this.value = null;
GrpcPreconditions.CheckNotNull(valueBytes, "valueBytes");
@ -237,7 +280,7 @@ namespace Grpc.Core
public Entry(string key, string value)
{
this.key = NormalizeKey(key);
GrpcPreconditions.CheckArgument(!this.key.EndsWith(BinaryHeaderSuffix),
GrpcPreconditions.CheckArgument(!HasBinaryHeaderSuffix(this.key),
"Key for ASCII valued metadata entry cannot have suffix indicating binary value.");
this.value = GrpcPreconditions.CheckNotNull(value, "value");
this.valueBytes = null;
@ -324,7 +367,7 @@ namespace Grpc.Core
/// </summary>
internal static Entry CreateUnsafe(string key, byte[] valueBytes)
{
if (key.EndsWith(BinaryHeaderSuffix))
if (HasBinaryHeaderSuffix(key))
{
return new Entry(key, null, valueBytes);
}
@ -338,6 +381,27 @@ namespace Grpc.Core
"Metadata entry key not valid. Keys can only contain lowercase alphanumeric characters, underscores and hyphens.");
return normalized;
}
/// <summary>
/// Returns <c>true</c> if the key has "-bin" binary header suffix.
/// </summary>
private static bool HasBinaryHeaderSuffix(string key)
{
// We don't use just string.EndsWith because its implementation is extremely slow
// on CoreCLR and we've seen significant differences in gRPC benchmarks caused by it.
// See https://github.com/dotnet/coreclr/issues/5612
int len = key.Length;
if (len >= 4 &&
key[len - 4] == '-' &&
key[len - 3] == 'b' &&
key[len - 2] == 'i' &&
key[len - 1] == 'n')
{
return true;
}
return false;
}
}
}
}

@ -47,7 +47,7 @@ namespace Grpc.Core
/// </summary>
public class Server
{
const int InitialAllowRpcTokenCountPerCq = 10;
const int DefaultRequestCallTokensPerCq = 2000;
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
@ -66,7 +66,7 @@ namespace Grpc.Core
bool startRequested;
volatile bool shutdownRequested;
int requestCallTokensPerCq = DefaultRequestCallTokensPerCq;
/// <summary>
/// Creates a new server.
@ -132,6 +132,27 @@ namespace Grpc.Core
}
}
/// <summary>
/// Experimental API. Might anytime change without prior notice.
/// Number or calls requested via grpc_server_request_call at any given time for each completion queue.
/// </summary>
public int RequestCallTokensPerCompletionQueue
{
get
{
return requestCallTokensPerCq;
}
set
{
lock (myLock)
{
GrpcPreconditions.CheckState(!startRequested);
GrpcPreconditions.CheckArgument(value > 0);
requestCallTokensPerCq = value;
}
}
}
/// <summary>
/// Starts the server.
/// </summary>
@ -145,9 +166,7 @@ namespace Grpc.Core
handle.Start();
// Starting with more than one AllowOneRpc tokens can significantly increase
// unary RPC throughput.
for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++)
for (int i = 0; i < requestCallTokensPerCq; i++)
{
foreach (var cq in environment.CompletionQueues)
{
@ -310,7 +329,7 @@ namespace Grpc.Core
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq, Action continuation)
{
try
{
@ -325,25 +344,41 @@ namespace Grpc.Core
{
Logger.Warning(e, "Exception while handling RPC.");
}
if (continuation != null)
{
continuation();
}
}
/// <summary>
/// Handles the native callback.
/// </summary>
private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq)
private void HandleNewServerRpc(bool success, RequestCallContextSafeHandle ctx, CompletionQueueSafeHandle cq)
{
Task.Run(() => AllowOneRpc(cq));
bool nextRpcRequested = false;
if (success)
{
ServerRpcNew newRpc = ctx.GetServerRpcNew(this);
var newRpc = ctx.GetServerRpcNew(this);
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
{
HandleCallAsync(newRpc, cq); // we don't need to await.
nextRpcRequested = true;
// Start asynchronous handler for the call.
// Don't await, the continuations will run on gRPC thread pool once triggered
// by cq.Next().
#pragma warning disable 4014
HandleCallAsync(newRpc, cq, () => AllowOneRpc(cq));
#pragma warning restore 4014
}
}
if (!nextRpcRequested)
{
AllowOneRpc(cq);
}
}
/// <summary>

@ -76,6 +76,11 @@ namespace Grpc.IntegrationTesting
private async Task RunAsync()
{
// (ThreadPoolSize == ProcessorCount) gives best throughput in benchmarks
// and doesn't seem to harm performance even when server and client
// are running on the same machine.
GrpcEnvironment.SetThreadPoolSize(Environment.ProcessorCount);
string host = "0.0.0.0";
int port = options.DriverPort;

@ -84,11 +84,6 @@ typedef struct grpcsharp_batch_context {
size_t status_details_capacity;
} recv_status_on_client;
int recv_close_on_server_cancelled;
struct {
grpc_call *call;
grpc_call_details call_details;
grpc_metadata_array request_metadata;
} server_rpc_new;
} grpcsharp_batch_context;
GPR_EXPORT grpcsharp_batch_context *GPR_CALLTYPE grpcsharp_batch_context_create() {
@ -97,6 +92,18 @@ GPR_EXPORT grpcsharp_batch_context *GPR_CALLTYPE grpcsharp_batch_context_create(
return ctx;
}
typedef struct {
grpc_call *call;
grpc_call_details call_details;
grpc_metadata_array request_metadata;
} grpcsharp_request_call_context;
GPR_EXPORT grpcsharp_request_call_context *GPR_CALLTYPE grpcsharp_request_call_context_create() {
grpcsharp_request_call_context *ctx = gpr_malloc(sizeof(grpcsharp_request_call_context));
memset(ctx, 0, sizeof(grpcsharp_request_call_context));
return ctx;
}
/*
* Destroys array->metadata.
* The array pointer itself is not freed.
@ -230,13 +237,20 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_destroy(grpcsharp_batch_con
&(ctx->recv_status_on_client.trailing_metadata));
gpr_free((void *)ctx->recv_status_on_client.status_details);
gpr_free(ctx);
}
GPR_EXPORT void GPR_CALLTYPE grpcsharp_request_call_context_destroy(grpcsharp_request_call_context *ctx) {
if (!ctx) {
return;
}
/* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is
supposed
to take its ownership. */
grpc_call_details_destroy(&(ctx->server_rpc_new.call_details));
grpc_call_details_destroy(&(ctx->call_details));
grpcsharp_metadata_array_destroy_metadata_only(
&(ctx->server_rpc_new.request_metadata));
&(ctx->request_metadata));
gpr_free(ctx);
}
@ -303,32 +317,32 @@ grpcsharp_batch_context_recv_status_on_client_trailing_metadata(
return &(ctx->recv_status_on_client.trailing_metadata);
}
GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_batch_context_server_rpc_new_call(
const grpcsharp_batch_context *ctx) {
return ctx->server_rpc_new.call;
GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_request_call_context_call(
const grpcsharp_request_call_context *ctx) {
return ctx->call;
}
GPR_EXPORT const char *GPR_CALLTYPE
grpcsharp_batch_context_server_rpc_new_method(
const grpcsharp_batch_context *ctx) {
return ctx->server_rpc_new.call_details.method;
grpcsharp_request_call_context_method(
const grpcsharp_request_call_context *ctx) {
return ctx->call_details.method;
}
GPR_EXPORT const char *GPR_CALLTYPE grpcsharp_batch_context_server_rpc_new_host(
const grpcsharp_batch_context *ctx) {
return ctx->server_rpc_new.call_details.host;
GPR_EXPORT const char *GPR_CALLTYPE grpcsharp_request_call_context_host(
const grpcsharp_request_call_context *ctx) {
return ctx->call_details.host;
}
GPR_EXPORT gpr_timespec GPR_CALLTYPE
grpcsharp_batch_context_server_rpc_new_deadline(
const grpcsharp_batch_context *ctx) {
return ctx->server_rpc_new.call_details.deadline;
grpcsharp_request_call_context_deadline(
const grpcsharp_request_call_context *ctx) {
return ctx->call_details.deadline;
}
GPR_EXPORT const grpc_metadata_array *GPR_CALLTYPE
grpcsharp_batch_context_server_rpc_new_request_metadata(
const grpcsharp_batch_context *ctx) {
return &(ctx->server_rpc_new.request_metadata);
grpcsharp_request_call_context_request_metadata(
const grpcsharp_request_call_context *ctx) {
return &(ctx->request_metadata);
}
GPR_EXPORT int32_t GPR_CALLTYPE
@ -853,10 +867,10 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq,
grpcsharp_batch_context *ctx) {
grpcsharp_request_call_context *ctx) {
return grpc_server_request_call(
server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details),
&(ctx->server_rpc_new.request_metadata), cq, cq, ctx);
server, &(ctx->call), &(ctx->call_details),
&(ctx->request_metadata), cq, cq, ctx);
}
/* Security */

@ -44,17 +44,21 @@
#include <sys/wait.h>
#include <unistd.h>
#include <gflags/gflags.h>
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "test/core/util/port.h"
#include "test/cpp/util/test_config.h"
extern "C" {
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/support/string.h"
}
DEFINE_string(extra_server_flags, "", "Extra flags to pass to server.");
int test_client(const char* root, const char* host, int port) {
int status;
pid_t cli;
@ -80,6 +84,7 @@ int test_client(const char* root, const char* host, int port) {
}
int main(int argc, char** argv) {
grpc::testing::InitTest(&argc, &argv, true);
char* me = argv[0];
char* lslash = strrchr(me, '/');
char root[1024];
@ -105,15 +110,19 @@ int main(int argc, char** argv) {
/* start the server */
svr = fork();
if (svr == 0) {
char* binary_path;
char* port_arg;
gpr_asprintf(&binary_path, "%s/interop_server", root);
gpr_asprintf(&port_arg, "--port=%d", port);
execl(binary_path, binary_path, port_arg, NULL);
gpr_free(binary_path);
gpr_free(port_arg);
const size_t num_args = 3 + !FLAGS_extra_server_flags.empty();
char** args = (char**)gpr_malloc(sizeof(char*) * num_args);
memset(args, 0, sizeof(char*) * num_args);
gpr_asprintf(&args[0], "%s/interop_server", root);
gpr_asprintf(&args[1], "--port=%d", port);
if (!FLAGS_extra_server_flags.empty()) {
args[2] = gpr_strdup(FLAGS_extra_server_flags.c_str());
}
execv(args[0], args);
for (size_t i = 0; i < num_args - 1; ++i) {
gpr_free(args[i]);
}
gpr_free(args);
return 1;
}
/* wait a little */

@ -91,12 +91,11 @@ def create_qpsworker_job(language, shortname=None,
else:
host_and_port='localhost:%s' % port
# TODO(jtattermusch): with some care, we can calculate the right timeout
# of a worker from the sum of warmup + benchmark times for all the scenarios
jobspec = jobset.JobSpec(
cmdline=cmdline,
shortname=shortname,
timeout_seconds=2*60*60)
timeout_seconds=5*60, # workers get restarted after each scenario
verbose_success=True)
return QpsWorkerJob(jobspec, language, host_and_port)
@ -357,6 +356,7 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
def finish_qps_workers(jobs):
"""Waits for given jobs to finish and eventually kills them."""
retries = 0
num_killed = 0
while any(job.is_running() for job in jobs):
for job in qpsworker_jobs:
if job.is_running():
@ -365,10 +365,11 @@ def finish_qps_workers(jobs):
print('Killing all QPS workers.')
for job in jobs:
job.kill()
num_killed += 1
retries += 1
time.sleep(3)
print('All QPS workers finished.')
return num_killed
argp = argparse.ArgumentParser(description='Run performance tests.')
argp.add_argument('-l', '--language',
@ -450,6 +451,8 @@ scenarios = create_scenarios(languages,
if not scenarios:
raise Exception('No scenarios to run')
total_scenario_failures = 0
qps_workers_killed = 0
for scenario in scenarios:
if args.dry_run:
print(scenario.name)
@ -457,8 +460,14 @@ for scenario in scenarios:
try:
for worker in scenario.workers:
worker.start()
jobset.run([scenario.jobspec,
create_quit_jobspec(scenario.workers, remote_host=args.remote_driver_host)],
newline_on_success=True, maxjobs=1)
scenario_failures, _ = jobset.run([scenario.jobspec,
create_quit_jobspec(scenario.workers, remote_host=args.remote_driver_host)],
newline_on_success=True, maxjobs=1)
total_scenario_failures += scenario_failures
finally:
finish_qps_workers(scenario.workers)
# Consider qps workers that need to be killed as failures
qps_workers_killed += finish_qps_workers(scenario.workers)
if total_scenario_failures > 0 or qps_workers_killed > 0:
print ("%s scenarios failed and %s qps worker jobs killed" % (total_scenario_failures, qps_workers_killed))
sys.exit(1)

@ -2695,6 +2695,7 @@
"gpr",
"gpr_test_util",
"grpc",
"grpc++_test_config",
"grpc_test_util"
],
"headers": [],

Loading…
Cancel
Save