Merge remote-tracking branch 'upstream/master'

pull/2945/head
Hongyu Chen 9 years ago
commit 87b87df355
  1. 4
      src/compiler/objective_c_generator.cc
  2. 125
      src/csharp/Grpc.Auth/GoogleCredential.cs
  3. 27
      src/csharp/Grpc.Auth/Grpc.Auth.csproj
  4. 26
      src/csharp/Grpc.Auth/OAuth2Interceptors.cs
  5. 4
      src/csharp/Grpc.Auth/app.config
  6. 4
      src/csharp/Grpc.Auth/packages.config
  7. 6
      src/csharp/Grpc.Core.Tests/ChannelOptionsTest.cs
  8. 6
      src/csharp/Grpc.Core.Tests/ChannelTest.cs
  9. 6
      src/csharp/Grpc.Core.Tests/ClientServerTest.cs
  10. 4
      src/csharp/Grpc.Core.Tests/CompressionTest.cs
  11. 31
      src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
  12. 12
      src/csharp/Grpc.Core.Tests/MockServiceHelper.cs
  13. 4
      src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
  14. 72
      src/csharp/Grpc.Core/CallInvocationDetails.cs
  15. 86
      src/csharp/Grpc.Core/CallOptions.cs
  16. 46
      src/csharp/Grpc.Core/Calls.cs
  17. 24
      src/csharp/Grpc.Core/Channel.cs
  18. 6
      src/csharp/Grpc.Core/ChannelOptions.cs
  19. 29
      src/csharp/Grpc.Core/ClientBase.cs
  20. 58
      src/csharp/Grpc.Core/ContextPropagationToken.cs
  21. 2
      src/csharp/Grpc.Core/Grpc.Core.csproj
  22. 20
      src/csharp/Grpc.Core/GrpcEnvironment.cs
  23. 19
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  24. 4
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  25. 2
      src/csharp/Grpc.Core/Internal/Timespec.cs
  26. 4
      src/csharp/Grpc.Core/KeyCertificatePair.cs
  27. 13
      src/csharp/Grpc.Core/Logging/ConsoleLogger.cs
  28. 6
      src/csharp/Grpc.Core/Logging/ILogger.cs
  29. 23
      src/csharp/Grpc.Core/Marshaller.cs
  30. 8
      src/csharp/Grpc.Core/Metadata.cs
  31. 60
      src/csharp/Grpc.Core/Method.cs
  32. 47
      src/csharp/Grpc.Core/OperationFailedException.cs
  33. 14
      src/csharp/Grpc.Core/RpcException.cs
  34. 2
      src/csharp/Grpc.Core/Server.cs
  35. 2
      src/csharp/Grpc.Core/ServerCredentials.cs
  36. 4
      src/csharp/Grpc.Core/ServerMethods.cs
  37. 4
      src/csharp/Grpc.Core/ServerPort.cs
  38. 8
      src/csharp/Grpc.Core/ServerServiceDefinition.cs
  39. 13
      src/csharp/Grpc.Core/Status.cs
  40. 156
      src/csharp/Grpc.Core/StatusCode.cs
  41. 9
      src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
  42. 3
      src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs
  43. 57
      src/csharp/Grpc.Core/Utils/ExceptionHelper.cs
  44. 25
      src/csharp/Grpc.Core/Utils/Preconditions.cs
  45. 34
      src/csharp/Grpc.Core/Version.cs
  46. 37
      src/csharp/Grpc.Core/VersionInfo.cs
  47. 10
      src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
  48. 10
      src/csharp/Grpc.Examples/MathExamples.cs
  49. 7
      src/csharp/Grpc.Examples/MathServiceImpl.cs
  50. 8
      src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs
  51. 4
      src/csharp/Grpc.IntegrationTesting.Client/app.config
  52. 4
      src/csharp/Grpc.IntegrationTesting.Server/app.config
  53. 38
      src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
  54. 86
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs
  55. 4
      src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs
  56. 4
      src/csharp/Grpc.IntegrationTesting/app.config
  57. 4
      src/csharp/Grpc.IntegrationTesting/packages.config
  58. 3
      src/node/binding.gyp
  59. 2
      src/node/examples/perf_test.js
  60. 19
      src/node/ext/call.cc
  61. 25
      src/node/ext/channel.cc
  62. 6
      src/node/ext/channel.h
  63. 17
      src/node/interop/interop_client.js
  64. 70
      src/node/src/client.js
  65. 5
      src/node/test/call_test.js
  66. 8
      src/node/test/end_to_end_test.js
  67. 8
      src/objective-c/RxLibrary/GRXWriteable.h
  68. 4
      src/objective-c/RxLibrary/GRXWriteable.m
  69. 14
      src/objective-c/tests/RxLibraryUnitTests.m
  70. 9
      src/python/grpcio/grpc/_adapter/_c/types.h
  71. 8
      src/python/grpcio/grpc/_adapter/_c/types/call.c
  72. 54
      src/python/grpcio/grpc/_adapter/_c/types/channel.c
  73. 2
      src/python/grpcio/grpc/_adapter/_c/types/server.c
  74. 21
      src/python/grpcio/grpc/_adapter/_c/utility.c
  75. 2
      src/python/grpcio/grpc/_adapter/_intermediary_low.py
  76. 14
      src/python/grpcio/grpc/_adapter/_low.py
  77. 95
      src/python/grpcio/grpc/_adapter/_types.py
  78. 13
      src/python/grpcio_test/grpc_test/_adapter/_low_test.py
  79. 34
      src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py
  80. 64
      src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py
  81. 45
      src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
  82. 5
      src/ruby/ext/grpc/rb_channel.c
  83. 2
      src/ruby/lib/grpc/generic/client_stub.rb
  84. 2
      src/ruby/spec/call_spec.rb
  85. 4
      src/ruby/spec/channel_spec.rb
  86. 2
      src/ruby/spec/client_server_spec.rb
  87. 2
      src/ruby/spec/generic/active_call_spec.rb
  88. 2
      test/core/security/verify_jwt.c
  89. 2
      test/cpp/end2end/end2end_test.cc
  90. 2
      tools/dockerfile/grpc_go/Dockerfile
  91. 34
      tools/dockerfile/grpc_go/build.sh

@ -154,9 +154,9 @@ void PrintAdvancedImplementation(Printer *printer,
printer->Print(" responsesWriteable:[GRXWriteable ");
if (method->server_streaming()) {
printer->Print("writeableWithStreamHandler:eventHandler]];\n");
printer->Print("writeableWithEventHandler:eventHandler]];\n");
} else {
printer->Print("writeableWithSingleValueHandler:handler]];\n");
printer->Print("writeableWithSingleHandler:handler]];\n");
}
printer->Print("}\n");

@ -1,125 +0,0 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.IO;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
using Google.Apis.Auth.OAuth2;
using Google.Apis.Auth.OAuth2.Responses;
using Newtonsoft.Json.Linq;
using Org.BouncyCastle.Crypto.Parameters;
using Org.BouncyCastle.Security;
namespace Grpc.Auth
{
// TODO(jtattermusch): Remove this class once possible.
/// <summary>
/// A temporary placeholder for Google credential from
/// Google Auth library for .NET. It emulates the usage pattern
/// for Usable auth.
/// </summary>
public class GoogleCredential
{
private const string GoogleApplicationCredentialsEnvName = "GOOGLE_APPLICATION_CREDENTIALS";
private const string ClientEmailFieldName = "client_email";
private const string PrivateKeyFieldName = "private_key";
private ServiceCredential credential;
private GoogleCredential(ServiceCredential credential)
{
this.credential = credential;
}
public static GoogleCredential GetApplicationDefault()
{
return new GoogleCredential(null);
}
public bool IsCreateScopedRequired
{
get
{
return true;
}
}
public GoogleCredential CreateScoped(IEnumerable<string> scopes)
{
var credsPath = Environment.GetEnvironmentVariable(GoogleApplicationCredentialsEnvName);
if (credsPath == null)
{
// Default to ComputeCredentials if path to JSON key is not set.
// ComputeCredential is not scoped actually, but for our use case it's
// fine to treat is as such.
return new GoogleCredential(new ComputeCredential(new ComputeCredential.Initializer()));
}
JObject jsonCredentialParameters = JObject.Parse(File.ReadAllText(credsPath));
string clientEmail = jsonCredentialParameters.GetValue(ClientEmailFieldName).Value<string>();
string privateKeyString = jsonCredentialParameters.GetValue(PrivateKeyFieldName).Value<string>();
var serviceCredential = new ServiceAccountCredential(
new ServiceAccountCredential.Initializer(clientEmail)
{
Scopes = scopes,
}.FromPrivateKey(privateKeyString));
return new GoogleCredential(serviceCredential);
}
public Task<bool> RequestAccessTokenAsync(CancellationToken taskCancellationToken)
{
return credential.RequestAccessTokenAsync(taskCancellationToken);
}
public TokenResponse Token
{
get
{
return credential.Token;
}
}
internal ServiceCredential InternalCredential
{
get
{
return credential;
}
}
}
}

@ -11,7 +11,7 @@
<AssemblyName>Grpc.Auth</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<DocumentationFile>bin\$(Configuration)\Grpc.Auth.Xml</DocumentationFile>
<NuGetPackageImportStamp>9b408026</NuGetPackageImportStamp>
<NuGetPackageImportStamp>4f8487a9</NuGetPackageImportStamp>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
@ -41,28 +41,32 @@
<AssemblyOriginatorKeyFile>C:\keys\Grpc.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>
<ItemGroup>
<Reference Include="BouncyCastle.Crypto">
<Reference Include="BouncyCastle.Crypto, Version=1.7.4137.9688, Culture=neutral, PublicKeyToken=a4292a325f69b123, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\BouncyCastle.1.7.0\lib\Net40-Client\BouncyCastle.Crypto.dll</HintPath>
</Reference>
<Reference Include="Google.Apis.Auth, Version=1.9.2.27817, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
<Reference Include="Google.Apis.Auth, Version=1.9.3.19379, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Google.Apis.Auth.1.9.2\lib\net40\Google.Apis.Auth.dll</HintPath>
<HintPath>..\packages\Google.Apis.Auth.1.9.3\lib\net40\Google.Apis.Auth.dll</HintPath>
</Reference>
<Reference Include="Google.Apis.Auth.PlatformServices, Version=1.9.2.27820, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
<Reference Include="Google.Apis.Auth.PlatformServices, Version=1.9.3.19383, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Google.Apis.Auth.1.9.2\lib\net40\Google.Apis.Auth.PlatformServices.dll</HintPath>
<HintPath>..\packages\Google.Apis.Auth.1.9.3\lib\net40\Google.Apis.Auth.PlatformServices.dll</HintPath>
</Reference>
<Reference Include="Google.Apis.Core, Version=1.9.2.27816, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
<Reference Include="Google.Apis.Core, Version=1.9.3.19379, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Google.Apis.Core.1.9.2\lib\portable-net40+sl50+win+wpa81+wp80\Google.Apis.Core.dll</HintPath>
<HintPath>..\packages\Google.Apis.Core.1.9.3\lib\portable-net40+sl50+win+wpa81+wp80\Google.Apis.Core.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Threading.Tasks">
<Reference Include="Microsoft.Threading.Tasks, Version=1.0.12.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Threading.Tasks.Extensions">
<Reference Include="Microsoft.Threading.Tasks.Extensions, Version=1.0.12.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.Extensions.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Threading.Tasks.Extensions.Desktop">
<Reference Include="Microsoft.Threading.Tasks.Extensions.Desktop, Version=1.0.168.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.Extensions.Desktop.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=7.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
@ -87,7 +91,6 @@
<Link>Version.cs</Link>
</Compile>
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="GoogleCredential.cs" />
<Compile Include="OAuth2Interceptors.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />

@ -54,7 +54,7 @@ namespace Grpc.Auth
/// </summary>
public static MetadataInterceptorDelegate FromCredential(GoogleCredential googleCredential)
{
var interceptor = new OAuth2Interceptor(googleCredential.InternalCredential, SystemClock.Default);
var interceptor = new OAuth2Interceptor(googleCredential, SystemClock.Default);
return new MetadataInterceptorDelegate(interceptor.InterceptHeaders);
}
@ -66,7 +66,7 @@ namespace Grpc.Auth
public static MetadataInterceptorDelegate FromAccessToken(string oauth2Token)
{
Preconditions.CheckNotNull(oauth2Token);
return new MetadataInterceptorDelegate((metadata) =>
return new MetadataInterceptorDelegate((authUri, metadata) =>
{
metadata.Add(OAuth2Interceptor.CreateBearerTokenHeader(oauth2Token));
});
@ -80,10 +80,10 @@ namespace Grpc.Auth
private const string AuthorizationHeader = "Authorization";
private const string Schema = "Bearer";
private ServiceCredential credential;
private ITokenAccess credential;
private IClock clock;
public OAuth2Interceptor(ServiceCredential credential, IClock clock)
public OAuth2Interceptor(ITokenAccess credential, IClock clock)
{
this.credential = credential;
this.clock = clock;
@ -94,23 +94,15 @@ namespace Grpc.Auth
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public string GetAccessToken(CancellationToken cancellationToken)
public string GetAccessToken(string authUri, CancellationToken cancellationToken)
{
if (credential.Token == null || credential.Token.IsExpired(clock))
{
// TODO(jtattermusch): Parallel requests will spawn multiple requests to refresh the token once the token expires.
// TODO(jtattermusch): Rethink synchronous wait to obtain the result.
if (!credential.RequestAccessTokenAsync(cancellationToken).Result)
{
throw new InvalidOperationException("The access token has expired but we can't refresh it");
}
}
return credential.Token.AccessToken;
// TODO(jtattermusch): Rethink synchronous wait to obtain the result.
return credential.GetAccessTokenForRequestAsync(authUri, cancellationToken: cancellationToken).GetAwaiter().GetResult();
}
public void InterceptHeaders(Metadata metadata)
public void InterceptHeaders(string authUri, Metadata metadata)
{
var accessToken = GetAccessToken(CancellationToken.None);
var accessToken = GetAccessToken(authUri, CancellationToken.None);
metadata.Add(CreateBearerTokenHeader(accessToken));
}

@ -10,6 +10,10 @@
<assemblyIdentity name="System.Net.Http" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-4.2.28.0" newVersion="4.0.0.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Google.Apis.Core" publicKeyToken="4b01fa6e34db77ab" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-1.9.2.38523" newVersion="1.9.2.38523" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

@ -1,8 +1,8 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="BouncyCastle" version="1.7.0" targetFramework="net45" />
<package id="Google.Apis.Auth" version="1.9.2" targetFramework="net45" />
<package id="Google.Apis.Core" version="1.9.2" targetFramework="net45" />
<package id="Google.Apis.Auth" version="1.9.3" targetFramework="net45" />
<package id="Google.Apis.Core" version="1.9.3" targetFramework="net45" />
<package id="Microsoft.Bcl" version="1.1.10" targetFramework="net45" />
<package id="Microsoft.Bcl.Async" version="1.0.168" targetFramework="net45" />
<package id="Microsoft.Bcl.Build" version="1.0.21" targetFramework="net45" />

@ -67,9 +67,9 @@ namespace Grpc.Core.Internal.Tests
[Test]
public void ConstructorPreconditions()
{
Assert.Throws(typeof(NullReferenceException), () => { new ChannelOption(null, "abc"); });
Assert.Throws(typeof(NullReferenceException), () => { new ChannelOption(null, 1); });
Assert.Throws(typeof(NullReferenceException), () => { new ChannelOption("abc", null); });
Assert.Throws(typeof(ArgumentNullException), () => { new ChannelOption(null, "abc"); });
Assert.Throws(typeof(ArgumentNullException), () => { new ChannelOption(null, 1); });
Assert.Throws(typeof(ArgumentNullException), () => { new ChannelOption("abc", null); });
}
[Test]

@ -50,7 +50,7 @@ namespace Grpc.Core.Tests
[Test]
public void Constructor_RejectsInvalidParams()
{
Assert.Throws(typeof(NullReferenceException), () => new Channel(null, Credentials.Insecure));
Assert.Throws(typeof(ArgumentNullException), () => new Channel(null, Credentials.Insecure));
}
[Test]
@ -72,11 +72,11 @@ namespace Grpc.Core.Tests
}
[Test]
public void Target()
public void ResolvedTarget()
{
using (var channel = new Channel("127.0.0.1", Credentials.Insecure))
{
Assert.IsTrue(channel.Target.Contains("127.0.0.1"));
Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1"));
}
}

@ -138,7 +138,7 @@ namespace Grpc.Core.Tests
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
string result = "";
await requestStream.ForEach(async (request) =>
await requestStream.ForEachAsync(async (request) =>
{
result += request;
});
@ -147,7 +147,7 @@ namespace Grpc.Core.Tests
});
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
await call.RequestStream.WriteAll(new string[] { "A", "B", "C" });
await call.RequestStream.WriteAllAsync(new string[] { "A", "B", "C" });
Assert.AreEqual("ABC", await call.ResponseAsync);
}
@ -159,7 +159,7 @@ namespace Grpc.Core.Tests
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
barrier.SetResult(null);
await requestStream.ToList();
await requestStream.ToListAsync();
return "";
});

@ -90,7 +90,7 @@ namespace Grpc.Core.Tests
{
helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
{
await requestStream.ToList();
await requestStream.ToListAsync();
context.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
@ -122,7 +122,7 @@ namespace Grpc.Core.Tests
await call.RequestStream.CompleteAsync();
await call.ResponseStream.ToList();
await call.ResponseStream.ToListAsync();
}
}
}

@ -110,6 +110,14 @@ namespace Grpc.Core.Tests
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
Assert.Throws(typeof(ArgumentException), () =>
{
// Trying to override deadline while propagating deadline from parent call will throw.
Calls.BlockingUnaryCall(helper.CreateUnaryCall(
new CallOptions(deadline: DateTime.UtcNow.AddDays(8),
propagationToken: context.CreatePropagationToken())), "");
});
var callOptions = new CallOptions(propagationToken: context.CreatePropagationToken());
return await Calls.AsyncUnaryCall(helper.CreateUnaryCall(callOptions), "xyz");
});
@ -118,5 +126,28 @@ namespace Grpc.Core.Tests
await call.RequestStream.CompleteAsync();
Assert.AreEqual("PASS", await call);
}
[Test]
public async Task SuppressDeadlinePropagation()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
Assert.AreEqual(DateTime.MaxValue, context.Deadline);
return "PASS";
});
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
Assert.IsTrue(context.CancellationToken.CanBeCanceled);
var callOptions = new CallOptions(propagationToken: context.CreatePropagationToken(new ContextPropagationOptions(propagateDeadline: false)));
return await Calls.AsyncUnaryCall(helper.CreateUnaryCall(callOptions), "xyz");
});
var cts = new CancellationTokenSource();
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(deadline: DateTime.UtcNow.AddDays(7))));
await call.RequestStream.CompleteAsync();
Assert.AreEqual("PASS", await call);
}
}
}

@ -153,27 +153,23 @@ namespace Grpc.Core.Tests
return channel;
}
public CallInvocationDetails<string, string> CreateUnaryCall(CallOptions options = null)
public CallInvocationDetails<string, string> CreateUnaryCall(CallOptions options = default(CallOptions))
{
options = options ?? new CallOptions();
return new CallInvocationDetails<string, string>(channel, UnaryMethod, options);
}
public CallInvocationDetails<string, string> CreateClientStreamingCall(CallOptions options = null)
public CallInvocationDetails<string, string> CreateClientStreamingCall(CallOptions options = default(CallOptions))
{
options = options ?? new CallOptions();
return new CallInvocationDetails<string, string>(channel, ClientStreamingMethod, options);
}
public CallInvocationDetails<string, string> CreateServerStreamingCall(CallOptions options = null)
public CallInvocationDetails<string, string> CreateServerStreamingCall(CallOptions options = default(CallOptions))
{
options = options ?? new CallOptions();
return new CallInvocationDetails<string, string>(channel, ServerStreamingMethod, options);
}
public CallInvocationDetails<string, string> CreateDuplexStreamingCall(CallOptions options = null)
public CallInvocationDetails<string, string> CreateDuplexStreamingCall(CallOptions options = default(CallOptions))
{
options = options ?? new CallOptions();
return new CallInvocationDetails<string, string>(channel, DuplexStreamingMethod, options);
}

@ -84,7 +84,7 @@ namespace Grpc.Core.Tests
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
Assert.Throws(typeof(NullReferenceException), async () => await context.WriteResponseHeadersAsync(null));
Assert.Throws(typeof(ArgumentNullException), async () => await context.WriteResponseHeadersAsync(null));
return "PASS";
});
@ -129,7 +129,7 @@ namespace Grpc.Core.Tests
});
var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
var responses = await call.ResponseStream.ToList();
var responses = await call.ResponseStream.ToListAsync();
CollectionAssert.AreEqual(new[] { "A", "B" }, responses);
}
}

@ -40,30 +40,60 @@ namespace Grpc.Core
/// <summary>
/// Details about a client-side call to be invoked.
/// </summary>
public class CallInvocationDetails<TRequest, TResponse>
public struct CallInvocationDetails<TRequest, TResponse>
{
readonly Channel channel;
readonly string method;
readonly string host;
readonly Marshaller<TRequest> requestMarshaller;
readonly Marshaller<TResponse> responseMarshaller;
readonly CallOptions options;
CallOptions options;
/// <summary>
/// Initializes a new instance of the <see cref="Grpc.Core.CallInvocationDetails`2"/> struct.
/// </summary>
/// <param name="channel">Channel to use for this call.</param>
/// <param name="method">Method to call.</param>
/// <param name="options">Call options.</param>
public CallInvocationDetails(Channel channel, Method<TRequest, TResponse> method, CallOptions options) :
this(channel, method.FullName, null, method.RequestMarshaller, method.ResponseMarshaller, options)
this(channel, method, null, options)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="Grpc.Core.CallInvocationDetails`2"/> struct.
/// </summary>
/// <param name="channel">Channel to use for this call.</param>
/// <param name="method">Method to call.</param>
/// <param name="host">Host that contains the method. if <c>null</c>, default host will be used.</param>
/// <param name="options">Call options.</param>
public CallInvocationDetails(Channel channel, Method<TRequest, TResponse> method, string host, CallOptions options) :
this(channel, method.FullName, host, method.RequestMarshaller, method.ResponseMarshaller, options)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="Grpc.Core.CallInvocationDetails`2"/> struct.
/// </summary>
/// <param name="channel">Channel to use for this call.</param>
/// <param name="method">Qualified method name.</param>
/// <param name="host">Host that contains the method.</param>
/// <param name="requestMarshaller">Request marshaller.</param>
/// <param name="responseMarshaller">Response marshaller.</param>
/// <param name="options">Call options.</param>
public CallInvocationDetails(Channel channel, string method, string host, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller, CallOptions options)
{
this.channel = Preconditions.CheckNotNull(channel);
this.method = Preconditions.CheckNotNull(method);
this.channel = Preconditions.CheckNotNull(channel, "channel");
this.method = Preconditions.CheckNotNull(method, "method");
this.host = host;
this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller);
this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller);
this.options = Preconditions.CheckNotNull(options);
this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller, "requestMarshaller");
this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller, "responseMarshaller");
this.options = options;
}
/// <summary>
/// Get channel associated with this call.
/// </summary>
public Channel Channel
{
get
@ -72,6 +102,9 @@ namespace Grpc.Core
}
}
/// <summary>
/// Gets name of method to be called.
/// </summary>
public string Method
{
get
@ -80,6 +113,9 @@ namespace Grpc.Core
}
}
/// <summary>
/// Get name of host.
/// </summary>
public string Host
{
get
@ -88,6 +124,9 @@ namespace Grpc.Core
}
}
/// <summary>
/// Gets marshaller used to serialize requests.
/// </summary>
public Marshaller<TRequest> RequestMarshaller
{
get
@ -96,6 +135,9 @@ namespace Grpc.Core
}
}
/// <summary>
/// Gets marshaller used to deserialized responses.
/// </summary>
public Marshaller<TResponse> ResponseMarshaller
{
get
@ -104,6 +146,9 @@ namespace Grpc.Core
}
}
/// <summary>
/// Gets the call options.
/// </summary>
public CallOptions Options
{
get
@ -111,5 +156,16 @@ namespace Grpc.Core
return options;
}
}
/// <summary>
/// Returns new instance of <see cref="CallInvocationDetails"/> with
/// <c>Options</c> set to the value provided. Values of all other fields are preserved.
/// </summary>
public CallInvocationDetails<TRequest, TResponse> WithOptions(CallOptions options)
{
var newDetails = this;
newDetails.options = options;
return newDetails;
}
}
}

@ -42,29 +42,28 @@ namespace Grpc.Core
/// <summary>
/// Options for calls made by client.
/// </summary>
public class CallOptions
public struct CallOptions
{
readonly Metadata headers;
readonly DateTime deadline;
readonly CancellationToken cancellationToken;
readonly WriteOptions writeOptions;
readonly ContextPropagationToken propagationToken;
Metadata headers;
DateTime? deadline;
CancellationToken cancellationToken;
WriteOptions writeOptions;
ContextPropagationToken propagationToken;
/// <summary>
/// Creates a new instance of <c>CallOptions</c>.
/// Creates a new instance of <c>CallOptions</c> struct.
/// </summary>
/// <param name="headers">Headers to be sent with the call.</param>
/// <param name="deadline">Deadline for the call to finish. null means no deadline.</param>
/// <param name="cancellationToken">Can be used to request cancellation of the call.</param>
/// <param name="writeOptions">Write options that will be used for this call.</param>
/// <param name="propagationToken">Context propagation token obtained from <see cref="ServerCallContext"/>.</param>
public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken? cancellationToken = null,
public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken),
WriteOptions writeOptions = null, ContextPropagationToken propagationToken = null)
{
// TODO(jtattermusch): consider only creating metadata object once it's really needed.
this.headers = headers ?? new Metadata();
this.deadline = deadline ?? (propagationToken != null ? propagationToken.Deadline : DateTime.MaxValue);
this.cancellationToken = cancellationToken ?? (propagationToken != null ? propagationToken.CancellationToken : CancellationToken.None);
this.headers = headers;
this.deadline = deadline;
this.cancellationToken = cancellationToken;
this.writeOptions = writeOptions;
this.propagationToken = propagationToken;
}
@ -80,7 +79,7 @@ namespace Grpc.Core
/// <summary>
/// Call deadline.
/// </summary>
public DateTime Deadline
public DateTime? Deadline
{
get { return deadline; }
}
@ -114,5 +113,66 @@ namespace Grpc.Core
return this.propagationToken;
}
}
/// <summary>
/// Returns new instance of <see cref="CallOptions"/> with
/// <c>Headers</c> set to the value provided. Values of all other fields are preserved.
/// </summary>
public CallOptions WithHeaders(Metadata headers)
{
var newOptions = this;
newOptions.headers = headers;
return newOptions;
}
/// <summary>
/// Returns new instance of <see cref="CallOptions"/> with
/// <c>Deadline</c> set to the value provided. Values of all other fields are preserved.
/// </summary>
public CallOptions WithDeadline(DateTime deadline)
{
var newOptions = this;
newOptions.deadline = deadline;
return newOptions;
}
/// <summary>
/// Returns new instance of <see cref="CallOptions"/> with
/// <c>CancellationToken</c> set to the value provided. Values of all other fields are preserved.
/// </summary>
public CallOptions WithCancellationToken(CancellationToken cancellationToken)
{
var newOptions = this;
newOptions.cancellationToken = cancellationToken;
return newOptions;
}
/// <summary>
/// Returns a new instance of <see cref="CallOptions"/> with
/// all previously unset values set to their defaults and deadline and cancellation
/// token propagated when appropriate.
/// </summary>
internal CallOptions Normalize()
{
var newOptions = this;
if (propagationToken != null)
{
if (propagationToken.Options.IsPropagateDeadline)
{
Preconditions.CheckArgument(!newOptions.deadline.HasValue,
"Cannot propagate deadline from parent call. The deadline has already been set explicitly.");
newOptions.deadline = propagationToken.ParentDeadline;
}
if (propagationToken.Options.IsPropagateCancellation)
{
Preconditions.CheckArgument(!newOptions.cancellationToken.CanBeCanceled,
"Cannot propagate cancellation token from parent call. The cancellation token has already been set to a non-default value.");
}
}
newOptions.headers = newOptions.headers ?? Metadata.Empty;
newOptions.deadline = newOptions.deadline ?? DateTime.MaxValue;
return newOptions;
}
}
}

@ -31,8 +31,6 @@
#endregion
using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@ -40,9 +38,20 @@ namespace Grpc.Core
{
/// <summary>
/// Helper methods for generated clients to make RPC calls.
/// Most users will use this class only indirectly and will be
/// making calls using client object generated from protocol
/// buffer definition files.
/// </summary>
public static class Calls
{
/// <summary>
/// Invokes a simple remote call in a blocking fashion.
/// </summary>
/// <returns>The response.</returns>
/// <param name="call">The call defintion.</param>
/// <param name="req">Request message.</param>
/// <typeparam name="TRequest">Type of request message.</typeparam>
/// <typeparam name="TResponse">The of response message.</typeparam>
public static TResponse BlockingUnaryCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req)
where TRequest : class
where TResponse : class
@ -51,6 +60,14 @@ namespace Grpc.Core
return asyncCall.UnaryCall(req);
}
/// <summary>
/// Invokes a simple remote call asynchronously.
/// </summary>
/// <returns>An awaitable call object providing access to the response.</returns>
/// <param name="call">The call defintion.</param>
/// <param name="req">Request message.</param>
/// <typeparam name="TRequest">Type of request message.</typeparam>
/// <typeparam name="TResponse">The of response message.</typeparam>
public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req)
where TRequest : class
where TResponse : class
@ -60,6 +77,15 @@ namespace Grpc.Core
return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
/// <summary>
/// Invokes a server streaming call asynchronously.
/// In server streaming scenario, client sends on request and server responds with a stream of responses.
/// </summary>
/// <returns>A call object providing access to the asynchronous response stream.</returns>
/// <param name="call">The call defintion.</param>
/// <param name="req">Request message.</param>
/// <typeparam name="TRequest">Type of request message.</typeparam>
/// <typeparam name="TResponse">The of response messages.</typeparam>
public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req)
where TRequest : class
where TResponse : class
@ -70,6 +96,13 @@ namespace Grpc.Core
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
/// <summary>
/// Invokes a client streaming call asynchronously.
/// In client streaming scenario, client sends a stream of requests and server responds with a single response.
/// </summary>
/// <returns>An awaitable call object providing access to the response.</returns>
/// <typeparam name="TRequest">Type of request messages.</typeparam>
/// <typeparam name="TResponse">The of response message.</typeparam>
public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call)
where TRequest : class
where TResponse : class
@ -80,6 +113,15 @@ namespace Grpc.Core
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
/// <summary>
/// Invokes a duplex streaming call asynchronously.
/// In duplex streaming scenario, client sends a stream of requests and server responds with a stream of responses.
/// The response stream is completely independent and both side can be sending messages at the same time.
/// </summary>
/// <returns>A call object providing access to the asynchronous request and response streams.</returns>
/// <param name="call">The call definition.</param>
/// <typeparam name="TRequest">Type of request messages.</typeparam>
/// <typeparam name="TResponse">Type of reponse messages.</typeparam>
public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call)
where TRequest : class
where TResponse : class

@ -49,6 +49,7 @@ namespace Grpc.Core
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>();
readonly string target;
readonly GrpcEnvironment environment;
readonly ChannelSafeHandle handle;
readonly List<ChannelOption> options;
@ -58,12 +59,12 @@ namespace Grpc.Core
/// Creates a channel that connects to a specific host.
/// Port will default to 80 for an unsecure channel and to 443 for a secure channel.
/// </summary>
/// <param name="host">The name or IP address of the host.</param>
/// <param name="target">Target of the channel.</param>
/// <param name="credentials">Credentials to secure the channel.</param>
/// <param name="options">Channel options.</param>
public Channel(string host, Credentials credentials, IEnumerable<ChannelOption> options = null)
public Channel(string target, Credentials credentials, IEnumerable<ChannelOption> options = null)
{
Preconditions.CheckNotNull(host);
this.target = Preconditions.CheckNotNull(target, "target");
this.environment = GrpcEnvironment.GetInstance();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
@ -73,11 +74,11 @@ namespace Grpc.Core
{
if (nativeCredentials != null)
{
this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, host, nativeChannelArgs);
this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, target, nativeChannelArgs);
}
else
{
this.handle = ChannelSafeHandle.CreateInsecure(host, nativeChannelArgs);
this.handle = ChannelSafeHandle.CreateInsecure(target, nativeChannelArgs);
}
}
}
@ -131,8 +132,8 @@ namespace Grpc.Core
return tcs.Task;
}
/// <summary> Address of the remote endpoint in URI format.</summary>
public string Target
/// <summary>Resolved address of the remote endpoint in URI format.</summary>
public string ResolvedTarget
{
get
{
@ -140,6 +141,15 @@ namespace Grpc.Core
}
}
/// <summary>The original target used to create the channel.</summary>
public string Target
{
get
{
return this.target;
}
}
/// <summary>
/// Allows explicitly requesting channel to connect without starting an RPC.
/// Returned task completes once state Ready was seen. If the deadline is reached,

@ -63,8 +63,8 @@ namespace Grpc.Core
public ChannelOption(string name, string stringValue)
{
this.type = OptionType.String;
this.name = Preconditions.CheckNotNull(name);
this.stringValue = Preconditions.CheckNotNull(stringValue);
this.name = Preconditions.CheckNotNull(name, "name");
this.stringValue = Preconditions.CheckNotNull(stringValue, "stringValue");
}
/// <summary>
@ -75,7 +75,7 @@ namespace Grpc.Core
public ChannelOption(string name, int intValue)
{
this.type = OptionType.Integer;
this.name = Preconditions.CheckNotNull(name);
this.name = Preconditions.CheckNotNull(name, "name");
this.intValue = intValue;
}

@ -35,21 +35,26 @@ using System;
using System.Collections.Generic;
using Grpc.Core.Internal;
using System.Text.RegularExpressions;
namespace Grpc.Core
{
public delegate void MetadataInterceptorDelegate(Metadata metadata);
public delegate void MetadataInterceptorDelegate(string authUri, Metadata metadata);
/// <summary>
/// Base class for client-side stubs.
/// </summary>
public abstract class ClientBase
{
static readonly Regex TrailingPortPattern = new Regex(":[0-9]+/?$");
readonly Channel channel;
readonly string authUriBase;
public ClientBase(Channel channel)
{
this.channel = channel;
// TODO(jtattermush): we shouldn't need to hand-curate the channel.Target contents.
this.authUriBase = "https://" + TrailingPortPattern.Replace(channel.Target, "") + "/";
}
/// <summary>
@ -62,6 +67,18 @@ namespace Grpc.Core
set;
}
/// <summary>
/// gRPC supports multiple "hosts" being served by a single server.
/// This property can be used to set the target host explicitly.
/// By default, this will be set to <c>null</c> with the meaning
/// "use default host".
/// </summary>
public string Host
{
get;
set;
}
/// <summary>
/// Channel associated with this client.
/// </summary>
@ -83,10 +100,14 @@ namespace Grpc.Core
var interceptor = HeaderInterceptor;
if (interceptor != null)
{
interceptor(options.Headers);
options.Headers.Freeze();
if (options.Headers == null)
{
options = options.WithHeaders(new Metadata());
}
var authUri = authUriBase + method.ServiceName;
interceptor(authUri, options.Headers);
}
return new CallInvocationDetails<TRequest, TResponse>(channel, method, options);
return new CallInvocationDetails<TRequest, TResponse>(channel, method, Host, options);
}
}
}

@ -52,7 +52,7 @@ namespace Grpc.Core
/// <summary>
/// Default propagation mask used by C core.
/// </summary>
const ContextPropagationFlags DefaultCoreMask = (ContextPropagationFlags)0xffff;
private const ContextPropagationFlags DefaultCoreMask = (ContextPropagationFlags)0xffff;
/// <summary>
/// Default propagation mask used by C# - we want to propagate deadline
@ -74,6 +74,9 @@ namespace Grpc.Core
this.options = options ?? ContextPropagationOptions.Default;
}
/// <summary>
/// Gets the native handle of the parent call.
/// </summary>
internal CallSafeHandle ParentCall
{
get
@ -82,7 +85,10 @@ namespace Grpc.Core
}
}
internal DateTime Deadline
/// <summary>
/// Gets the parent call's deadline.
/// </summary>
internal DateTime ParentDeadline
{
get
{
@ -90,7 +96,10 @@ namespace Grpc.Core
}
}
internal CancellationToken CancellationToken
/// <summary>
/// Gets the parent call's cancellation token.
/// </summary>
internal CancellationToken ParentCancellationToken
{
get
{
@ -98,6 +107,9 @@ namespace Grpc.Core
}
}
/// <summary>
/// Get the context propagation options.
/// </summary>
internal ContextPropagationOptions Options
{
get
@ -105,16 +117,6 @@ namespace Grpc.Core
return this.options;
}
}
internal bool IsPropagateDeadline
{
get { return false; }
}
internal bool IsPropagateCancellation
{
get { return false; }
}
}
/// <summary>
@ -122,7 +124,37 @@ namespace Grpc.Core
/// </summary>
public class ContextPropagationOptions
{
/// <summary>
/// The context propagation options that will be used by default.
/// </summary>
public static readonly ContextPropagationOptions Default = new ContextPropagationOptions();
bool propagateDeadline;
bool propagateCancellation;
/// <summary>
/// Creates new context propagation options.
/// </summary>
/// <param name="propagateDeadline">If set to <c>true</c> parent call's deadline will be propagated to the child call.</param>
/// <param name="propagateCancellation">If set to <c>true</c> parent call's cancellation token will be propagated to the child call.</param>
public ContextPropagationOptions(bool propagateDeadline = true, bool propagateCancellation = true)
{
this.propagateDeadline = propagateDeadline;
this.propagateCancellation = propagateCancellation;
}
/// <value><c>true</c> if parent call's deadline should be propagated to the child call.</value>
public bool IsPropagateDeadline
{
get { return this.propagateDeadline; }
}
/// <value><c>true</c> if parent call's cancellation token should be propagated to the child call.</value>
public bool IsPropagateCancellation
{
get { return this.propagateCancellation; }
}
}
/// <summary>

@ -77,14 +77,12 @@
<Compile Include="ServerServiceDefinition.cs" />
<Compile Include="Utils\AsyncStreamExtensions.cs" />
<Compile Include="Utils\BenchmarkUtil.cs" />
<Compile Include="Utils\ExceptionHelper.cs" />
<Compile Include="Internal\CredentialsSafeHandle.cs" />
<Compile Include="Credentials.cs" />
<Compile Include="Internal\ChannelArgsSafeHandle.cs" />
<Compile Include="Internal\AsyncCompletion.cs" />
<Compile Include="Internal\AsyncCallBase.cs" />
<Compile Include="Internal\AsyncCallServer.cs" />
<Compile Include="OperationFailedException.cs" />
<Compile Include="Internal\AsyncCall.cs" />
<Compile Include="Utils\Preconditions.cs" />
<Compile Include="Internal\ServerCredentialsSafeHandle.cs" />

@ -115,7 +115,7 @@ namespace Grpc.Core
/// </summary>
public static void SetLogger(ILogger customLogger)
{
Preconditions.CheckNotNull(customLogger);
Preconditions.CheckNotNull(customLogger, "customLogger");
logger = customLogger;
}
@ -192,23 +192,5 @@ namespace Grpc.Core
Logger.Info("gRPC shutdown.");
}
/// <summary>
/// Shuts down this environment asynchronously.
/// </summary>
private Task CloseAsync()
{
return Task.Run(() =>
{
try
{
Close();
}
catch (Exception e)
{
Logger.Error(e, "Error occured while shutting down GrpcEnvironment.");
}
});
}
}
}

@ -63,7 +63,7 @@ namespace Grpc.Core.Internal
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
this.details = callDetails;
this.details = callDetails.WithOptions(callDetails.Options.Normalize());
this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
}
@ -109,15 +109,9 @@ namespace Grpc.Core.Internal
}
}
try
{
// Once the blocking call returns, the result should be available synchronously.
return unaryResponseTcs.Task.Result;
}
catch (AggregateException ae)
{
throw ExceptionHelper.UnwrapRpcException(ae);
}
// Once the blocking call returns, the result should be available synchronously.
// Note that GetAwaiter().GetResult() doesn't wrap exceptions in AggregateException.
return unaryResponseTcs.Task.GetAwaiter().GetResult();
}
}
@ -324,12 +318,11 @@ namespace Grpc.Core.Internal
private void Initialize(CompletionQueueSafeHandle cq)
{
var propagationToken = details.Options.PropagationToken;
var parentCall = propagationToken != null ? propagationToken.ParentCall : CallSafeHandle.NullInstance;
var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline));
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value));
details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
RegisterCancellationCallback();

@ -293,7 +293,7 @@ namespace Grpc.Core.Internal
if (!success)
{
FireCompletion(origCompletionDelegate, null, new OperationFailedException("Send failed"));
FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed"));
}
else
{
@ -318,7 +318,7 @@ namespace Grpc.Core.Internal
if (!success)
{
FireCompletion(origCompletionDelegate, null, new OperationFailedException("Halfclose failed"));
FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Halfclose failed"));
}
else
{

@ -211,7 +211,7 @@ namespace Grpc.Core.Internal
return Timespec.InfPast;
}
Preconditions.CheckArgument(dateTime.Kind == DateTimeKind.Utc, "dateTime");
Preconditions.CheckArgument(dateTime.Kind == DateTimeKind.Utc, "dateTime needs of kind DateTimeKind.Utc or be equal to DateTime.MaxValue or DateTime.MinValue.");
try
{

@ -54,8 +54,8 @@ namespace Grpc.Core
/// <param name="privateKey">PEM encoded private key.</param>
public KeyCertificatePair(string certificateChain, string privateKey)
{
this.certificateChain = Preconditions.CheckNotNull(certificateChain);
this.privateKey = Preconditions.CheckNotNull(privateKey);
this.certificateChain = Preconditions.CheckNotNull(certificateChain, "certificateChain");
this.privateKey = Preconditions.CheckNotNull(privateKey, "privateKey");
}
/// <summary>

@ -42,16 +42,21 @@ namespace Grpc.Core.Logging
readonly Type forType;
readonly string forTypeString;
/// <summary>Creates a console logger not associated to any specific type.</summary>
public ConsoleLogger() : this(null)
{
}
/// <summary>Creates a console logger that logs messsage specific for given type.</summary>
private ConsoleLogger(Type forType)
{
this.forType = forType;
this.forTypeString = forType != null ? forType.FullName + " " : "";
}
/// <summary>
/// Returns a logger associated with the specified type.
/// </summary>
public ILogger ForType<T>()
{
if (typeof(T) == forType)
@ -61,31 +66,37 @@ namespace Grpc.Core.Logging
return new ConsoleLogger(typeof(T));
}
/// <summary>Logs a message with severity Debug.</summary>
public void Debug(string message, params object[] formatArgs)
{
Log("D", message, formatArgs);
}
/// <summary>Logs a message with severity Info.</summary>
public void Info(string message, params object[] formatArgs)
{
Log("I", message, formatArgs);
}
/// <summary>Logs a message with severity Warning.</summary>
public void Warning(string message, params object[] formatArgs)
{
Log("W", message, formatArgs);
}
/// <summary>Logs a message and an associated exception with severity Warning.</summary>
public void Warning(Exception exception, string message, params object[] formatArgs)
{
Log("W", message + " " + exception, formatArgs);
}
/// <summary>Logs a message with severity Error.</summary>
public void Error(string message, params object[] formatArgs)
{
Log("E", message, formatArgs);
}
/// <summary>Logs a message and an associated exception with severity Error.</summary>
public void Error(Exception exception, string message, params object[] formatArgs)
{
Log("E", message + " " + exception, formatArgs);

@ -42,16 +42,22 @@ namespace Grpc.Core.Logging
/// <summary>Returns a logger associated with the specified type.</summary>
ILogger ForType<T>();
/// <summary>Logs a message with severity Debug.</summary>
void Debug(string message, params object[] formatArgs);
/// <summary>Logs a message with severity Info.</summary>
void Info(string message, params object[] formatArgs);
/// <summary>Logs a message with severity Warning.</summary>
void Warning(string message, params object[] formatArgs);
/// <summary>Logs a message and an associated exception with severity Warning.</summary>
void Warning(Exception exception, string message, params object[] formatArgs);
/// <summary>Logs a message with severity Error.</summary>
void Error(string message, params object[] formatArgs);
/// <summary>Logs a message and an associated exception with severity Error.</summary>
void Error(Exception exception, string message, params object[] formatArgs);
}
}

@ -37,19 +37,27 @@ using Grpc.Core.Utils;
namespace Grpc.Core
{
/// <summary>
/// For serializing and deserializing messages.
/// Encapsulates the logic for serializing and deserializing messages.
/// </summary>
public struct Marshaller<T>
{
readonly Func<T, byte[]> serializer;
readonly Func<byte[], T> deserializer;
/// <summary>
/// Initializes a new marshaller.
/// </summary>
/// <param name="serializer">Function that will be used to serialize messages.</param>
/// <param name="deserializer">Function that will be used to deserialize messages.</param>
public Marshaller(Func<T, byte[]> serializer, Func<byte[], T> deserializer)
{
this.serializer = Preconditions.CheckNotNull(serializer);
this.deserializer = Preconditions.CheckNotNull(deserializer);
this.serializer = Preconditions.CheckNotNull(serializer, "serializer");
this.deserializer = Preconditions.CheckNotNull(deserializer, "deserializer");
}
/// <summary>
/// Gets the serializer function.
/// </summary>
public Func<T, byte[]> Serializer
{
get
@ -58,6 +66,9 @@ namespace Grpc.Core
}
}
/// <summary>
/// Gets the deserializer function.
/// </summary>
public Func<byte[], T> Deserializer
{
get
@ -72,11 +83,17 @@ namespace Grpc.Core
/// </summary>
public static class Marshallers
{
/// <summary>
/// Creates a marshaller from specified serializer and deserializer.
/// </summary>
public static Marshaller<T> Create<T>(Func<T, byte[]> serializer, Func<byte[], T> deserializer)
{
return new Marshaller<T>(serializer, deserializer);
}
/// <summary>
/// Returns a marshaller for <c>string</c> type. This is useful for testing.
/// </summary>
public static Marshaller<string> StringMarshaller
{
get

@ -186,15 +186,15 @@ namespace Grpc.Core
public Entry(string key, byte[] valueBytes)
{
this.key = Preconditions.CheckNotNull(key);
this.key = Preconditions.CheckNotNull(key, "key");
this.value = null;
this.valueBytes = Preconditions.CheckNotNull(valueBytes);
this.valueBytes = Preconditions.CheckNotNull(valueBytes, "valueBytes");
}
public Entry(string key, string value)
{
this.key = Preconditions.CheckNotNull(key);
this.value = Preconditions.CheckNotNull(value);
this.key = Preconditions.CheckNotNull(key, "key");
this.value = Preconditions.CheckNotNull(value, "value");
this.valueBytes = null;
}

@ -41,14 +41,21 @@ namespace Grpc.Core
/// </summary>
public enum MethodType
{
Unary, // Unary request, unary response.
ClientStreaming, // Streaming request, unary response.
ServerStreaming, // Unary request, streaming response.
DuplexStreaming // Streaming request, streaming response.
/// <summary>Single request sent from client, single response received from server.</summary>
Unary,
/// <summary>Stream of request sent from client, single response received from server.</summary>
ClientStreaming,
/// <summary>Single request sent from client, stream of responses received from server.</summary>
ServerStreaming,
/// <summary>Both server and client can stream arbitrary number of requests and responses simultaneously.</summary>
DuplexStreaming
}
/// <summary>
/// A description of a service method.
/// A description of a remote method.
/// </summary>
public class Method<TRequest, TResponse>
{
@ -59,16 +66,27 @@ namespace Grpc.Core
readonly Marshaller<TResponse> responseMarshaller;
readonly string fullName;
/// <summary>
/// Initializes a new instance of the <c>Method</c> class.
/// </summary>
/// <param name="type">Type of method.</param>
/// <param name="serviceName">Name of service this method belongs to.</param>
/// <param name="name">Unqualified name of the method.</param>
/// <param name="requestMarshaller">Marshaller used for request messages.</param>
/// <param name="responseMarshaller">Marshaller used for response messages.</param>
public Method(MethodType type, string serviceName, string name, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller)
{
this.type = type;
this.serviceName = Preconditions.CheckNotNull(serviceName);
this.name = Preconditions.CheckNotNull(name);
this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller);
this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller);
this.fullName = GetFullName(serviceName);
this.serviceName = Preconditions.CheckNotNull(serviceName, "serviceName");
this.name = Preconditions.CheckNotNull(name, "name");
this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller, "requestMarshaller");
this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller, "responseMarshaller");
this.fullName = GetFullName(serviceName, name);
}
/// <summary>
/// Gets the type of the method.
/// </summary>
public MethodType Type
{
get
@ -77,6 +95,9 @@ namespace Grpc.Core
}
}
/// <summary>
/// Gets the name of the service to which this method belongs.
/// </summary>
public string ServiceName
{
get
@ -85,6 +106,9 @@ namespace Grpc.Core
}
}
/// <summary>
/// Gets the unqualified name of the method.
/// </summary>
public string Name
{
get
@ -93,6 +117,9 @@ namespace Grpc.Core
}
}
/// <summary>
/// Gets the marshaller used for request messages.
/// </summary>
public Marshaller<TRequest> RequestMarshaller
{
get
@ -101,6 +128,9 @@ namespace Grpc.Core
}
}
/// <summary>
/// Gets the marshaller used for response messages.
/// </summary>
public Marshaller<TResponse> ResponseMarshaller
{
get
@ -108,7 +138,11 @@ namespace Grpc.Core
return this.responseMarshaller;
}
}
/// <summary>
/// Gets the fully qualified name of the method. On the server side, methods are dispatched
/// based on this name.
/// </summary>
public string FullName
{
get
@ -120,9 +154,9 @@ namespace Grpc.Core
/// <summary>
/// Gets full name of the method including the service name.
/// </summary>
internal string GetFullName(string serviceName)
internal static string GetFullName(string serviceName, string methodName)
{
return "/" + Preconditions.CheckNotNull(serviceName) + "/" + this.Name;
return "/" + serviceName + "/" + methodName;
}
}
}

@ -1,47 +0,0 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
namespace Grpc.Core
{
/// <summary>
/// Thrown when gRPC operation fails.
/// </summary>
public class OperationFailedException : Exception
{
public OperationFailedException(string message) : base(message)
{
}
}
}

@ -36,22 +36,34 @@ using System;
namespace Grpc.Core
{
/// <summary>
/// Thrown when remote procedure call fails.
/// Thrown when remote procedure call fails. Every <c>RpcException</c> is associated with a resulting <see cref="Status"/> of the call.
/// </summary>
public class RpcException : Exception
{
private readonly Status status;
/// <summary>
/// Creates a new <c>RpcException</c> associated with given status.
/// </summary>
/// <param name="status">Resulting status of a call.</param>
public RpcException(Status status) : base(status.ToString())
{
this.status = status;
}
/// <summary>
/// Creates a new <c>RpcException</c> associated with given status and message.
/// </summary>
/// <param name="status">Resulting status of a call.</param>
/// <param name="message">The exception message.</param>
public RpcException(Status status, string message) : base(message)
{
this.status = status;
}
/// <summary>
/// Resulting status of the call.
/// </summary>
public Status Status
{
get

@ -192,7 +192,7 @@ namespace Grpc.Core
{
lock (myLock)
{
Preconditions.CheckNotNull(serverPort.Credentials);
Preconditions.CheckNotNull(serverPort.Credentials, "serverPort");
Preconditions.CheckState(!startRequested);
var address = string.Format("{0}:{1}", serverPort.Host, serverPort.Port);
int boundPort;

@ -91,7 +91,7 @@ namespace Grpc.Core
{
this.keyCertificatePairs = new List<KeyCertificatePair>(keyCertificatePairs).AsReadOnly();
Preconditions.CheckArgument(this.keyCertificatePairs.Count > 0,
"At least one KeyCertificatePair needs to be provided");
"At least one KeyCertificatePair needs to be provided.");
if (forceClientAuth)
{
Preconditions.CheckNotNull(rootCertificates,

@ -31,12 +31,8 @@
#endregion
using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
namespace Grpc.Core
{
/// <summary>

@ -62,9 +62,9 @@ namespace Grpc.Core
/// <param name="credentials">credentials to use to secure this port.</param>
public ServerPort(string host, int port, ServerCredentials credentials)
{
this.host = Preconditions.CheckNotNull(host);
this.host = Preconditions.CheckNotNull(host, "host");
this.port = port;
this.credentials = Preconditions.CheckNotNull(credentials);
this.credentials = Preconditions.CheckNotNull(credentials, "credentials");
}
/// <summary>

@ -79,7 +79,7 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
callHandlers.Add(method.GetFullName(serviceName), ServerCalls.UnaryCall(method, handler));
callHandlers.Add(method.FullName, ServerCalls.UnaryCall(method, handler));
return this;
}
@ -89,7 +89,7 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
callHandlers.Add(method.GetFullName(serviceName), ServerCalls.ClientStreamingCall(method, handler));
callHandlers.Add(method.FullName, ServerCalls.ClientStreamingCall(method, handler));
return this;
}
@ -99,7 +99,7 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
callHandlers.Add(method.GetFullName(serviceName), ServerCalls.ServerStreamingCall(method, handler));
callHandlers.Add(method.FullName, ServerCalls.ServerStreamingCall(method, handler));
return this;
}
@ -109,7 +109,7 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
callHandlers.Add(method.GetFullName(serviceName), ServerCalls.DuplexStreamingCall(method, handler));
callHandlers.Add(method.FullName, ServerCalls.DuplexStreamingCall(method, handler));
return this;
}

@ -29,13 +29,12 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using Grpc.Core.Utils;
namespace Grpc.Core
{
/// <summary>
/// Represents RPC result.
/// Represents RPC result, which consists of <see cref="StatusCode"/> and an optional detail string.
/// </summary>
public struct Status
{
@ -52,6 +51,11 @@ namespace Grpc.Core
readonly StatusCode statusCode;
readonly string detail;
/// <summary>
/// Creates a new instance of <c>Status</c>.
/// </summary>
/// <param name="statusCode">Status code.</param>
/// <param name="detail">Detail.</param>
public Status(StatusCode statusCode, string detail)
{
this.statusCode = statusCode;
@ -80,6 +84,9 @@ namespace Grpc.Core
}
}
/// <summary>
/// Returns a <see cref="System.String"/> that represents the current <see cref="Grpc.Core.Status"/>.
/// </summary>
public override string ToString()
{
return string.Format("Status(StatusCode={0}, Detail=\"{1}\")", statusCode, detail);

@ -31,8 +31,6 @@
#endregion
using System;
namespace Grpc.Core
{
/// <summary>
@ -41,101 +39,101 @@ namespace Grpc.Core
/// </summary>
public enum StatusCode
{
/* Not an error; returned on success */
/// <summary>Not an error; returned on success.</summary>
OK = 0,
/* The operation was cancelled (typically by the caller). */
/// <summary>The operation was cancelled (typically by the caller).</summary>
Cancelled = 1,
/* Unknown error. An example of where this error may be returned is
if a Status value received from another address space belongs to
an error-space that is not known in this address space. Also
errors raised by APIs that do not return enough error information
may be converted to this error. */
/// <summary>
/// Unknown error. An example of where this error may be returned is
/// if a Status value received from another address space belongs to
/// an error-space that is not known in this address space. Also
/// errors raised by APIs that do not return enough error information
/// may be converted to this error.
/// </summary>
Unknown = 2,
/* Client specified an invalid argument. Note that this differs
from FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments
that are problematic regardless of the state of the system
(e.g., a malformed file name). */
/// <summary>
/// Client specified an invalid argument. Note that this differs
/// from FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments
/// that are problematic regardless of the state of the system
/// (e.g., a malformed file name).
/// </summary>
InvalidArgument = 3,
/* Deadline expired before operation could complete. For operations
that change the state of the system, this error may be returned
even if the operation has completed successfully. For example, a
successful response from a server could have been delayed long
enough for the deadline to expire. */
/// <summary>
/// Deadline expired before operation could complete. For operations
/// that change the state of the system, this error may be returned
/// even if the operation has completed successfully. For example, a
/// successful response from a server could have been delayed long
/// enough for the deadline to expire.
/// </summary>
DeadlineExceeded = 4,
/* Some requested entity (e.g., file or directory) was not found. */
/// <summary>Some requested entity (e.g., file or directory) was not found.</summary>
NotFound = 5,
/* Some entity that we attempted to create (e.g., file or directory)
already exists. */
/// <summary>Some entity that we attempted to create (e.g., file or directory) already exists.</summary>
AlreadyExists = 6,
/* The caller does not have permission to execute the specified
operation. PERMISSION_DENIED must not be used for rejections
caused by exhausting some resource (use RESOURCE_EXHAUSTED
instead for those errors). PERMISSION_DENIED must not be
used if the caller can not be identified (use UNAUTHENTICATED
instead for those errors). */
/// <summary>
/// The caller does not have permission to execute the specified
/// operation. PERMISSION_DENIED must not be used for rejections
/// caused by exhausting some resource (use RESOURCE_EXHAUSTED
/// instead for those errors). PERMISSION_DENIED must not be
/// used if the caller can not be identified (use UNAUTHENTICATED
/// instead for those errors).
/// </summary>
PermissionDenied = 7,
/* The request does not have valid authentication credentials for the
operation. */
/// <summary>The request does not have valid authentication credentials for the operation.</summary>
Unauthenticated = 16,
/* Some resource has been exhausted, perhaps a per-user quota, or
perhaps the entire file system is out of space. */
/// <summary>
/// Some resource has been exhausted, perhaps a per-user quota, or
/// perhaps the entire file system is out of space.
/// </summary>
ResourceExhausted = 8,
/* Operation was rejected because the system is not in a state
required for the operation's execution. For example, directory
to be deleted may be non-empty, an rmdir operation is applied to
a non-directory, etc.
A litmus test that may help a service implementor in deciding
between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE:
(a) Use UNAVAILABLE if the client can retry just the failing call.
(b) Use ABORTED if the client should retry at a higher-level
(e.g., restarting a read-modify-write sequence).
(c) Use FAILED_PRECONDITION if the client should not retry until
the system state has been explicitly fixed. E.g., if an "rmdir"
fails because the directory is non-empty, FAILED_PRECONDITION
should be returned since the client should not retry unless
they have first fixed up the directory by deleting files from it.
(d) Use FAILED_PRECONDITION if the client performs conditional
REST Get/Update/Delete on a resource and the resource on the
server does not match the condition. E.g., conflicting
read-modify-write on the same resource. */
/// <summary>
/// Operation was rejected because the system is not in a state
/// required for the operation's execution. For example, directory
/// to be deleted may be non-empty, an rmdir operation is applied to
/// a non-directory, etc.
/// </summary>
FailedPrecondition = 9,
/* The operation was aborted, typically due to a concurrency issue
like sequencer check failures, transaction aborts, etc.
See litmus test above for deciding between FAILED_PRECONDITION,
ABORTED, and UNAVAILABLE. */
/// <summary>
/// The operation was aborted, typically due to a concurrency issue
/// like sequencer check failures, transaction aborts, etc.
/// </summary>
Aborted = 10,
/* Operation was attempted past the valid range. E.g., seeking or
reading past end of file.
Unlike INVALID_ARGUMENT, this error indicates a problem that may
be fixed if the system state changes. For example, a 32-bit file
system will generate INVALID_ARGUMENT if asked to read at an
offset that is not in the range [0,2^32-1], but it will generate
OUT_OF_RANGE if asked to read from an offset past the current
file size.
There is a fair bit of overlap between FAILED_PRECONDITION and
OUT_OF_RANGE. We recommend using OUT_OF_RANGE (the more specific
error) when it applies so that callers who are iterating through
a space can easily look for an OUT_OF_RANGE error to detect when
they are done. */
/// <summary>
/// Operation was attempted past the valid range. E.g., seeking or
/// reading past end of file.
/// </summary>
OutOfRange = 11,
/* Operation is not implemented or not supported/enabled in this service. */
/// <summary>Operation is not implemented or not supported/enabled in this service.</summary>
Unimplemented = 12,
/* Internal errors. Means some invariants expected by underlying
system has been broken. If you see one of these errors,
something is very broken. */
/// <summary>
/// Internal errors. Means some invariants expected by underlying
/// system has been broken. If you see one of these errors,
/// something is very broken.
/// </summary>
Internal = 13,
/* The service is currently unavailable. This is a most likely a
transient condition and may be corrected by retrying with
a backoff.
See litmus test above for deciding between FAILED_PRECONDITION,
ABORTED, and UNAVAILABLE. */
/// <summary>
/// The service is currently unavailable. This is a most likely a
/// transient condition and may be corrected by retrying with
/// a backoff.
/// </summary>
Unavailable = 14,
/* Unrecoverable data loss or corruption. */
/// <summary>Unrecoverable data loss or corruption.</summary>
DataLoss = 15
}
}

@ -33,7 +33,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Grpc.Core.Utils
@ -46,7 +45,7 @@ namespace Grpc.Core.Utils
/// <summary>
/// Reads the entire stream and executes an async action for each element.
/// </summary>
public static async Task ForEach<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction)
public static async Task ForEachAsync<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction)
where T : class
{
while (await streamReader.MoveNext())
@ -58,7 +57,7 @@ namespace Grpc.Core.Utils
/// <summary>
/// Reads the entire stream and creates a list containing all the elements read.
/// </summary>
public static async Task<List<T>> ToList<T>(this IAsyncStreamReader<T> streamReader)
public static async Task<List<T>> ToListAsync<T>(this IAsyncStreamReader<T> streamReader)
where T : class
{
var result = new List<T>();
@ -73,7 +72,7 @@ namespace Grpc.Core.Utils
/// Writes all elements from given enumerable to the stream.
/// Completes the stream afterwards unless close = false.
/// </summary>
public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true)
public static async Task WriteAllAsync<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true)
where T : class
{
foreach (var element in elements)
@ -89,7 +88,7 @@ namespace Grpc.Core.Utils
/// <summary>
/// Writes all elements from given enumerable to the stream.
/// </summary>
public static async Task WriteAll<T>(this IServerStreamWriter<T> streamWriter, IEnumerable<T> elements)
public static async Task WriteAllAsync<T>(this IServerStreamWriter<T> streamWriter, IEnumerable<T> elements)
where T : class
{
foreach (var element in elements)

@ -39,6 +39,9 @@ using System.Threading.Tasks;
namespace Grpc.Core.Utils
{
/// <summary>
/// Utility methods to run microbenchmarks.
/// </summary>
public static class BenchmarkUtil
{
/// <summary>

@ -1,57 +0,0 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
namespace Grpc.Core.Utils
{
public static class ExceptionHelper
{
/// <summary>
/// If inner exceptions contain RpcException, rethrows it.
/// Otherwise, rethrows the original aggregate exception.
/// Always throws, the exception return type is here only to make the.
/// </summary>
public static Exception UnwrapRpcException(AggregateException ae)
{
foreach (var e in ae.InnerExceptions)
{
if (e is RpcException)
{
throw e;
}
}
throw ae;
}
}
}

@ -32,17 +32,16 @@
#endregion
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
namespace Grpc.Core.Utils
{
/// <summary>
/// Utility methods to simplify checking preconditions in the code.
/// </summary>
public static class Preconditions
{
/// <summary>
/// Throws ArgumentException if condition is false.
/// Throws <see cref="ArgumentException"/> if condition is false.
/// </summary>
public static void CheckArgument(bool condition)
{
@ -53,7 +52,7 @@ namespace Grpc.Core.Utils
}
/// <summary>
/// Throws ArgumentException with given message if condition is false.
/// Throws <see cref="ArgumentException"/> with given message if condition is false.
/// </summary>
public static void CheckArgument(bool condition, string errorMessage)
{
@ -64,31 +63,31 @@ namespace Grpc.Core.Utils
}
/// <summary>
/// Throws NullReferenceException if reference is null.
/// Throws <see cref="ArgumentNullException"/> if reference is null.
/// </summary>
public static T CheckNotNull<T>(T reference)
{
if (reference == null)
{
throw new NullReferenceException();
throw new ArgumentNullException();
}
return reference;
}
/// <summary>
/// Throws NullReferenceException with given message if reference is null.
/// Throws <see cref="ArgumentNullException"/> if reference is null.
/// </summary>
public static T CheckNotNull<T>(T reference, string errorMessage)
public static T CheckNotNull<T>(T reference, string paramName)
{
if (reference == null)
{
throw new NullReferenceException(errorMessage);
throw new ArgumentNullException(paramName);
}
return reference;
}
/// <summary>
/// Throws InvalidOperationException if condition is false.
/// Throws <see cref="InvalidOperationException"/> if condition is false.
/// </summary>
public static void CheckState(bool condition)
{
@ -99,7 +98,7 @@ namespace Grpc.Core.Utils
}
/// <summary>
/// Throws InvalidOperationException with given message if condition is false.
/// Throws <see cref="InvalidOperationException"/> with given message if condition is false.
/// </summary>
public static void CheckState(bool condition, string errorMessage)
{

@ -1,5 +1,37 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System.Reflection;
using System.Runtime.CompilerServices;
// The current version of gRPC C#.
[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".0")]

@ -1,8 +1,41 @@
using System.Reflection;
using System.Runtime.CompilerServices;
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
namespace Grpc.Core
{
/// <summary>
/// Provides info about current version of gRPC.
/// </summary>
public static class VersionInfo
{
/// <summary>

@ -109,7 +109,7 @@ namespace math.Tests
{
using (var call = client.Fib(new FibArgs.Builder { Limit = 6 }.Build()))
{
var responses = await call.ResponseStream.ToList();
var responses = await call.ResponseStream.ToListAsync();
CollectionAssert.AreEqual(new List<long> { 1, 1, 2, 3, 5, 8 },
responses.ConvertAll((n) => n.Num_));
}
@ -151,7 +151,7 @@ namespace math.Tests
using (var call = client.Fib(new FibArgs.Builder { Limit = 0 }.Build(),
deadline: DateTime.UtcNow.AddMilliseconds(500)))
{
var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.ToList());
var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.ToListAsync());
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.
Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
@ -167,7 +167,7 @@ namespace math.Tests
var numbers = new List<long> { 10, 20, 30 }.ConvertAll(
n => Num.CreateBuilder().SetNum_(n).Build());
await call.RequestStream.WriteAll(numbers);
await call.RequestStream.WriteAllAsync(numbers);
var result = await call.ResponseAsync;
Assert.AreEqual(60, result.Num_);
}
@ -185,8 +185,8 @@ namespace math.Tests
using (var call = client.DivMany())
{
await call.RequestStream.WriteAll(divArgsList);
var result = await call.ResponseStream.ToList();
await call.RequestStream.WriteAllAsync(divArgsList);
var result = await call.ResponseStream.ToListAsync();
CollectionAssert.AreEqual(new long[] { 3, 4, 3 }, result.ConvertAll((divReply) => divReply.Quotient));
CollectionAssert.AreEqual(new long[] { 1, 16, 1 }, result.ConvertAll((divReply) => divReply.Remainder));

@ -54,7 +54,7 @@ namespace math
{
using (var call = client.Fib(new FibArgs.Builder { Limit = 5 }.Build()))
{
List<Num> result = await call.ResponseStream.ToList();
List<Num> result = await call.ResponseStream.ToListAsync();
Console.WriteLine("Fib Result: " + string.Join("|", result));
}
}
@ -70,7 +70,7 @@ namespace math
using (var call = client.Sum())
{
await call.RequestStream.WriteAll(numbers);
await call.RequestStream.WriteAllAsync(numbers);
Console.WriteLine("Sum Result: " + await call.ResponseAsync);
}
}
@ -85,8 +85,8 @@ namespace math
};
using (var call = client.DivMany())
{
await call.RequestStream.WriteAll(divArgsList);
Console.WriteLine("DivMany Result: " + string.Join("|", await call.ResponseStream.ToList()));
await call.RequestStream.WriteAllAsync(divArgsList);
Console.WriteLine("DivMany Result: " + string.Join("|", await call.ResponseStream.ToListAsync()));
}
}
@ -102,7 +102,7 @@ namespace math
Num sum;
using (var sumCall = client.Sum())
{
await sumCall.RequestStream.WriteAll(numbers);
await sumCall.RequestStream.WriteAllAsync(numbers);
sum = await sumCall.ResponseAsync;
}

@ -75,7 +75,7 @@ namespace math
public async Task<Num> Sum(IAsyncStreamReader<Num> requestStream, ServerCallContext context)
{
long sum = 0;
await requestStream.ForEach(async num =>
await requestStream.ForEachAsync(async num =>
{
sum += num.Num_;
});
@ -84,10 +84,7 @@ namespace math
public async Task DivMany(IAsyncStreamReader<DivArgs> requestStream, IServerStreamWriter<DivReply> responseStream, ServerCallContext context)
{
await requestStream.ForEach(async divArgs =>
{
await responseStream.WriteAsync(DivInternal(divArgs));
});
await requestStream.ForEachAsync(async divArgs => await responseStream.WriteAsync(DivInternal(divArgs)));
}
static DivReply DivInternal(DivArgs args)

@ -92,11 +92,11 @@ namespace Grpc.HealthCheck.Tests
public void NullsRejected()
{
var impl = new HealthServiceImpl();
Assert.Throws(typeof(NullReferenceException), () => impl.SetStatus(null, "", HealthCheckResponse.Types.ServingStatus.SERVING));
Assert.Throws(typeof(NullReferenceException), () => impl.SetStatus("", null, HealthCheckResponse.Types.ServingStatus.SERVING));
Assert.Throws(typeof(ArgumentNullException), () => impl.SetStatus(null, "", HealthCheckResponse.Types.ServingStatus.SERVING));
Assert.Throws(typeof(ArgumentNullException), () => impl.SetStatus("", null, HealthCheckResponse.Types.ServingStatus.SERVING));
Assert.Throws(typeof(NullReferenceException), () => impl.ClearStatus(null, ""));
Assert.Throws(typeof(NullReferenceException), () => impl.ClearStatus("", null));
Assert.Throws(typeof(ArgumentNullException), () => impl.ClearStatus(null, ""));
Assert.Throws(typeof(ArgumentNullException), () => impl.ClearStatus("", null));
}
private static HealthCheckResponse.Types.ServingStatus GetStatusHelper(HealthServiceImpl impl, string host, string service)

@ -10,6 +10,10 @@
<assemblyIdentity name="System.Net.Http" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-4.2.28.0" newVersion="4.0.0.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Google.Apis.Core" publicKeyToken="4b01fa6e34db77ab" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-1.9.2.38523" newVersion="1.9.2.38523" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

@ -10,6 +10,10 @@
<assemblyIdentity name="System.Net.Http" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-4.2.28.0" newVersion="4.0.0.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Google.Apis.Core" publicKeyToken="4b01fa6e34db77ab" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-1.9.2.38523" newVersion="1.9.2.38523" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

@ -8,7 +8,7 @@
<RootNamespace>Grpc.IntegrationTesting</RootNamespace>
<AssemblyName>Grpc.IntegrationTesting</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<NuGetPackageImportStamp>041c163e</NuGetPackageImportStamp>
<NuGetPackageImportStamp>6566287f</NuGetPackageImportStamp>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
@ -38,20 +38,33 @@
<AssemblyOriginatorKeyFile>C:\keys\Grpc.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>
<ItemGroup>
<Reference Include="BouncyCastle.Crypto">
<Reference Include="BouncyCastle.Crypto, Version=1.7.4137.9688, Culture=neutral, PublicKeyToken=a4292a325f69b123, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\BouncyCastle.1.7.0\lib\Net40-Client\BouncyCastle.Crypto.dll</HintPath>
</Reference>
<Reference Include="Google.Apis.Auth, Version=1.9.2.27817, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
<Reference Include="Google.Apis.Auth, Version=1.9.3.19379, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Google.Apis.Auth.1.9.3\lib\net40\Google.Apis.Auth.dll</HintPath>
</Reference>
<Reference Include="Google.Apis.Auth.PlatformServices, Version=1.9.3.19383, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Google.Apis.Auth.1.9.3\lib\net40\Google.Apis.Auth.PlatformServices.dll</HintPath>
</Reference>
<Reference Include="Google.Apis.Core, Version=1.9.3.19379, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Google.Apis.Core.1.9.3\lib\portable-net40+sl50+win+wpa81+wp80\Google.Apis.Core.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Threading.Tasks, Version=1.0.12.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Google.Apis.Auth.1.9.2\lib\net40\Google.Apis.Auth.dll</HintPath>
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.dll</HintPath>
</Reference>
<Reference Include="Google.Apis.Auth.PlatformServices, Version=1.9.2.27820, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
<Reference Include="Microsoft.Threading.Tasks.Extensions, Version=1.0.12.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Google.Apis.Auth.1.9.2\lib\net40\Google.Apis.Auth.PlatformServices.dll</HintPath>
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.Extensions.dll</HintPath>
</Reference>
<Reference Include="Google.Apis.Core, Version=1.9.2.27816, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
<Reference Include="Microsoft.Threading.Tasks.Extensions.Desktop, Version=1.0.168.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Google.Apis.Core.1.9.2\lib\portable-net40+sl50+win+wpa81+wp80\Google.Apis.Core.dll</HintPath>
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.Extensions.Desktop.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=7.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
@ -78,15 +91,6 @@
<HintPath>..\packages\Microsoft.Net.Http.2.2.29\lib\net45\System.Net.Http.Primitives.dll</HintPath>
</Reference>
<Reference Include="System.Net.Http.WebRequest" />
<Reference Include="Microsoft.Threading.Tasks">
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Threading.Tasks.Extensions">
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.Extensions.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Threading.Tasks.Extensions.Desktop">
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.Extensions.Desktop.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="..\Grpc.Core\Version.cs">

@ -43,6 +43,7 @@ using Grpc.Auth;
using Grpc.Core;
using Grpc.Core.Utils;
using NUnit.Framework;
using Google.Apis.Auth.OAuth2;
namespace Grpc.IntegrationTesting
{
@ -97,10 +98,10 @@ namespace Grpc.IntegrationTesting
}
var interopClient = new InteropClient(options);
interopClient.Run();
interopClient.Run().Wait();
}
private void Run()
private async Task Run()
{
Credentials credentials = null;
if (options.useTls)
@ -120,17 +121,7 @@ namespace Grpc.IntegrationTesting
using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions))
{
TestService.TestServiceClient client = new TestService.TestServiceClient(channel);
if (options.testCase == "service_account_creds" || options.testCase == "compute_engine_creds")
{
var credential = GoogleCredential.GetApplicationDefault();
if (credential.IsCreateScopedRequired)
{
credential = credential.CreateScoped(new[] { AuthScope });
}
client.HeaderInterceptor = OAuth2Interceptors.FromCredential(credential);
}
RunTestCaseAsync(options.testCase, client).Wait();
await RunTestCaseAsync(options.testCase, client);
}
GrpcEnvironment.Shutdown();
}
@ -158,16 +149,19 @@ namespace Grpc.IntegrationTesting
await RunEmptyStreamAsync(client);
break;
case "service_account_creds":
RunServiceAccountCreds(client);
await RunServiceAccountCredsAsync(client);
break;
case "compute_engine_creds":
RunComputeEngineCreds(client);
await RunComputeEngineCredsAsync(client);
break;
case "jwt_token_creds":
await RunJwtTokenCredsAsync(client);
break;
case "oauth2_auth_token":
RunOAuth2AuthToken(client);
await RunOAuth2AuthTokenAsync(client);
break;
case "per_rpc_creds":
RunPerRpcCreds(client);
await RunPerRpcCredsAsync(client);
break;
case "cancel_after_begin":
await RunCancelAfterBeginAsync(client);
@ -215,7 +209,7 @@ namespace Grpc.IntegrationTesting
using (var call = client.StreamingInputCall())
{
await call.RequestStream.WriteAll(bodySizes);
await call.RequestStream.WriteAllAsync(bodySizes);
var response = await call.ResponseAsync;
Assert.AreEqual(74922, response.AggregatedPayloadSize);
@ -237,7 +231,7 @@ namespace Grpc.IntegrationTesting
using (var call = client.StreamingOutputCall(request))
{
var responseList = await call.ResponseStream.ToList();
var responseList = await call.ResponseStream.ToListAsync();
foreach (var res in responseList)
{
Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type);
@ -303,15 +297,19 @@ namespace Grpc.IntegrationTesting
{
await call.RequestStream.CompleteAsync();
var responseList = await call.ResponseStream.ToList();
var responseList = await call.ResponseStream.ToListAsync();
Assert.AreEqual(0, responseList.Count);
}
Console.WriteLine("Passed!");
}
public static void RunServiceAccountCreds(TestService.ITestServiceClient client)
public static async Task RunServiceAccountCredsAsync(TestService.TestServiceClient client)
{
Console.WriteLine("running service_account_creds");
var credential = await GoogleCredential.GetApplicationDefaultAsync();
credential = credential.CreateScoped(new[] { AuthScope });
client.HeaderInterceptor = OAuth2Interceptors.FromCredential(credential);
var request = SimpleRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.SetResponseSize(314159)
@ -329,9 +327,13 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("Passed!");
}
public static void RunComputeEngineCreds(TestService.ITestServiceClient client)
public static async Task RunComputeEngineCredsAsync(TestService.TestServiceClient client)
{
Console.WriteLine("running compute_engine_creds");
var credential = await GoogleCredential.GetApplicationDefaultAsync();
Assert.IsFalse(credential.IsCreateScopedRequired);
client.HeaderInterceptor = OAuth2Interceptors.FromCredential(credential);
var request = SimpleRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.SetResponseSize(314159)
@ -349,12 +351,35 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("Passed!");
}
public static void RunOAuth2AuthToken(TestService.TestServiceClient client)
public static async Task RunJwtTokenCredsAsync(TestService.TestServiceClient client)
{
Console.WriteLine("running jwt_token_creds");
var credential = await GoogleCredential.GetApplicationDefaultAsync();
// check this a credential with scope support, but don't add the scope.
Assert.IsTrue(credential.IsCreateScopedRequired);
client.HeaderInterceptor = OAuth2Interceptors.FromCredential(credential);
var request = SimpleRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.SetResponseSize(314159)
.SetPayload(CreateZerosPayload(271828))
.SetFillUsername(true)
.SetFillOauthScope(true)
.Build();
var response = client.UnaryCall(request);
Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
Assert.AreEqual(314159, response.Payload.Body.Length);
Assert.AreEqual(ServiceAccountUser, response.Username);
Console.WriteLine("Passed!");
}
public static async Task RunOAuth2AuthTokenAsync(TestService.TestServiceClient client)
{
Console.WriteLine("running oauth2_auth_token");
var credential = GoogleCredential.GetApplicationDefault().CreateScoped(new[] { AuthScope });
Assert.IsTrue(credential.RequestAccessTokenAsync(CancellationToken.None).Result);
string oauth2Token = credential.Token.AccessToken;
ITokenAccess credential = (await GoogleCredential.GetApplicationDefaultAsync()).CreateScoped(new[] { AuthScope });
string oauth2Token = await credential.GetAccessTokenForRequestAsync();
client.HeaderInterceptor = OAuth2Interceptors.FromAccessToken(oauth2Token);
@ -370,13 +395,12 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("Passed!");
}
public static void RunPerRpcCreds(TestService.TestServiceClient client)
public static async Task RunPerRpcCredsAsync(TestService.TestServiceClient client)
{
Console.WriteLine("running per_rpc_creds");
var credential = GoogleCredential.GetApplicationDefault().CreateScoped(new[] { AuthScope });
Assert.IsTrue(credential.RequestAccessTokenAsync(CancellationToken.None).Result);
string oauth2Token = credential.Token.AccessToken;
ITokenAccess credential = (await GoogleCredential.GetApplicationDefaultAsync()).CreateScoped(new[] { AuthScope });
string oauth2Token = await credential.GetAccessTokenForRequestAsync();
var headerInterceptor = OAuth2Interceptors.FromAccessToken(oauth2Token);
var request = SimpleRequest.CreateBuilder()
@ -385,7 +409,7 @@ namespace Grpc.IntegrationTesting
.Build();
var headers = new Metadata();
headerInterceptor(headers);
headerInterceptor("", headers);
var response = client.UnaryCall(request, headers: headers);
Assert.AreEqual(AuthScopeResponse, response.OauthScope);

@ -71,7 +71,7 @@ namespace grpc.testing
public async Task<StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<StreamingInputCallRequest> requestStream, ServerCallContext context)
{
int sum = 0;
await requestStream.ForEach(async request =>
await requestStream.ForEachAsync(async request =>
{
sum += request.Payload.Body.Length;
});
@ -80,7 +80,7 @@ namespace grpc.testing
public async Task FullDuplexCall(IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream, ServerCallContext context)
{
await requestStream.ForEach(async request =>
await requestStream.ForEachAsync(async request =>
{
foreach (var responseParam in request.ResponseParametersList)
{

@ -10,6 +10,10 @@
<assemblyIdentity name="System.Net.Http" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-4.2.28.0" newVersion="4.0.0.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Google.Apis.Core" publicKeyToken="4b01fa6e34db77ab" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-1.9.2.38523" newVersion="1.9.2.38523" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

@ -1,8 +1,8 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="BouncyCastle" version="1.7.0" targetFramework="net45" />
<package id="Google.Apis.Auth" version="1.9.2" targetFramework="net45" />
<package id="Google.Apis.Core" version="1.9.2" targetFramework="net45" />
<package id="Google.Apis.Auth" version="1.9.3" targetFramework="net45" />
<package id="Google.Apis.Core" version="1.9.3" targetFramework="net45" />
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
<package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="Microsoft.Bcl" version="1.1.10" targetFramework="net45" />

@ -11,7 +11,8 @@
'-pedantic',
'-g',
'-zdefs',
'-Werror'
'-Werror',
'-Wno-error=deprecated-declarations'
],
'ldflags': [
'-g'

@ -63,7 +63,7 @@ function runTest(iterations, callback) {
var timeDiff = process.hrtime(startTime);
intervals[i] = timeDiff[0] * 1000000 + timeDiff[1] / 1000;
next(i+1);
}, {}, deadline);
}, {}, {deadline: deadline});
}
}
next(0);

@ -510,10 +510,21 @@ NAN_METHOD(Call::New) {
NanUtf8String method(args[1]);
double deadline = args[2]->NumberValue();
grpc_channel *wrapped_channel = channel->GetWrappedChannel();
grpc_call *wrapped_call = grpc_channel_create_call(
wrapped_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
CompletionQueueAsyncWorker::GetQueue(), *method, channel->GetHost(),
MillisecondsToTimespec(deadline));
grpc_call *wrapped_call;
if (args[3]->IsString()) {
NanUtf8String host_override(args[3]);
wrapped_call = grpc_channel_create_call(
wrapped_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
CompletionQueueAsyncWorker::GetQueue(), *method,
*host_override, MillisecondsToTimespec(deadline));
} else if (args[3]->IsUndefined() || args[3]->IsNull()) {
wrapped_call = grpc_channel_create_call(
wrapped_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
CompletionQueueAsyncWorker::GetQueue(), *method,
NULL, MillisecondsToTimespec(deadline));
} else {
return NanThrowTypeError("Call's fourth argument must be a string");
}
call = new Call(wrapped_call);
args.This()->SetHiddenValue(NanNew("channel_"), channel_object);
}

@ -59,14 +59,12 @@ using v8::Value;
NanCallback *Channel::constructor;
Persistent<FunctionTemplate> Channel::fun_tpl;
Channel::Channel(grpc_channel *channel, NanUtf8String *host)
: wrapped_channel(channel), host(host) {}
Channel::Channel(grpc_channel *channel) : wrapped_channel(channel) {}
Channel::~Channel() {
if (wrapped_channel != NULL) {
grpc_channel_destroy(wrapped_channel);
}
delete host;
}
void Channel::Init(Handle<Object> exports) {
@ -91,8 +89,6 @@ bool Channel::HasInstance(Handle<Value> val) {
grpc_channel *Channel::GetWrappedChannel() { return this->wrapped_channel; }
char *Channel::GetHost() { return **this->host; }
NAN_METHOD(Channel::New) {
NanScope();
@ -103,8 +99,7 @@ NAN_METHOD(Channel::New) {
}
grpc_channel *wrapped_channel;
// Owned by the Channel object
NanUtf8String *host = new NanUtf8String(args[0]);
NanUtf8String *host_override = NULL;
NanUtf8String host(args[0]);
grpc_credentials *creds;
if (!Credentials::HasInstance(args[1])) {
return NanThrowTypeError(
@ -116,12 +111,9 @@ NAN_METHOD(Channel::New) {
grpc_channel_args *channel_args_ptr;
if (args[2]->IsUndefined()) {
channel_args_ptr = NULL;
wrapped_channel = grpc_insecure_channel_create(**host, NULL);
wrapped_channel = grpc_insecure_channel_create(*host, NULL);
} else if (args[2]->IsObject()) {
Handle<Object> args_hash(args[2]->ToObject()->Clone());
if (args_hash->HasOwnProperty(NanNew(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG))) {
host_override = new NanUtf8String(args_hash->Get(NanNew(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)));
}
Handle<Array> keys(args_hash->GetOwnPropertyNames());
grpc_channel_args channel_args;
channel_args.num_args = keys->Length();
@ -153,20 +145,15 @@ NAN_METHOD(Channel::New) {
return NanThrowTypeError("Channel expects a string and an object");
}
if (creds == NULL) {
wrapped_channel = grpc_insecure_channel_create(**host, channel_args_ptr);
wrapped_channel = grpc_insecure_channel_create(*host, channel_args_ptr);
} else {
wrapped_channel =
grpc_secure_channel_create(creds, **host, channel_args_ptr);
grpc_secure_channel_create(creds, *host, channel_args_ptr);
}
if (channel_args_ptr != NULL) {
free(channel_args_ptr->args);
}
Channel *channel;
if (host_override == NULL) {
channel = new Channel(wrapped_channel, host);
} else {
channel = new Channel(wrapped_channel, host_override);
}
Channel *channel = new Channel(wrapped_channel);
channel->Wrap(args.This());
NanReturnValue(args.This());
} else {

@ -53,11 +53,8 @@ class Channel : public ::node::ObjectWrap {
/* Returns the grpc_channel struct that this object wraps */
grpc_channel *GetWrappedChannel();
/* Return the hostname that this channel connects to */
char *GetHost();
private:
explicit Channel(grpc_channel *channel, NanUtf8String *host);
explicit Channel(grpc_channel *channel);
~Channel();
// Prevent copying
@ -71,7 +68,6 @@ class Channel : public ::node::ObjectWrap {
static v8::Persistent<v8::FunctionTemplate> fun_tpl;
grpc_channel *wrapped_channel;
NanUtf8String *host;
};
} // namespace node

@ -67,7 +67,7 @@ function zeroBuffer(size) {
* primarily for use with mocha
*/
function emptyUnary(client, done) {
var call = client.emptyCall({}, function(err, resp) {
client.emptyCall({}, function(err, resp) {
assert.ifError(err);
if (done) {
done();
@ -89,7 +89,7 @@ function largeUnary(client, done) {
body: zeroBuffer(271828)
}
};
var call = client.unaryCall(arg, function(err, resp) {
client.unaryCall(arg, function(err, resp) {
assert.ifError(err);
assert.strictEqual(resp.payload.type, 'COMPRESSABLE');
assert.strictEqual(resp.payload.body.length, 314159);
@ -259,7 +259,7 @@ function cancelAfterFirstResponse(client, done) {
function timeoutOnSleepingServer(client, done) {
var deadline = new Date();
deadline.setMilliseconds(deadline.getMilliseconds() + 1);
var call = client.fullDuplexCall(null, deadline);
var call = client.fullDuplexCall(null, {deadline: deadline});
call.write({
payload: {body: zeroBuffer(27182)}
});
@ -293,12 +293,14 @@ function authTest(expected_user, scope, client, done) {
fill_username: true,
fill_oauth_scope: true
};
var call = client.unaryCall(arg, function(err, resp) {
client.unaryCall(arg, function(err, resp) {
assert.ifError(err);
assert.strictEqual(resp.payload.type, 'COMPRESSABLE');
assert.strictEqual(resp.payload.body.length, 314159);
assert.strictEqual(resp.username, expected_user);
assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE);
if (scope) {
assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE);
}
if (done) {
done();
}
@ -328,14 +330,14 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) {
};
var makeTestCall = function(error, client_metadata) {
assert.ifError(error);
var call = client.unaryCall(arg, function(err, resp) {
client.unaryCall(arg, function(err, resp) {
assert.ifError(err);
assert.strictEqual(resp.username, expected_user);
assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE);
if (done) {
done();
}
});
}, client_metadata);
};
if (per_rpc) {
updateMetadata('', {}, makeTestCall);
@ -393,6 +395,7 @@ function runTest(address, host_override, test_case, tls, test_ca, done) {
creds = grpc.Credentials.createSsl(ca_data);
if (host_override) {
options['grpc.ssl_target_name_override'] = host_override;
options['grpc.default_authority'] = host_override;
}
} else {
creds = grpc.Credentials.createInsecure();

@ -207,6 +207,25 @@ ClientReadableStream.prototype.getPeer = getPeer;
ClientWritableStream.prototype.getPeer = getPeer;
ClientDuplexStream.prototype.getPeer = getPeer;
/**
* Get a call object built with the provided options. Keys for options are
* 'deadline', which takes a date or number, and 'host', which takes a string
* and overrides the hostname to connect to.
* @param {Object} options Options map.
*/
function getCall(channel, method, options) {
var deadline;
var host;
if (options) {
deadline = options.deadline;
host = options.host;
}
if (deadline === undefined) {
deadline = Infinity;
}
return new grpc.Call(channel, method, deadline, host);
}
/**
* Get a function that can make unary requests to the specified method.
* @param {string} method The name of the method to request
@ -226,17 +245,13 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
* response is received
* @param {array=} metadata Array of metadata key/value pairs to add to the
* call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @param {Object=} options Options map
* @return {EventEmitter} An event emitter for stream related events
*/
function makeUnaryRequest(argument, callback, metadata, deadline) {
function makeUnaryRequest(argument, callback, metadata, options) {
/* jshint validthis: true */
if (deadline === undefined) {
deadline = Infinity;
}
var emitter = new EventEmitter();
var call = new grpc.Call(this.channel, method, deadline);
var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) {
metadata = {};
}
@ -300,16 +315,12 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
* response is received
* @param {array=} metadata Array of metadata key/value pairs to add to the
* call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @param {Object=} options Options map
* @return {EventEmitter} An event emitter for stream related events
*/
function makeClientStreamRequest(callback, metadata, deadline) {
function makeClientStreamRequest(callback, metadata, options) {
/* jshint validthis: true */
if (deadline === undefined) {
deadline = Infinity;
}
var call = new grpc.Call(this.channel, method, deadline);
var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) {
metadata = {};
}
@ -374,16 +385,12 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
* serialize
* @param {array=} metadata Array of metadata key/value pairs to add to the
* call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @param {Object} options Options map
* @return {EventEmitter} An event emitter for stream related events
*/
function makeServerStreamRequest(argument, metadata, deadline) {
function makeServerStreamRequest(argument, metadata, options) {
/* jshint validthis: true */
if (deadline === undefined) {
deadline = Infinity;
}
var call = new grpc.Call(this.channel, method, deadline);
var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) {
metadata = {};
}
@ -446,16 +453,12 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
* @this {SurfaceClient} Client object. Must have a channel member.
* @param {array=} metadata Array of metadata key/value pairs to add to the
* call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @param {Options} options Options map
* @return {EventEmitter} An event emitter for stream related events
*/
function makeBidiStreamRequest(metadata, deadline) {
function makeBidiStreamRequest(metadata, options) {
/* jshint validthis: true */
if (deadline === undefined) {
deadline = Infinity;
}
var call = new grpc.Call(this.channel, method, deadline);
var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) {
metadata = {};
}
@ -523,7 +526,7 @@ var requester_makers = {
* requestSerialize: function to serialize request objects
* responseDeserialize: function to deserialize response objects
* @param {Object} methods An object mapping method names to method attributes
* @param {string} serviceName The name of the service
* @param {string} serviceName The fully qualified name of the service
* @return {function(string, Object)} New client constructor
*/
exports.makeClientConstructor = function(methods, serviceName) {
@ -548,8 +551,10 @@ exports.makeClientConstructor = function(methods, serviceName) {
}
options['grpc.primary_user_agent'] = 'grpc-node/' + version;
this.channel = new grpc.Channel(address, credentials, options);
this.server_address = address.replace(/\/$/, '');
this.auth_uri = this.server_address + '/' + serviceName;
// Remove the optional DNS scheme, trailing port, and trailing backslash
address = address.replace(/^(dns:\/{3})?([^:\/]+)(:\d+)?\/?$/, '$2');
this.server_address = address;
this.auth_uri = 'https://' + this.server_address + '/' + serviceName;
this.updateMetadata = updateMetadata;
}
@ -587,7 +592,8 @@ exports.makeClientConstructor = function(methods, serviceName) {
*/
exports.makeProtobufClientConstructor = function(service) {
var method_attrs = common.getProtobufServiceAttrs(service, service.name);
var Client = exports.makeClientConstructor(method_attrs);
var Client = exports.makeClientConstructor(
method_attrs, common.fullyQualifiedName(service));
Client.service = service;
return Client;
};

@ -84,6 +84,11 @@ describe('call', function() {
new grpc.Call(channel, 'method', 0);
});
});
it('should accept an optional fourth string parameter', function() {
assert.doesNotThrow(function() {
new grpc.Call(channel, 'method', new Date(), 'host_override');
});
});
it('should fail with a closed channel', function() {
var local_channel = new grpc.Channel('hostname', insecureCreds);
local_channel.close();

@ -74,8 +74,6 @@ describe('end-to-end', function() {
});
it('should start and end a request without error', function(complete) {
var done = multiDone(complete, 2);
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 3);
var status_text = 'xyz';
var call = new grpc.Call(channel,
'dummy_method',
@ -126,8 +124,6 @@ describe('end-to-end', function() {
});
it('should successfully send and receive metadata', function(complete) {
var done = multiDone(complete, 2);
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 3);
var status_text = 'xyz';
var call = new grpc.Call(channel,
'dummy_method',
@ -184,8 +180,6 @@ describe('end-to-end', function() {
var req_text = 'client_request';
var reply_text = 'server_response';
var done = multiDone(complete, 2);
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 3);
var status_text = 'success';
var call = new grpc.Call(channel,
'dummy_method',
@ -241,8 +235,6 @@ describe('end-to-end', function() {
it('should send multiple messages', function(complete) {
var done = multiDone(complete, 2);
var requests = ['req1', 'req2'];
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 3);
var status_text = 'xyz';
var call = new grpc.Call(channel,
'dummy_method',

@ -48,15 +48,15 @@
typedef void (^GRXValueHandler)(id value);
typedef void (^GRXCompletionHandler)(NSError *errorOrNil);
typedef void (^GRXSingleValueHandler)(id value, NSError *errorOrNil);
typedef void (^GRXStreamHandler)(BOOL done, id value, NSError *error);
typedef void (^GRXSingleHandler)(id value, NSError *errorOrNil);
typedef void (^GRXEventHandler)(BOOL done, id value, NSError *error);
// Utility to create objects that conform to the GRXWriteable protocol, from
// blocks that handle each of the two methods of the protocol.
@interface GRXWriteable : NSObject<GRXWriteable>
+ (instancetype)writeableWithSingleValueHandler:(GRXSingleValueHandler)handler;
+ (instancetype)writeableWithStreamHandler:(GRXStreamHandler)handler;
+ (instancetype)writeableWithSingleHandler:(GRXSingleHandler)handler;
+ (instancetype)writeableWithEventHandler:(GRXEventHandler)handler;
- (instancetype)initWithValueHandler:(GRXValueHandler)valueHandler
completionHandler:(GRXCompletionHandler)completionHandler

@ -38,7 +38,7 @@
GRXCompletionHandler _completionHandler;
}
+ (instancetype)writeableWithSingleValueHandler:(GRXSingleValueHandler)handler {
+ (instancetype)writeableWithSingleHandler:(GRXSingleHandler)handler {
if (!handler) {
return [[self alloc] init];
}
@ -51,7 +51,7 @@
}];
}
+ (instancetype)writeableWithStreamHandler:(GRXStreamHandler)handler {
+ (instancetype)writeableWithEventHandler:(GRXEventHandler)handler {
if (!handler) {
return [[self alloc] init];
}

@ -55,7 +55,7 @@
return [[self alloc] init];
}
- (GRXSingleValueHandler)block {
- (GRXSingleHandler)block {
return ^(id value, NSError *errorOrNil) {
++_timesCalled;
_value = value;
@ -71,13 +71,13 @@
#pragma mark Writeable
- (void)testWriteableSingleValueHandlerIsCalledForValue {
- (void)testWriteableSingleHandlerIsCalledForValue {
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
id anyValue = @7;
// If:
id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleValueHandler:handler.block];
id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
[writeable writeValue:anyValue];
// Then:
@ -86,13 +86,13 @@
XCTAssertEqualObjects(handler.errorOrNil, nil);
}
- (void)testWriteableSingleValueHandlerIsCalledForError {
- (void)testWriteableSingleHandlerIsCalledForError {
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
NSError *anyError = [NSError errorWithDomain:@"domain" code:7 userInfo:nil];
// If:
id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleValueHandler:handler.block];
id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
[writeable writesFinishedWithError:anyError];
// Then:
@ -106,7 +106,7 @@
- (void)testBufferedPipePropagatesValue {
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleValueHandler:handler.block];
id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
id anyValue = @7;
// If:
@ -123,7 +123,7 @@
- (void)testBufferedPipePropagatesError {
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleValueHandler:handler.block];
id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
NSError *anyError = [NSError errorWithDomain:@"domain" code:7 userInfo:nil];
// If:

@ -113,6 +113,7 @@ Call *pygrpc_Call_new_empty(CompletionQueue *cq);
void pygrpc_Call_dealloc(Call *self);
PyObject *pygrpc_Call_start_batch(Call *self, PyObject *args, PyObject *kwargs);
PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs);
PyObject *pygrpc_Call_peer(Call *self);
extern PyTypeObject pygrpc_Call_type;
@ -129,6 +130,11 @@ Channel *pygrpc_Channel_new(
void pygrpc_Channel_dealloc(Channel *self);
Call *pygrpc_Channel_create_call(
Channel *self, PyObject *args, PyObject *kwargs);
PyObject *pygrpc_Channel_check_connectivity_state(Channel *self, PyObject *args,
PyObject *kwargs);
PyObject *pygrpc_Channel_watch_connectivity_state(Channel *self, PyObject *args,
PyObject *kwargs);
PyObject *pygrpc_Channel_target(Channel *self);
extern PyTypeObject pygrpc_Channel_type;
@ -181,6 +187,9 @@ pygrpc_tag *pygrpc_produce_request_tag(PyObject *user_tag, Call *empty_call);
/* Construct a tag associated with a server shutdown. */
pygrpc_tag *pygrpc_produce_server_shutdown_tag(PyObject *user_tag);
/* Construct a tag associated with a channel state change. */
pygrpc_tag *pygrpc_produce_channel_state_change_tag(PyObject *user_tag);
/* Frees all resources owned by the tag and the tag itself. */
void pygrpc_discard_tag(pygrpc_tag *tag);

@ -42,6 +42,7 @@
PyMethodDef pygrpc_Call_methods[] = {
{"start_batch", (PyCFunction)pygrpc_Call_start_batch, METH_KEYWORDS, ""},
{"cancel", (PyCFunction)pygrpc_Call_cancel, METH_KEYWORDS, ""},
{"peer", (PyCFunction)pygrpc_Call_peer, METH_NOARGS, ""},
{NULL}
};
const char pygrpc_Call_doc[] = "See grpc._adapter._types.Call.";
@ -161,3 +162,10 @@ PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs) {
}
return PyInt_FromLong(errcode);
}
PyObject *pygrpc_Call_peer(Call *self) {
char *peer = grpc_call_get_peer(self->c_call);
PyObject *py_peer = PyString_FromString(peer);
gpr_free(peer);
return py_peer;
}

@ -36,10 +36,14 @@
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
PyMethodDef pygrpc_Channel_methods[] = {
{"create_call", (PyCFunction)pygrpc_Channel_create_call, METH_KEYWORDS, ""},
{"check_connectivity_state", (PyCFunction)pygrpc_Channel_check_connectivity_state, METH_KEYWORDS, ""},
{"watch_connectivity_state", (PyCFunction)pygrpc_Channel_watch_connectivity_state, METH_KEYWORDS, ""},
{"target", (PyCFunction)pygrpc_Channel_target, METH_NOARGS, ""},
{NULL}
};
const char pygrpc_Channel_doc[] = "See grpc._adapter._types.Channel.";
@ -122,7 +126,7 @@ Call *pygrpc_Channel_create_call(
const char *host;
double deadline;
char *keywords[] = {"cq", "method", "host", "deadline", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!ssd:create_call", keywords,
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!szd:create_call", keywords,
&pygrpc_CompletionQueue_type, &cq, &method, &host, &deadline)) {
return NULL;
}
@ -132,3 +136,51 @@ Call *pygrpc_Channel_create_call(
pygrpc_cast_double_to_gpr_timespec(deadline));
return call;
}
PyObject *pygrpc_Channel_check_connectivity_state(
Channel *self, PyObject *args, PyObject *kwargs) {
PyObject *py_try_to_connect;
int try_to_connect;
char *keywords[] = {"try_to_connect", NULL};
grpc_connectivity_state state;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O:connectivity_state", keywords,
&py_try_to_connect)) {
return NULL;
}
if (!PyBool_Check(py_try_to_connect)) {
Py_XDECREF(py_try_to_connect);
return NULL;
}
try_to_connect = Py_True == py_try_to_connect;
Py_DECREF(py_try_to_connect);
state = grpc_channel_check_connectivity_state(self->c_chan, try_to_connect);
return PyInt_FromLong(state);
}
PyObject *pygrpc_Channel_watch_connectivity_state(
Channel *self, PyObject *args, PyObject *kwargs) {
PyObject *tag;
double deadline;
int last_observed_state;
CompletionQueue *completion_queue;
char *keywords[] = {"last_observed_state", "deadline",
"completion_queue", "tag"};
if (!PyArg_ParseTupleAndKeywords(
args, kwargs, "idO!O:watch_connectivity_state", keywords,
&last_observed_state, &deadline, &pygrpc_CompletionQueue_type,
&completion_queue, &tag)) {
return NULL;
}
grpc_channel_watch_connectivity_state(
self->c_chan, (grpc_connectivity_state)last_observed_state,
pygrpc_cast_double_to_gpr_timespec(deadline), completion_queue->c_cq,
pygrpc_produce_channel_state_change_tag(tag));
Py_RETURN_NONE;
}
PyObject *pygrpc_Channel_target(Channel *self) {
char *target = grpc_channel_get_target(self->c_chan);
PyObject *py_target = PyString_FromString(target);
gpr_free(target);
return py_target;
}

@ -96,7 +96,7 @@ Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
PyObject *py_args;
grpc_channel_args c_args;
char *keywords[] = {"cq", "args", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!O:Channel", keywords,
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!O:Server", keywords,
&pygrpc_CompletionQueue_type, &cq, &py_args)) {
return NULL;
}

@ -88,6 +88,19 @@ pygrpc_tag *pygrpc_produce_server_shutdown_tag(PyObject *user_tag) {
return tag;
}
pygrpc_tag *pygrpc_produce_channel_state_change_tag(PyObject *user_tag) {
pygrpc_tag *tag = gpr_malloc(sizeof(pygrpc_tag));
tag->user_tag = user_tag;
Py_XINCREF(tag->user_tag);
tag->call = NULL;
tag->ops = NULL;
tag->nops = 0;
grpc_call_details_init(&tag->request_call_details);
grpc_metadata_array_init(&tag->request_metadata);
tag->is_new_call = 0;
return tag;
}
void pygrpc_discard_tag(pygrpc_tag *tag) {
if (!tag) {
return;
@ -139,7 +152,7 @@ PyObject *pygrpc_consume_event(grpc_event event) {
}
int pygrpc_produce_op(PyObject *op, grpc_op *result) {
static const int OP_TUPLE_SIZE = 5;
static const int OP_TUPLE_SIZE = 6;
static const int STATUS_TUPLE_SIZE = 2;
static const int TYPE_INDEX = 0;
static const int INITIAL_METADATA_INDEX = 1;
@ -148,6 +161,7 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
static const int STATUS_INDEX = 4;
static const int STATUS_CODE_INDEX = 0;
static const int STATUS_DETAILS_INDEX = 1;
static const int WRITE_FLAGS_INDEX = 5;
int type;
Py_ssize_t message_size;
char *message;
@ -170,7 +184,10 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
return 0;
}
c_op.op = type;
c_op.flags = 0;
c_op.flags = PyInt_AsLong(PyTuple_GET_ITEM(op, WRITE_FLAGS_INDEX));
if (PyErr_Occurred()) {
return 0;
}
switch (type) {
case GRPC_OP_SEND_INITIAL_METADATA:
if (!pygrpc_cast_pyseq_to_send_metadata(

@ -127,7 +127,7 @@ class Call(object):
def write(self, message, tag):
return self._internal.start_batch([
_types.OpArgs.send_message(message)
_types.OpArgs.send_message(message, 0)
], _TagAdapter(tag, Event.Kind.WRITE_ACCEPTED))
def complete(self, tag):

@ -75,6 +75,9 @@ class Call(_types.Call):
else:
return self.call.cancel(code, details)
def peer(self):
return self.call.peer()
class Channel(_types.Channel):
@ -88,6 +91,17 @@ class Channel(_types.Channel):
def create_call(self, completion_queue, method, host, deadline=None):
return Call(self.channel.create_call(completion_queue.completion_queue, method, host, deadline))
def check_connectivity_state(self, try_to_connect):
return self.channel.check_connectivity_state(try_to_connect)
def watch_connectivity_state(self, last_observed_state, deadline,
completion_queue, tag):
self.channel.watch_connectivity_state(
last_observed_state, deadline, completion_queue.completion_queue, tag)
def target(self):
return self.channel.target()
_NO_TAG = object()

@ -31,13 +31,12 @@ import abc
import collections
import enum
# TODO(atash): decide whether or not to move these enums to the _c module to
# force build errors with upstream changes.
class GrpcChannelArgumentKeys(enum.Enum):
"""Mirrors keys used in grpc_channel_args for GRPC-specific arguments."""
SSL_TARGET_NAME_OVERRIDE = 'grpc.ssl_target_name_override'
@enum.unique
class CallError(enum.IntEnum):
"""Mirrors grpc_call_error in the C core."""
@ -53,6 +52,7 @@ class CallError(enum.IntEnum):
ERROR_INVALID_FLAGS = 9
ERROR_INVALID_METADATA = 10
@enum.unique
class StatusCode(enum.IntEnum):
"""Mirrors grpc_status_code in the C core."""
@ -74,6 +74,14 @@ class StatusCode(enum.IntEnum):
DATA_LOSS = 15
UNAUTHENTICATED = 16
@enum.unique
class OpWriteFlags(enum.IntEnum):
"""Mirrors defined write-flag constants in the C core."""
WRITE_BUFFER_HINT = 1
WRITE_NO_COMPRESS = 2
@enum.unique
class OpType(enum.IntEnum):
"""Mirrors grpc_op_type in the C core."""
@ -86,12 +94,24 @@ class OpType(enum.IntEnum):
RECV_STATUS_ON_CLIENT = 6
RECV_CLOSE_ON_SERVER = 7
@enum.unique
class EventType(enum.IntEnum):
"""Mirrors grpc_completion_type in the C core."""
QUEUE_SHUTDOWN = 0
QUEUE_TIMEOUT = 1 # if seen on the Python side, something went horridly wrong
OP_COMPLETE = 2
QUEUE_SHUTDOWN = 0
QUEUE_TIMEOUT = 1 # if seen on the Python side, something went horridly wrong
OP_COMPLETE = 2
@enum.unique
class ConnectivityState(enum.IntEnum):
"""Mirrors grpc_connectivity_state in the C core."""
IDLE = 0
CONNECTING = 1
READY = 2
TRANSIENT_FAILURE = 3
FATAL_FAILURE = 4
class Status(collections.namedtuple(
'Status', [
@ -105,6 +125,7 @@ class Status(collections.namedtuple(
details (str): ...
"""
class CallDetails(collections.namedtuple(
'CallDetails', [
'method',
@ -119,6 +140,7 @@ class CallDetails(collections.namedtuple(
deadline (float): ...
"""
class OpArgs(collections.namedtuple(
'OpArgs', [
'type',
@ -126,6 +148,7 @@ class OpArgs(collections.namedtuple(
'trailing_metadata',
'message',
'status',
'write_flags',
])):
"""Arguments passed into a GRPC operation.
@ -138,39 +161,40 @@ class OpArgs(collections.namedtuple(
message (bytes): Only valid if type == OpType.SEND_MESSAGE, else is None.
status (Status): Only valid if type == OpType.SEND_STATUS_FROM_SERVER, else
is None.
write_flags (int): a bit OR'ing of 0 or more OpWriteFlags values.
"""
@staticmethod
def send_initial_metadata(initial_metadata):
return OpArgs(OpType.SEND_INITIAL_METADATA, initial_metadata, None, None, None)
return OpArgs(OpType.SEND_INITIAL_METADATA, initial_metadata, None, None, None, 0)
@staticmethod
def send_message(message):
return OpArgs(OpType.SEND_MESSAGE, None, None, message, None)
def send_message(message, flags):
return OpArgs(OpType.SEND_MESSAGE, None, None, message, None, flags)
@staticmethod
def send_close_from_client():
return OpArgs(OpType.SEND_CLOSE_FROM_CLIENT, None, None, None, None)
return OpArgs(OpType.SEND_CLOSE_FROM_CLIENT, None, None, None, None, 0)
@staticmethod
def send_status_from_server(trailing_metadata, status_code, status_details):
return OpArgs(OpType.SEND_STATUS_FROM_SERVER, None, trailing_metadata, None, Status(status_code, status_details))
return OpArgs(OpType.SEND_STATUS_FROM_SERVER, None, trailing_metadata, None, Status(status_code, status_details), 0)
@staticmethod
def recv_initial_metadata():
return OpArgs(OpType.RECV_INITIAL_METADATA, None, None, None, None);
return OpArgs(OpType.RECV_INITIAL_METADATA, None, None, None, None, 0);
@staticmethod
def recv_message():
return OpArgs(OpType.RECV_MESSAGE, None, None, None, None)
return OpArgs(OpType.RECV_MESSAGE, None, None, None, None, 0)
@staticmethod
def recv_status_on_client():
return OpArgs(OpType.RECV_STATUS_ON_CLIENT, None, None, None, None)
return OpArgs(OpType.RECV_STATUS_ON_CLIENT, None, None, None, None, 0)
@staticmethod
def recv_close_on_server():
return OpArgs(OpType.RECV_CLOSE_ON_SERVER, None, None, None, None)
return OpArgs(OpType.RECV_CLOSE_ON_SERVER, None, None, None, None, 0)
class OpResult(collections.namedtuple(
@ -290,6 +314,15 @@ class Call:
"""
return CallError.ERROR
@abc.abstractmethod
def peer(self):
"""Get the peer of this call.
Returns:
str: the peer of this call.
"""
return None
class Channel:
__metaclass__ = abc.ABCMeta
@ -321,6 +354,40 @@ class Channel:
"""
return None
@abc.abstractmethod
def check_connectivity_state(self, try_to_connect):
"""Check and optionally repair the connectivity state of the channel.
Args:
try_to_connect (bool): whether or not to try to connect the channel if
disconnected.
Returns:
ConnectivityState: state of the channel at the time of this invocation.
"""
return None
@abc.abstractmethod
def watch_connectivity_state(self, last_observed_state, deadline,
completion_queue, tag):
"""Watch for connectivity state changes from the last_observed_state.
Args:
last_observed_state (ConnectivityState): ...
deadline (float): ...
completion_queue (CompletionQueue): ...
tag (object) ...
"""
@abc.abstractmethod
def target(self):
"""Get the target of this channel.
Returns:
str: the target of this channel.
"""
return None
class Server:
__metaclass__ = abc.ABCMeta

@ -117,7 +117,7 @@ class InsecureServerInsecureClient(unittest.TestCase):
client_initial_metadata = [(CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE), (CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)]
client_start_batch_result = client_call.start_batch([
_types.OpArgs.send_initial_metadata(client_initial_metadata),
_types.OpArgs.send_message(REQUEST),
_types.OpArgs.send_message(REQUEST, 0),
_types.OpArgs.send_close_from_client(),
_types.OpArgs.recv_initial_metadata(),
_types.OpArgs.recv_message(),
@ -144,6 +144,15 @@ class InsecureServerInsecureClient(unittest.TestCase):
self.assertEquals(HOST, request_event.call_details.host)
self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE)
# Check that the channel is connected, and that both it and the call have
# the proper target and peer; do this after the first flurry of messages to
# avoid the possibility that connection was delayed by the core until the
# first message was sent.
self.assertEqual(_types.ConnectivityState.READY,
self.client_channel.check_connectivity_state(False))
self.assertIsNotNone(self.client_channel.target())
self.assertIsNotNone(client_call.peer())
server_call_tag = object()
server_call = request_event.call
server_initial_metadata = [(SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE)]
@ -151,7 +160,7 @@ class InsecureServerInsecureClient(unittest.TestCase):
server_start_batch_result = server_call.start_batch([
_types.OpArgs.send_initial_metadata(server_initial_metadata),
_types.OpArgs.recv_message(),
_types.OpArgs.send_message(RESPONSE),
_types.OpArgs.send_message(RESPONSE, 0),
_types.OpArgs.recv_close_on_server(),
_types.OpArgs.send_status_from_server(server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS)
], server_call_tag)

@ -34,15 +34,13 @@ import abc
import unittest # pylint: disable=unused-import
from grpc.framework.face import exceptions
from grpc_test.framework.common import test_constants
from grpc_test.framework.face.testing import control
from grpc_test.framework.face.testing import coverage
from grpc_test.framework.face.testing import digest
from grpc_test.framework.face.testing import stock_service
from grpc_test.framework.face.testing import test_case
_TIMEOUT = 3
_LONG_TIMEOUT = 45
class BlockingInvocationInlineServiceTestCase(
test_case.FaceTestCase, coverage.BlockingCoverage):
@ -79,7 +77,7 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request()
response = self.stub.blocking_value_in_value_out(
name, request, _LONG_TIMEOUT)
name, request, test_constants.LONG_TIMEOUT)
test_messages.verify(request, response, self)
@ -90,7 +88,7 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request()
response_iterator = self.stub.inline_value_in_stream_out(
name, request, _LONG_TIMEOUT)
name, request, test_constants.LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(request, responses, self)
@ -102,7 +100,7 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests()
response = self.stub.blocking_stream_in_value_out(
name, iter(requests), _LONG_TIMEOUT)
name, iter(requests), test_constants.LONG_TIMEOUT)
test_messages.verify(requests, response, self)
@ -113,7 +111,7 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests()
response_iterator = self.stub.inline_stream_in_stream_out(
name, iter(requests), _LONG_TIMEOUT)
name, iter(requests), test_constants.LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(requests, responses, self)
@ -126,12 +124,12 @@ class BlockingInvocationInlineServiceTestCase(
second_request = test_messages.request()
first_response = self.stub.blocking_value_in_value_out(
name, first_request, _TIMEOUT)
name, first_request, test_constants.SHORT_TIMEOUT)
test_messages.verify(first_request, first_response, self)
second_response = self.stub.blocking_value_in_value_out(
name, second_request, _TIMEOUT)
name, second_request, test_constants.SHORT_TIMEOUT)
test_messages.verify(second_request, second_response, self)
@ -144,7 +142,7 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
multi_callable = self.stub.unary_unary_multi_callable(name)
multi_callable(request, _TIMEOUT)
multi_callable(request, test_constants.SHORT_TIMEOUT)
def testExpiredUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@ -155,7 +153,7 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT)
name, request, test_constants.SHORT_TIMEOUT)
list(response_iterator)
def testExpiredStreamRequestUnaryResponse(self):
@ -167,7 +165,7 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
multi_callable = self.stub.stream_unary_multi_callable(name)
multi_callable(iter(requests), _TIMEOUT)
multi_callable(iter(requests), test_constants.SHORT_TIMEOUT)
def testExpiredStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
@ -178,7 +176,7 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
response_iterator = self.stub.inline_stream_in_stream_out(
name, iter(requests), _TIMEOUT)
name, iter(requests), test_constants.SHORT_TIMEOUT)
list(response_iterator)
def testFailedUnaryRequestUnaryResponse(self):
@ -188,7 +186,8 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request()
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
self.stub.blocking_value_in_value_out(name, request, _TIMEOUT)
self.stub.blocking_value_in_value_out(name, request,
test_constants.SHORT_TIMEOUT)
def testFailedUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@ -198,7 +197,7 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT)
name, request, test_constants.SHORT_TIMEOUT)
list(response_iterator)
def testFailedStreamRequestUnaryResponse(self):
@ -208,7 +207,8 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests()
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT)
self.stub.blocking_stream_in_value_out(name, iter(requests),
test_constants.SHORT_TIMEOUT)
def testFailedStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
@ -218,5 +218,5 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
response_iterator = self.stub.inline_stream_in_stream_out(
name, iter(requests), _TIMEOUT)
name, iter(requests), test_constants.SHORT_TIMEOUT)
list(response_iterator)

@ -33,6 +33,7 @@ import abc
import unittest
from grpc.framework.face import interfaces
from grpc_test.framework.common import test_constants
from grpc_test.framework.face.testing import callback as testing_callback
from grpc_test.framework.face.testing import control
from grpc_test.framework.face.testing import coverage
@ -40,8 +41,6 @@ from grpc_test.framework.face.testing import digest
from grpc_test.framework.face.testing import stock_service
from grpc_test.framework.face.testing import test_case
_TIMEOUT = 3
class EventInvocationSynchronousEventServiceTestCase(
test_case.FaceTestCase, coverage.FullCoverage):
@ -79,7 +78,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
self.stub.event_value_in_value_out(
name, request, callback.complete, callback.abort, _TIMEOUT)
name, request, callback.complete, callback.abort,
test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
response = callback.response()
@ -93,7 +93,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
self.stub.event_value_in_stream_out(
name, request, callback, callback.abort, _TIMEOUT)
name, request, callback, callback.abort,
test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
responses = callback.responses()
@ -107,7 +108,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
unused_call, request_consumer = self.stub.event_stream_in_value_out(
name, callback.complete, callback.abort, _TIMEOUT)
name, callback.complete, callback.abort,
test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
@ -124,7 +126,7 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
unused_call, request_consumer = self.stub.event_stream_in_stream_out(
name, callback, callback.abort, _TIMEOUT)
name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
@ -147,11 +149,11 @@ class EventInvocationSynchronousEventServiceTestCase(
first_callback.complete(first_response)
self.stub.event_value_in_value_out(
name, second_request, second_callback.complete,
second_callback.abort, _TIMEOUT)
second_callback.abort, test_constants.SHORT_TIMEOUT)
self.stub.event_value_in_value_out(
name, first_request, make_second_invocation, first_callback.abort,
_TIMEOUT)
test_constants.SHORT_TIMEOUT)
second_callback.block_until_terminated()
first_response = first_callback.response()
@ -168,7 +170,8 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.pause():
self.stub.event_value_in_value_out(
name, request, callback.complete, callback.abort, _TIMEOUT)
name, request, callback.complete, callback.abort,
test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
@ -182,7 +185,8 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.pause():
self.stub.event_value_in_stream_out(
name, request, callback, callback.abort, _TIMEOUT)
name, request, callback, callback.abort,
test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
@ -194,7 +198,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
self.stub.event_stream_in_value_out(
name, callback.complete, callback.abort, _TIMEOUT)
name, callback.complete, callback.abort,
test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
@ -207,7 +212,7 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
unused_call, request_consumer = self.stub.event_stream_in_stream_out(
name, callback, callback.abort, _TIMEOUT)
name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
callback.block_until_terminated()
@ -223,10 +228,12 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.fail():
self.stub.event_value_in_value_out(
name, request, callback.complete, callback.abort, _TIMEOUT)
name, request, callback.complete, callback.abort,
test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
self.assertEqual(interfaces.Abortion.SERVICER_FAILURE,
callback.abortion())
def testFailedUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@ -237,10 +244,12 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.fail():
self.stub.event_value_in_stream_out(
name, request, callback, callback.abort, _TIMEOUT)
name, request, callback, callback.abort,
test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
self.assertEqual(interfaces.Abortion.SERVICER_FAILURE,
callback.abortion())
def testFailedStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
@ -251,13 +260,15 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.fail():
unused_call, request_consumer = self.stub.event_stream_in_value_out(
name, callback.complete, callback.abort, _TIMEOUT)
name, callback.complete, callback.abort,
test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
callback.block_until_terminated()
self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
self.assertEqual(interfaces.Abortion.SERVICER_FAILURE,
callback.abortion())
def testFailedStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
@ -268,7 +279,7 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.fail():
unused_call, request_consumer = self.stub.event_stream_in_stream_out(
name, callback, callback.abort, _TIMEOUT)
name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
@ -287,10 +298,10 @@ class EventInvocationSynchronousEventServiceTestCase(
self.stub.event_value_in_value_out(
name, first_request, first_callback.complete, first_callback.abort,
_TIMEOUT)
test_constants.SHORT_TIMEOUT)
self.stub.event_value_in_value_out(
name, second_request, second_callback.complete,
second_callback.abort, _TIMEOUT)
second_callback.abort, test_constants.SHORT_TIMEOUT)
first_callback.block_until_terminated()
second_callback.block_until_terminated()
@ -312,7 +323,8 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.pause():
call = self.stub.event_value_in_value_out(
name, request, callback.complete, callback.abort, _TIMEOUT)
name, request, callback.complete, callback.abort,
test_constants.SHORT_TIMEOUT)
call.cancel()
callback.block_until_terminated()
@ -326,7 +338,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
call = self.stub.event_value_in_stream_out(
name, request, callback, callback.abort, _TIMEOUT)
name, request, callback, callback.abort,
test_constants.SHORT_TIMEOUT)
call.cancel()
callback.block_until_terminated()
@ -340,7 +353,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
call, request_consumer = self.stub.event_stream_in_value_out(
name, callback.complete, callback.abort, _TIMEOUT)
name, callback.complete, callback.abort,
test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
call.cancel()
@ -355,7 +369,7 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
call, unused_request_consumer = self.stub.event_stream_in_stream_out(
name, callback, callback.abort, _TIMEOUT)
name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
call.cancel()
callback.block_until_terminated()

@ -37,13 +37,13 @@ import unittest
from grpc.framework.face import exceptions
from grpc.framework.foundation import future
from grpc.framework.foundation import logging_pool
from grpc_test.framework.common import test_constants
from grpc_test.framework.face.testing import control
from grpc_test.framework.face.testing import coverage
from grpc_test.framework.face.testing import digest
from grpc_test.framework.face.testing import stock_service
from grpc_test.framework.face.testing import test_case
_TIMEOUT = 3
_MAXIMUM_POOL_SIZE = 10
@ -110,7 +110,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
request = test_messages.request()
response_future = self.stub.future_value_in_value_out(
name, request, _TIMEOUT)
name, request, test_constants.SHORT_TIMEOUT)
response = response_future.result()
test_messages.verify(request, response, self)
@ -122,7 +122,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
request = test_messages.request()
response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT)
name, request, test_constants.SHORT_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(request, responses, self)
@ -138,7 +138,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
# returned to calling code before the iterator yields any requests.
with request_iterator.pause():
response_future = self.stub.future_stream_in_value_out(
name, request_iterator, _TIMEOUT)
name, request_iterator, test_constants.SHORT_TIMEOUT)
response = response_future.result()
test_messages.verify(requests, response, self)
@ -154,7 +154,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
# returned to calling code before the iterator yields any requests.
with request_iterator.pause():
response_iterator = self.stub.inline_stream_in_stream_out(
name, request_iterator, _TIMEOUT)
name, request_iterator, test_constants.SHORT_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(requests, responses, self)
@ -167,13 +167,13 @@ class FutureInvocationAsynchronousEventServiceTestCase(
second_request = test_messages.request()
first_response_future = self.stub.future_value_in_value_out(
name, first_request, _TIMEOUT)
name, first_request, test_constants.SHORT_TIMEOUT)
first_response = first_response_future.result()
test_messages.verify(first_request, first_response, self)
second_response_future = self.stub.future_value_in_value_out(
name, second_request, _TIMEOUT)
name, second_request, test_constants.SHORT_TIMEOUT)
second_response = second_response_future.result()
test_messages.verify(second_request, second_response, self)
@ -186,7 +186,8 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
multi_callable = self.stub.unary_unary_multi_callable(name)
response_future = multi_callable.future(request, _TIMEOUT)
response_future = multi_callable.future(request,
test_constants.SHORT_TIMEOUT)
self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):
@ -200,7 +201,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT)
name, request, test_constants.SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(response_iterator)
@ -212,7 +213,8 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
multi_callable = self.stub.stream_unary_multi_callable(name)
response_future = multi_callable.future(iter(requests), _TIMEOUT)
response_future = multi_callable.future(iter(requests),
test_constants.SHORT_TIMEOUT)
self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):
@ -226,7 +228,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_iterator = self.stub.inline_stream_in_stream_out(
name, iter(requests), _TIMEOUT)
name, iter(requests), test_constants.SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(response_iterator)
@ -238,7 +240,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.fail():
response_future = self.stub.future_value_in_value_out(
name, request, _TIMEOUT)
name, request, test_constants.SHORT_TIMEOUT)
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is
@ -261,7 +263,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
# expiration of the RPC.
with self.control.fail(), self.assertRaises(exceptions.ExpirationError):
response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT)
name, request, test_constants.SHORT_TIMEOUT)
list(response_iterator)
def testFailedStreamRequestUnaryResponse(self):
@ -272,7 +274,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.fail():
response_future = self.stub.future_stream_in_value_out(
name, iter(requests), _TIMEOUT)
name, iter(requests), test_constants.SHORT_TIMEOUT)
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is
@ -295,7 +297,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
# expiration of the RPC.
with self.control.fail(), self.assertRaises(exceptions.ExpirationError):
response_iterator = self.stub.inline_stream_in_stream_out(
name, iter(requests), _TIMEOUT)
name, iter(requests), test_constants.SHORT_TIMEOUT)
list(response_iterator)
def testParallelInvocations(self):
@ -305,10 +307,11 @@ class FutureInvocationAsynchronousEventServiceTestCase(
first_request = test_messages.request()
second_request = test_messages.request()
# TODO(bug 2039): use LONG_TIMEOUT instead
first_response_future = self.stub.future_value_in_value_out(
name, first_request, _TIMEOUT)
name, first_request, test_constants.SHORT_TIMEOUT)
second_response_future = self.stub.future_value_in_value_out(
name, second_request, _TIMEOUT)
name, second_request, test_constants.SHORT_TIMEOUT)
first_response = first_response_future.result()
second_response = second_response_future.result()
@ -327,7 +330,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_future = self.stub.future_value_in_value_out(
name, request, _TIMEOUT)
name, request, test_constants.SHORT_TIMEOUT)
cancel_method_return_value = response_future.cancel()
self.assertFalse(cancel_method_return_value)
@ -341,7 +344,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT)
name, request, test_constants.SHORT_TIMEOUT)
response_iterator.cancel()
with self.assertRaises(future.CancelledError):
@ -355,7 +358,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_future = self.stub.future_stream_in_value_out(
name, iter(requests), _TIMEOUT)
name, iter(requests), test_constants.SHORT_TIMEOUT)
cancel_method_return_value = response_future.cancel()
self.assertFalse(cancel_method_return_value)
@ -369,7 +372,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_iterator = self.stub.inline_stream_in_stream_out(
name, iter(requests), _TIMEOUT)
name, iter(requests), test_constants.SHORT_TIMEOUT)
response_iterator.cancel()
with self.assertRaises(future.CancelledError):

@ -203,7 +203,10 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, VALUE method,
grpc_channel *ch = NULL;
grpc_completion_queue *cq = NULL;
char *method_chars = StringValueCStr(method);
char *host_chars = StringValueCStr(host);
char *host_chars = NULL;
if (host != Qnil) {
host_chars = StringValueCStr(host);
}
cq = grpc_rb_get_wrapped_completion_queue(cqueue);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);

@ -410,7 +410,7 @@ module GRPC
# @param timeout [TimeConst]
def new_active_call(method, marshal, unmarshal, timeout = nil)
deadline = from_relative_time(timeout.nil? ? @timeout : timeout)
call = @ch.create_call(@queue, method, @host, deadline)
call = @ch.create_call(@queue, method, nil, deadline)
ActiveCall.new(call, @queue, marshal, unmarshal, deadline, started: false)
end
end

@ -137,7 +137,7 @@ describe GRPC::Core::Call do
end
def make_test_call
@ch.create_call(client_queue, 'dummy_method', 'dummy_host', deadline)
@ch.create_call(client_queue, 'dummy_method', nil, deadline)
end
def deadline

@ -117,7 +117,7 @@ describe GRPC::Core::Channel do
deadline = Time.now + 5
blk = proc do
ch.create_call(cq, 'dummy_method', 'dummy_host', deadline)
ch.create_call(cq, 'dummy_method', nil, deadline)
end
expect(&blk).to_not raise_error
end
@ -128,7 +128,7 @@ describe GRPC::Core::Channel do
deadline = Time.now + 5
blk = proc do
ch.create_call(cq, 'dummy_method', 'dummy_host', deadline)
ch.create_call(cq, 'dummy_method', nil, deadline)
end
expect(&blk).to raise_error(RuntimeError)
end

@ -61,7 +61,7 @@ shared_context 'setup: tags' do
end
def new_client_call
@ch.create_call(@client_queue, '/method', 'foo.test.google.fr', deadline)
@ch.create_call(@client_queue, '/method', nil, deadline)
end
end

@ -338,7 +338,7 @@ describe GRPC::ActiveCall do
end
def make_test_call
@ch.create_call(@client_queue, '/method', 'a.dummy.host', deadline)
@ch.create_call(@client_queue, '/method', nil, deadline)
end
def deadline

@ -112,7 +112,7 @@ int main(int argc, char **argv) {
while (!sync.is_done) {
grpc_pollset_worker worker;
grpc_pollset_work(&sync.pollset, &worker,
gpr_inf_future(GPR_CLOCK_REALTIME));
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset));

@ -117,7 +117,7 @@ class Proxy : public ::grpc::cpp::test::util::TestService::Service {
}
private:
std::unique_ptr<::grpc::cpp::test::util::TestService::Stub> stub_;
std::unique_ptr< ::grpc::cpp::test::util::TestService::Stub> stub_;
};
class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {

@ -32,6 +32,8 @@ FROM golang:1.4
# Get the source from GitHub
RUN go get google.golang.org/grpc
RUN go get golang.org/x/oauth2
RUN go get google.golang.org/cloud
# Add a service_account directory containing the auth creds file
ADD service_account service_account

@ -0,0 +1,34 @@
#!/bin/bash
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
cp -R /var/local/git-clone/grpc-go/. /go/
go get golang.org/x/oauth2
go get google.golang.org/cloud
cd src/google.golang.org/grpc/interop/client && go install
Loading…
Cancel
Save