Merge branch 'master' of https://github.com/grpc/grpc into grpc_to_grpc_impl_async_callback

pull/19517/head
Moiz Haidry 5 years ago
commit a67cd9c362
  1. 3
      .gitignore
  2. 19
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
  3. 23
      src/core/lib/gprpp/host_port.cc
  4. 4
      src/core/lib/iomgr/iocp_windows.cc
  5. 2
      src/core/lib/iomgr/socket_windows.h
  6. 10
      src/core/lib/iomgr/tcp_posix.cc
  7. 14
      src/core/lib/iomgr/tcp_windows.cc
  8. 66
      src/csharp/Grpc.Microbenchmarks/CommonThreadedBase.cs
  9. 56
      src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs
  10. 69
      src/csharp/Grpc.Microbenchmarks/GCStats.cs
  11. 8
      src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj
  12. 38
      src/csharp/Grpc.Microbenchmarks/PInvokeByteArrayBenchmark.cs
  13. 102
      src/csharp/Grpc.Microbenchmarks/PingBenchmark.cs
  14. 89
      src/csharp/Grpc.Microbenchmarks/Program.cs
  15. 41
      src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs
  16. 65
      src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs
  17. 70
      src/csharp/Grpc.Microbenchmarks/Utf8Decode.cs
  18. 126
      src/csharp/Grpc.Microbenchmarks/Utf8Encode.cs
  19. 2
      test/core/gprpp/host_port_test.cc
  20. 2
      tools/interop_matrix/client_matrix.py

3
.gitignore vendored

@ -146,3 +146,6 @@ bm_*.json
# Clion artifacts
cmake-build-debug/
# Benchmark outputs
BenchmarkDotNet.Artifacts/

@ -459,12 +459,13 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
connect_done_ = true;
GPR_ASSERT(wsa_connect_error_ == 0);
if (error == GRPC_ERROR_NONE) {
DWORD transfered_bytes = 0;
DWORD transferred_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(
grpc_winsocket_wrapped_socket(winsocket_),
&winsocket_->write_info.overlapped, &transfered_bytes, FALSE, &flags);
GPR_ASSERT(transfered_bytes == 0);
BOOL wsa_success =
WSAGetOverlappedResult(grpc_winsocket_wrapped_socket(winsocket_),
&winsocket_->write_info.overlapped,
&transferred_bytes, FALSE, &flags);
GPR_ASSERT(transferred_bytes == 0);
if (!wsa_success) {
wsa_connect_error_ = WSAGetLastError();
char* msg = gpr_format_message(wsa_connect_error_);
@ -620,8 +621,8 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
}
}
if (error == GRPC_ERROR_NONE) {
read_buf_ = grpc_slice_sub_no_ref(read_buf_, 0,
winsocket_->read_info.bytes_transfered);
read_buf_ = grpc_slice_sub_no_ref(
read_buf_, 0, winsocket_->read_info.bytes_transferred);
read_buf_has_data_ = true;
} else {
grpc_slice_unref_internal(read_buf_);
@ -657,9 +658,9 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
if (error == GRPC_ERROR_NONE) {
tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY;
write_buf_ = grpc_slice_sub_no_ref(
write_buf_, 0, winsocket_->write_info.bytes_transfered);
write_buf_, 0, winsocket_->write_info.bytes_transferred);
GRPC_CARES_TRACE_LOG("fd:|%s| OnIocpWriteableInner. bytes transferred:%d",
GetName(), winsocket_->write_info.bytes_transfered);
GetName(), winsocket_->write_info.bytes_transferred);
} else {
grpc_slice_unref_internal(write_buf_);
write_buf_ = grpc_empty_slice();

@ -44,7 +44,10 @@ int JoinHostPort(UniquePtr<char>* out, const char* host, int port) {
return ret;
}
bool SplitHostPort(StringView name, StringView* host, StringView* port) {
namespace {
bool DoSplitHostPort(StringView name, StringView* host, StringView* port,
bool* has_port) {
*has_port = false;
if (name[0] == '[') {
/* Parse a bracketed host, typically an IPv6 literal. */
const size_t rbracket = name.find(']', 1);
@ -58,6 +61,7 @@ bool SplitHostPort(StringView name, StringView* host, StringView* port) {
} else if (name[rbracket + 1] == ':') {
/* ]:<port?> */
*port = name.substr(rbracket + 2, name.size() - rbracket - 2);
*has_port = true;
} else {
/* ]<invalid> */
return false;
@ -76,6 +80,7 @@ bool SplitHostPort(StringView name, StringView* host, StringView* port) {
/* Exactly 1 colon. Split into host:port. */
*host = name.substr(0, colon);
*port = name.substr(colon + 1, name.size() - colon - 1);
*has_port = true;
} else {
/* 0 or 2+ colons. Bare hostname or IPv6 litearal. */
*host = name;
@ -84,6 +89,12 @@ bool SplitHostPort(StringView name, StringView* host, StringView* port) {
}
return true;
}
} // namespace
bool SplitHostPort(StringView name, StringView* host, StringView* port) {
bool unused;
return DoSplitHostPort(name, host, port, &unused);
}
bool SplitHostPort(StringView name, UniquePtr<char>* host,
UniquePtr<char>* port) {
@ -91,12 +102,14 @@ bool SplitHostPort(StringView name, UniquePtr<char>* host,
GPR_DEBUG_ASSERT(port != nullptr && *port == nullptr);
StringView host_view;
StringView port_view;
const bool ret = SplitHostPort(name, &host_view, &port_view);
bool has_port;
const bool ret = DoSplitHostPort(name, &host_view, &port_view, &has_port);
if (ret) {
// We always set the host, but port is set only when it's non-empty,
// to remain backward compatible with the old split_host_port API.
// We always set the host, but port is set only when DoSplitHostPort find a
// port in the string, to remain backward compatible with the old
// gpr_split_host_port API.
*host = host_view.dup();
if (!port_view.empty()) {
if (has_port) {
*port = port_view.dup();
}
}

@ -90,12 +90,12 @@ grpc_iocp_work_status grpc_iocp_work(grpc_millis deadline) {
abort();
}
if (socket->shutdown_called) {
info->bytes_transfered = 0;
info->bytes_transferred = 0;
info->wsa_error = WSA_OPERATION_ABORTED;
} else {
success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
FALSE, &flags);
info->bytes_transfered = bytes;
info->bytes_transferred = bytes;
info->wsa_error = success ? 0 : WSAGetLastError();
}
GPR_ASSERT(overlapped == &info->overlapped);

@ -59,7 +59,7 @@ typedef struct grpc_winsocket_callback_info {
to hold a mutex for a long amount of time. */
int has_pending_iocp;
/* The results of the overlapped operation. */
DWORD bytes_transfered;
DWORD bytes_transferred;
int wsa_error;
} grpc_winsocket_callback_info;

@ -435,12 +435,17 @@ static void tcp_do_read(grpc_tcp* tcp) {
GPR_TIMER_SCOPE("tcp_do_read", 0);
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
char cmsgbuf[24 /*CMSG_SPACE(sizeof(int))*/];
ssize_t read_bytes;
size_t total_read_bytes = 0;
size_t iov_len =
std::min<size_t>(MAX_READ_IOVEC, tcp->incoming_buffer->count);
#ifdef GRPC_LINUX_ERRQUEUE
constexpr size_t cmsg_alloc_space =
CMSG_SPACE(sizeof(grpc_core::scm_timestamping)) + CMSG_SPACE(sizeof(int));
#else
constexpr size_t cmsg_alloc_space = 24 /* CMSG_SPACE(sizeof(int)) */;
#endif /* GRPC_LINUX_ERRQUEUE */
char cmsgbuf[cmsg_alloc_space];
for (size_t i = 0; i < iov_len; i++) {
iov[i].iov_base = GRPC_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
iov[i].iov_len = GRPC_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
@ -524,6 +529,7 @@ static void tcp_do_read(grpc_tcp* tcp) {
if (cmsg->cmsg_level == SOL_TCP && cmsg->cmsg_type == TCP_CM_INQ &&
cmsg->cmsg_len == CMSG_LEN(sizeof(int))) {
tcp->inq = *reinterpret_cast<int*>(CMSG_DATA(cmsg));
break;
}
}
}

@ -196,17 +196,17 @@ static void on_read(void* tcpp, grpc_error* error) {
gpr_free(utf8_message);
grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
} else {
if (info->bytes_transfered != 0 && !tcp->shutting_down) {
GPR_ASSERT((size_t)info->bytes_transfered <= tcp->read_slices->length);
if (static_cast<size_t>(info->bytes_transfered) !=
if (info->bytes_transferred != 0 && !tcp->shutting_down) {
GPR_ASSERT((size_t)info->bytes_transferred <= tcp->read_slices->length);
if (static_cast<size_t>(info->bytes_transferred) !=
tcp->read_slices->length) {
grpc_slice_buffer_trim_end(
tcp->read_slices,
tcp->read_slices->length -
static_cast<size_t>(info->bytes_transfered),
static_cast<size_t>(info->bytes_transferred),
&tcp->last_read_buffer);
}
GPR_ASSERT((size_t)info->bytes_transfered == tcp->read_slices->length);
GPR_ASSERT((size_t)info->bytes_transferred == tcp->read_slices->length);
if (grpc_tcp_trace.enabled()) {
size_t i;
@ -288,7 +288,7 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transfered = bytes_read;
info->bytes_transferred = bytes_read;
GRPC_CLOSURE_SCHED(&tcp->on_read, GRPC_ERROR_NONE);
return;
}
@ -333,7 +333,7 @@ static void on_write(void* tcpp, grpc_error* error) {
if (info->wsa_error != 0) {
error = GRPC_WSA_ERROR(info->wsa_error, "WSASend");
} else {
GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
GPR_ASSERT(info->bytes_transferred == tcp->write_slices->length);
}
}

@ -0,0 +1,66 @@
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using Grpc.Core;
namespace Grpc.Microbenchmarks
{
// common base-type for tests that need to run with some level of concurrency;
// note there's nothing *special* about this type - it is just to save some
// boilerplate
[ClrJob, CoreJob] // test .NET Core and .NET Framework
[MemoryDiagnoser] // allocations
public abstract class CommonThreadedBase
{
protected virtual bool NeedsEnvironment => true;
[Params(1, 2, 4, 8, 12)]
public int ThreadCount { get; set; }
protected GrpcEnvironment Environment { get; private set; }
[GlobalSetup]
public virtual void Setup()
{
ThreadPool.GetMinThreads(out var workers, out var iocp);
if (workers <= ThreadCount) ThreadPool.SetMinThreads(ThreadCount + 1, iocp);
if (NeedsEnvironment) Environment = GrpcEnvironment.AddRef();
}
[GlobalCleanup]
public virtual void Cleanup()
{
if (Environment != null)
{
Environment = null;
GrpcEnvironment.ReleaseAsync().Wait();
}
}
protected void RunConcurrent(Action operation)
{
Parallel.For(0, ThreadCount, _ => operation());
}
}
}

@ -1,4 +1,4 @@
#region Copyright notice and license
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
@ -17,62 +17,38 @@
#endregion
using System;
using System.Runtime.InteropServices;
using System.Threading;
using Grpc.Core;
using BenchmarkDotNet.Attributes;
using Grpc.Core.Internal;
using System.Collections.Generic;
using System.Diagnostics;
namespace Grpc.Microbenchmarks
{
public class CompletionRegistryBenchmark
public class CompletionRegistryBenchmark : CommonThreadedBase
{
GrpcEnvironment environment;
[Params(false, true)]
public bool UseSharedRegistry { get; set; }
public void Init()
const int Iterations = 1000000; // High number to make the overhead of RunConcurrent negligible.
[Benchmark(OperationsPerInvoke = Iterations)]
public void RegisterExtract()
{
environment = GrpcEnvironment.AddRef();
RunConcurrent(() => {
CompletionRegistry sharedRegistry = UseSharedRegistry ? new CompletionRegistry(Environment, () => BatchContextSafeHandle.Create(), () => RequestCallContextSafeHandle.Create()) : null;
RunBody(sharedRegistry);
});
}
public void Cleanup()
private void RunBody(CompletionRegistry optionalSharedRegistry)
{
GrpcEnvironment.ReleaseAsync().Wait();
}
public void Run(int threadCount, int iterations, bool useSharedRegistry)
{
Console.WriteLine(string.Format("CompletionRegistryBenchmark: threads={0}, iterations={1}, useSharedRegistry={2}", threadCount, iterations, useSharedRegistry));
CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create(), () => RequestCallContextSafeHandle.Create()) : null;
var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, sharedRegistry));
threadedBenchmark.Run();
// TODO: parametrize by number of pending completions
}
private void ThreadBody(int iterations, CompletionRegistry optionalSharedRegistry)
{
var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment, () => throw new NotImplementedException(), () => throw new NotImplementedException());
var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(Environment, () => throw new NotImplementedException(), () => throw new NotImplementedException());
var ctx = BatchContextSafeHandle.Create();
var stopwatch = Stopwatch.StartNew();
for (int i = 0; i < iterations; i++)
for (int i = 0; i < Iterations; i++)
{
completionRegistry.Register(ctx.Handle, ctx);
var callback = completionRegistry.Extract(ctx.Handle);
// NOTE: we are not calling the callback to avoid disposing ctx.
}
stopwatch.Stop();
Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds);
ctx.Recycle();
}
private class NopCompletionCallback : IOpCompletionCallback
{
public void OnComplete(bool success)
{
}
}
}
}

@ -1,69 +0,0 @@
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using Grpc.Core;
using Grpc.Core.Internal;
namespace Grpc.Microbenchmarks
{
internal class GCStats
{
readonly object myLock = new object();
GCStatsSnapshot lastSnapshot;
public GCStats()
{
lastSnapshot = new GCStatsSnapshot(GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2));
}
public GCStatsSnapshot GetSnapshot(bool reset = false)
{
lock (myLock)
{
var newSnapshot = new GCStatsSnapshot(GC.CollectionCount(0) - lastSnapshot.Gen0,
GC.CollectionCount(1) - lastSnapshot.Gen1,
GC.CollectionCount(2) - lastSnapshot.Gen2);
if (reset)
{
lastSnapshot = newSnapshot;
}
return newSnapshot;
}
}
}
public class GCStatsSnapshot
{
public GCStatsSnapshot(int gen0, int gen1, int gen2)
{
this.Gen0 = gen0;
this.Gen1 = gen1;
this.Gen2 = gen2;
}
public int Gen0 { get; }
public int Gen1 { get; }
public int Gen2 { get; }
public override string ToString()
{
return string.Format("[GCCollectionCount: gen0 {0}, gen1 {1}, gen2 {2}]", Gen0, Gen1, Gen2);
}
}
}

@ -3,9 +3,10 @@
<Import Project="..\Grpc.Core\Common.csproj.include" />
<PropertyGroup>
<TargetFrameworks>net45;netcoreapp2.1</TargetFrameworks>
<TargetFrameworks>net461;netcoreapp2.1</TargetFrameworks>
<OutputType>Exe</OutputType>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>
<ItemGroup>
@ -13,10 +14,10 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="CommandLineParser" Version="2.3.0" />
<PackageReference Include="BenchmarkDotNet" Version="0.11.5" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'net45' ">
<ItemGroup Condition=" '$(TargetFramework)' == 'net461' ">
<Reference Include="System" />
<Reference Include="Microsoft.CSharp" />
</ItemGroup>
@ -24,5 +25,4 @@
<ItemGroup>
<Compile Include="..\Grpc.Core.Api\Version.cs" />
</ItemGroup>
</Project>

@ -1,4 +1,4 @@
#region Copyright notice and license
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
@ -16,49 +16,39 @@
#endregion
using System;
using System.Runtime.InteropServices;
using System.Threading;
using Grpc.Core;
using BenchmarkDotNet.Attributes;
using Grpc.Core.Internal;
using System.Collections.Generic;
using System.Diagnostics;
namespace Grpc.Microbenchmarks
{
public class PInvokeByteArrayBenchmark
public class PInvokeByteArrayBenchmark : CommonThreadedBase
{
static readonly NativeMethods Native = NativeMethods.Get();
public void Init()
{
}
protected override bool NeedsEnvironment => false;
public void Cleanup()
{
}
public void Run(int threadCount, int iterations, int payloadSize)
[Params(0)]
public int PayloadSize { get; set; }
const int Iterations = 1000000; // High number to make the overhead of RunConcurrent negligible.
[Benchmark(OperationsPerInvoke = Iterations)]
public void AllocFree()
{
Console.WriteLine(string.Format("PInvokeByteArrayBenchmark: threads={0}, iterations={1}, payloadSize={2}", threadCount, iterations, payloadSize));
var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, payloadSize));
threadedBenchmark.Run();
RunConcurrent(RunBody);
}
private void ThreadBody(int iterations, int payloadSize)
private void RunBody()
{
var payload = new byte[payloadSize];
var stopwatch = Stopwatch.StartNew();
for (int i = 0; i < iterations; i++)
var payload = new byte[PayloadSize];
for (int i = 0; i < Iterations; i++)
{
var gcHandle = GCHandle.Alloc(payload, GCHandleType.Pinned);
var payloadPtr = gcHandle.AddrOfPinnedObject();
Native.grpcsharp_test_nop(payloadPtr);
gcHandle.Free();
}
stopwatch.Stop();
Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds);
}
}
}

@ -0,0 +1,102 @@
#region Copyright notice and license
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using Grpc.Core;
namespace Grpc.Microbenchmarks
{
// this test creates a real server and client, measuring the inherent inbuilt
// platform overheads; the marshallers **DO NOT ALLOCATE**, so any allocations
// are from the framework, not the messages themselves
// important: allocs are not reliable on .NET Core until .NET Core 3, since
// this test involves multiple threads
[ClrJob, CoreJob] // test .NET Core and .NET Framework
[MemoryDiagnoser] // allocations
public class PingBenchmark
{
private static readonly Task<string> CompletedString = Task.FromResult("");
private static readonly byte[] EmptyBlob = new byte[0];
private static readonly Marshaller<string> EmptyMarshaller = new Marshaller<string>(_ => EmptyBlob, _ => "");
private static readonly Method<string, string> PingMethod = new Method<string, string>(MethodType.Unary, nameof(PingBenchmark), "Ping", EmptyMarshaller, EmptyMarshaller);
[Benchmark]
public async ValueTask<string> PingAsync()
{
using (var result = client.PingAsync("", new CallOptions()))
{
return await result.ResponseAsync;
}
}
[Benchmark]
public string Ping()
{
return client.Ping("", new CallOptions());
}
private Task<string> ServerMethod(string request, ServerCallContext context)
{
return CompletedString;
}
Server server;
Channel channel;
PingClient client;
[GlobalSetup]
public async Task Setup()
{
// create server
server = new Server {
Ports = { new ServerPort("localhost", 10042, ServerCredentials.Insecure) },
Services = { ServerServiceDefinition.CreateBuilder().AddMethod(PingMethod, ServerMethod).Build() },
};
server.Start();
// create client
channel = new Channel("localhost", 10042, ChannelCredentials.Insecure);
await channel.ConnectAsync();
client = new PingClient(new DefaultCallInvoker(channel));
}
[GlobalCleanup]
public async Task Cleanup()
{
await channel.ShutdownAsync();
await server.ShutdownAsync();
}
class PingClient : LiteClientBase
{
public PingClient(CallInvoker callInvoker) : base(callInvoker) { }
public AsyncUnaryCall<string> PingAsync(string request, CallOptions options)
{
return CallInvoker.AsyncUnaryCall(PingMethod, null, options, request);
}
public string Ping(string request, CallOptions options)
{
return CallInvoker.BlockingUnaryCall(PingMethod, null, options, request);
}
}
}
}

@ -1,4 +1,4 @@
#region Copyright notice and license
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
@ -16,95 +16,18 @@
#endregion
using System;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using CommandLine;
using CommandLine.Text;
using BenchmarkDotNet.Running;
namespace Grpc.Microbenchmarks
{
class Program
{
public enum MicrobenchmarkType
{
CompletionRegistry,
PInvokeByteArray,
SendMessage
}
private class BenchmarkOptions
{
[Option("benchmark", Required = true, HelpText = "Benchmark to run")]
public MicrobenchmarkType Benchmark { get; set; }
}
// typical usage: dotnet run -c Release -f netcoreapp2.1
// (this will profile both .net core and .net framework; for some reason
// if you start from "-f net461", it goes horribly wrong)
public static void Main(string[] args)
{
GrpcEnvironment.SetLogger(new ConsoleLogger());
var parserResult = Parser.Default.ParseArguments<BenchmarkOptions>(args)
.WithNotParsed(errors => {
Console.WriteLine("Supported benchmarks:");
foreach (var enumValue in Enum.GetValues(typeof(MicrobenchmarkType)))
{
Console.WriteLine(" " + enumValue);
}
Environment.Exit(1);
})
.WithParsed(options =>
{
switch (options.Benchmark)
{
case MicrobenchmarkType.CompletionRegistry:
RunCompletionRegistryBenchmark();
break;
case MicrobenchmarkType.PInvokeByteArray:
RunPInvokeByteArrayBenchmark();
break;
case MicrobenchmarkType.SendMessage:
RunSendMessageBenchmark();
break;
default:
throw new ArgumentException("Unsupported benchmark.");
}
});
}
static void RunCompletionRegistryBenchmark()
{
var benchmark = new CompletionRegistryBenchmark();
benchmark.Init();
foreach (int threadCount in new int[] {1, 1, 2, 4, 8, 12})
{
foreach (bool useSharedRegistry in new bool[] {false, true})
{
benchmark.Run(threadCount, 4 * 1000 * 1000, useSharedRegistry);
}
}
benchmark.Cleanup();
}
static void RunPInvokeByteArrayBenchmark()
{
var benchmark = new PInvokeByteArrayBenchmark();
benchmark.Init();
foreach (int threadCount in new int[] {1, 1, 2, 4, 8, 12})
{
benchmark.Run(threadCount, 4 * 1000 * 1000, 0);
}
benchmark.Cleanup();
}
static void RunSendMessageBenchmark()
{
var benchmark = new SendMessageBenchmark();
benchmark.Init();
foreach (int threadCount in new int[] {1, 1, 2, 4, 8, 12})
{
benchmark.Run(threadCount, 4 * 1000 * 1000, 0);
}
benchmark.Cleanup();
BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly).Run(args);
}
}
}

@ -1,4 +1,4 @@
#region Copyright notice and license
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
@ -17,59 +17,48 @@
#endregion
using System;
using System.Threading;
using BenchmarkDotNet.Attributes;
using Grpc.Core;
using Grpc.Core.Internal;
using System.Collections.Generic;
using System.Diagnostics;
namespace Grpc.Microbenchmarks
{
public class SendMessageBenchmark
public class SendMessageBenchmark : CommonThreadedBase
{
static readonly NativeMethods Native = NativeMethods.Get();
GrpcEnvironment environment;
public void Init()
public override void Setup()
{
Native.grpcsharp_test_override_method("grpcsharp_call_start_batch", "nop");
environment = GrpcEnvironment.AddRef();
base.Setup();
}
public void Cleanup()
{
GrpcEnvironment.ReleaseAsync().Wait();
// TODO(jtattermusch): track GC stats
}
[Params(0)]
public int PayloadSize { get; set; }
public void Run(int threadCount, int iterations, int payloadSize)
const int Iterations = 1000000; // High number to make the overhead of RunConcurrent negligible.
[Benchmark(OperationsPerInvoke = Iterations)]
public void SendMessage()
{
Console.WriteLine(string.Format("SendMessageBenchmark: threads={0}, iterations={1}, payloadSize={2}", threadCount, iterations, payloadSize));
var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, payloadSize));
threadedBenchmark.Run();
RunConcurrent(RunBody);
}
private void ThreadBody(int iterations, int payloadSize)
private void RunBody()
{
var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease(), () => throw new NotImplementedException());
var completionRegistry = new CompletionRegistry(Environment, () => Environment.BatchContextPool.Lease(), () => throw new NotImplementedException());
var cq = CompletionQueueSafeHandle.CreateAsync(completionRegistry);
var call = CreateFakeCall(cq);
var sendCompletionCallback = new NopSendCompletionCallback();
var payload = new byte[payloadSize];
var payload = new byte[PayloadSize];
var writeFlags = default(WriteFlags);
var stopwatch = Stopwatch.StartNew();
for (int i = 0; i < iterations; i++)
for (int i = 0; i < Iterations; i++)
{
call.StartSendMessage(sendCompletionCallback, payload, writeFlags, false);
var callback = completionRegistry.Extract(completionRegistry.LastRegisteredKey);
callback.OnComplete(true);
}
stopwatch.Stop();
Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds);
cq.Dispose();
}

@ -1,65 +0,0 @@
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Threading;
using Grpc.Core;
using Grpc.Core.Internal;
using System.Collections.Generic;
using System.Diagnostics;
namespace Grpc.Microbenchmarks
{
public class ThreadedBenchmark
{
List<ThreadStart> runners;
public ThreadedBenchmark(IEnumerable<ThreadStart> runners)
{
this.runners = new List<ThreadStart>(runners);
}
public ThreadedBenchmark(int threadCount, Action threadBody)
{
this.runners = new List<ThreadStart>();
for (int i = 0; i < threadCount; i++)
{
this.runners.Add(new ThreadStart(() => threadBody()));
}
}
public void Run()
{
Console.WriteLine("Running threads.");
var gcStats = new GCStats();
var threads = new List<Thread>();
for (int i = 0; i < runners.Count; i++)
{
var thread = new Thread(runners[i]);
thread.Start();
threads.Add(thread);
}
foreach (var thread in threads)
{
thread.Join();
}
Console.WriteLine("All threads finished (GC Stats Delta: " + gcStats.GetSnapshot() + ")");
}
}
}

@ -0,0 +1,70 @@
#region Copyright notice and license
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Collections.Generic;
using System.Text;
using BenchmarkDotNet.Attributes;
using Grpc.Core.Internal;
namespace Grpc.Microbenchmarks
{
[ClrJob, CoreJob] // test .NET Core and .NET Framework
[MemoryDiagnoser] // allocations
public class Utf8Decode
{
[Params(0, 1, 4, 128, 1024)]
public int PayloadSize
{
get { return payloadSize; }
set
{
payloadSize = value;
payload = Invent(value);
}
}
private int payloadSize;
private byte[] payload;
static byte[] Invent(int length)
{
var rand = new Random(Seed: length);
var chars = new char[length];
for(int i = 0; i < chars.Length; i++)
{
chars[i] = (char)rand.Next(32, 300);
}
return Encoding.UTF8.GetBytes(chars);
}
const int Iterations = 1000;
[Benchmark(OperationsPerInvoke = Iterations)]
public unsafe void Decode()
{
fixed (byte* ptr = payload)
{
var iPtr = new IntPtr(ptr);
for (int i = 0; i < Iterations; i++)
{
MarshalUtils.PtrToStringUTF8(iPtr, payload.Length);
}
}
}
}
}

@ -0,0 +1,126 @@
#region Copyright notice and license
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Collections.Generic;
using BenchmarkDotNet.Attributes;
using Grpc.Core;
using Grpc.Core.Internal;
namespace Grpc.Microbenchmarks
{
[ClrJob, CoreJob] // test .NET Core and .NET Framework
[MemoryDiagnoser] // allocations
public class Utf8Encode : ISendStatusFromServerCompletionCallback
{
[Params(0, 1, 4, 128, 1024)]
public int PayloadSize
{
get { return payloadSize; }
set
{
payloadSize = value;
status = new Status(StatusCode.OK, Invent(value));
}
}
private int payloadSize;
private Status status;
static string Invent(int length)
{
var rand = new Random(Seed: length);
var chars = new char[length];
for(int i = 0; i < chars.Length; i++)
{
chars[i] = (char)rand.Next(32, 300);
}
return new string(chars);
}
private GrpcEnvironment environment;
private CompletionRegistry completionRegistry;
[GlobalSetup]
public void Setup()
{
var native = NativeMethods.Get();
// nop the native-call via reflection
NativeMethods.Delegates.grpcsharp_call_send_status_from_server_delegate nop = (CallSafeHandle call, BatchContextSafeHandle ctx, StatusCode statusCode, byte[] statusMessage, UIntPtr statusMessageLen, MetadataArraySafeHandle metadataArray, int sendEmptyInitialMetadata, byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags) => {
completionRegistry.Extract(ctx.Handle).OnComplete(true); // drain the dictionary as we go
return CallError.OK;
};
native.GetType().GetField(nameof(native.grpcsharp_call_send_status_from_server)).SetValue(native, nop);
environment = GrpcEnvironment.AddRef();
metadata = MetadataArraySafeHandle.Create(Metadata.Empty);
completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease(), () => throw new NotImplementedException());
var cq = CompletionQueueSafeHandle.CreateAsync(completionRegistry);
call = CreateFakeCall(cq);
}
private static CallSafeHandle CreateFakeCall(CompletionQueueSafeHandle cq)
{
var call = CallSafeHandle.CreateFake(new IntPtr(0xdead), cq);
bool success = false;
while (!success)
{
// avoid calling destroy on a nonexistent grpc_call pointer
call.DangerousAddRef(ref success);
}
return call;
}
[GlobalCleanup]
public void Cleanup()
{
try
{
metadata?.Dispose();
metadata = null;
call?.Dispose();
call = null;
if (environment != null)
{
environment = null;
// cleanup seems... unreliable on CLR
// GrpcEnvironment.ReleaseAsync().Wait(1000);
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex.Message);
}
}
private CallSafeHandle call;
private MetadataArraySafeHandle metadata;
const int Iterations = 1000;
[Benchmark(OperationsPerInvoke = Iterations)]
public unsafe void SendStatus()
{
for (int i = 0; i < Iterations; i++)
{
call.StartSendStatusFromServer(this, status, metadata, false, null, WriteFlags.NoCompress);
}
}
void ISendStatusFromServerCompletionCallback.OnSendStatusFromServerCompletion(bool success) { }
}
}

@ -71,7 +71,9 @@ static void test_split_host_port() {
split_host_port_expect("", "", nullptr, true);
split_host_port_expect("[a:b]", "a:b", nullptr, true);
split_host_port_expect("1.2.3.4", "1.2.3.4", nullptr, true);
split_host_port_expect("0.0.0.0:", "0.0.0.0", "", true);
split_host_port_expect("a:b:c::", "a:b:c::", nullptr, true);
split_host_port_expect("[a:b:c::]:", "a:b:c::", "", true);
split_host_port_expect("[a:b]:30", "a:b", "30", true);
split_host_port_expect("1.2.3.4:30", "1.2.3.4", "30", true);
split_host_port_expect(":30", "", "30", true);

@ -284,7 +284,7 @@ LANG_RELEASE_MATRIX = {
('v1.16.0', ReleaseInfo(testcases_file='php__v1.0.1')),
('v1.17.1', ReleaseInfo(testcases_file='php__v1.0.1')),
('v1.18.0', ReleaseInfo()),
# v1.19 and v1.20 were deliberately ommitted here because of an issue.
# v1.19 and v1.20 were deliberately omitted here because of an issue.
# See https://github.com/grpc/grpc/issues/18264
('v1.21.4', ReleaseInfo()),
]),

Loading…
Cancel
Save