Merge github.com:grpc/grpc into y12kdm3

pull/2818/head
Craig Tiller 10 years ago
commit 65d3ab2add
  1. 4
      src/core/channel/client_channel.c
  2. 4
      src/core/surface/call.c
  3. 1
      src/core/surface/server.c
  4. 84
      src/csharp/Grpc.Auth/AuthInterceptors.cs
  5. 42
      src/csharp/Grpc.Auth/Grpc.Auth.csproj
  6. 115
      src/csharp/Grpc.Auth/OAuth2Interceptors.cs
  7. 18
      src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
  8. 2
      src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
  9. 2
      src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
  10. 2
      src/csharp/Grpc.Core/ChannelOptions.cs
  11. 18
      src/csharp/Grpc.Core/ClientBase.cs
  12. 29
      src/csharp/Grpc.Core/Method.cs
  13. 12
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs
  14. 26
      src/csharp/ext/grpc_csharp_ext.c
  15. 16
      src/node/ext/call.cc
  16. 13
      src/node/ext/call.h
  17. 11
      src/node/ext/node_grpc.cc
  18. 2
      src/node/ext/server.cc
  19. 5
      src/node/index.js
  20. 18
      src/node/src/client.js
  21. 26
      src/node/src/server.js
  22. 1
      src/php/ext/grpc/call.c
  23. 1
      src/python/grpcio/grpc/_adapter/_c/utility.c
  24. 30
      src/python/grpcio/grpc/early_adopter/implementations.py
  25. 1
      src/ruby/ext/grpc/rb_call.c
  26. 3
      src/ruby/lib/grpc/logconfig.rb
  27. 8
      test/core/end2end/fixtures/proxy.c
  28. 7
      test/core/end2end/tests/default_host.c
  29. 6
      test/core/end2end/tests/request_response_with_payload_and_call_creds.c
  30. 25
      test/cpp/interop/interop_client.cc
  31. 17
      tools/run_tests/run_tests.py

@ -505,13 +505,13 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
if (iomgr_success && chand->resolver) {
grpc_resolver *resolver = chand->resolver;
GRPC_RESOLVER_REF(resolver, "channel-next");
grpc_connectivity_state_set(&chand->state_tracker, state,
"new_lb+resolver");
gpr_mu_unlock(&chand->mu_config);
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
grpc_resolver_next(resolver, &chand->incoming_configuration,
&chand->on_config_changed);
GRPC_RESOLVER_UNREF(resolver, "channel-next");
grpc_connectivity_state_set(&chand->state_tracker, state,
"new_lb+resolver");
if (lb_policy != NULL) {
watch_lb_policy(chand, lb_policy, state);
}

@ -1573,7 +1573,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
const grpc_op *op;
grpc_ioreq *req;
void (*finish_func)(grpc_call *, int, void *) = finish_batch;
GPR_ASSERT(!reserved);
if (reserved != NULL) return GRPC_CALL_ERROR;
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
@ -1588,6 +1589,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
/* rewrite batch ops into ioreq ops */
for (in = 0, out = 0; in < nops; in++) {
op = &ops[in];
if (op->reserved != NULL) return GRPC_CALL_ERROR;
switch (op->op) {
case GRPC_OP_SEND_INITIAL_METADATA:
/* Flag validation: currently allow no flags */

@ -1135,6 +1135,7 @@ grpc_call_error grpc_server_request_call(
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
grpc_cq_begin_op(cq_for_notification);
details->reserved = NULL;
rc->type = BATCH_CALL;
rc->server = server;
rc->tag = tag;

@ -0,0 +1,84 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Threading;
using Google.Apis.Auth.OAuth2;
using Grpc.Core;
using Grpc.Core.Utils;
namespace Grpc.Auth
{
/// <summary>
/// Factory methods to create authorization interceptors.
/// </summary>
public static class AuthInterceptors
{
private const string AuthorizationHeader = "Authorization";
private const string Schema = "Bearer";
/// <summary>
/// Creates interceptor that will obtain access token from any credential type that implements
/// <c>ITokenAccess</c>. (e.g. <c>GoogleCredential</c>).
/// </summary>
public static HeaderInterceptor FromCredential(ITokenAccess credential)
{
return new HeaderInterceptor((method, authUri, metadata) =>
{
// TODO(jtattermusch): Rethink synchronous wait to obtain the result.
var accessToken = credential.GetAccessTokenForRequestAsync(authUri, CancellationToken.None)
.ConfigureAwait(false).GetAwaiter().GetResult();
metadata.Add(CreateBearerTokenHeader(accessToken));
});
}
/// <summary>
/// Creates OAuth2 interceptor that will use given access token as authorization.
/// </summary>
/// <param name="accessToken">OAuth2 access token.</param>
public static HeaderInterceptor FromAccessToken(string accessToken)
{
Preconditions.CheckNotNull(accessToken);
return new HeaderInterceptor((method, authUri, metadata) =>
{
metadata.Add(CreateBearerTokenHeader(accessToken));
});
}
private static Metadata.Entry CreateBearerTokenHeader(string accessToken)
{
return new Metadata.Entry(AuthorizationHeader, Schema + " " + accessToken);
}
}
}

@ -3,8 +3,6 @@
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProductVersion>10.0.0</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{AE21D0EE-9A2C-4C15-AB7F-5224EED5B0EA}</ProjectGuid>
<OutputType>Library</OutputType>
<RootNamespace>Grpc.Auth</RootNamespace>
@ -41,57 +39,47 @@
<AssemblyOriginatorKeyFile>C:\keys\Grpc.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>
<ItemGroup>
<Reference Include="BouncyCastle.Crypto, Version=1.7.4137.9688, Culture=neutral, PublicKeyToken=a4292a325f69b123, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<Reference Include="System" />
<Reference Include="System.Net" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Net.Http.WebRequest" />
<Reference Include="BouncyCastle.Crypto">
<HintPath>..\packages\BouncyCastle.1.7.0\lib\Net40-Client\BouncyCastle.Crypto.dll</HintPath>
</Reference>
<Reference Include="Google.Apis.Auth, Version=1.9.3.19379, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<Reference Include="Google.Apis.Auth">
<HintPath>..\packages\Google.Apis.Auth.1.9.3\lib\net40\Google.Apis.Auth.dll</HintPath>
</Reference>
<Reference Include="Google.Apis.Auth.PlatformServices, Version=1.9.3.19383, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<Reference Include="Google.Apis.Auth.PlatformServices">
<HintPath>..\packages\Google.Apis.Auth.1.9.3\lib\net40\Google.Apis.Auth.PlatformServices.dll</HintPath>
</Reference>
<Reference Include="Google.Apis.Core, Version=1.9.3.19379, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<Reference Include="Google.Apis.Core">
<HintPath>..\packages\Google.Apis.Core.1.9.3\lib\portable-net40+sl50+win+wpa81+wp80\Google.Apis.Core.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Threading.Tasks, Version=1.0.12.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<Reference Include="Microsoft.Threading.Tasks">
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Threading.Tasks.Extensions, Version=1.0.12.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<Reference Include="Microsoft.Threading.Tasks.Extensions">
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.Extensions.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Threading.Tasks.Extensions.Desktop, Version=1.0.168.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<Reference Include="Microsoft.Threading.Tasks.Extensions.Desktop">
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.Extensions.Desktop.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=7.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<Reference Include="Newtonsoft.Json">
<HintPath>..\packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Net" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Net.Http.Extensions, Version=2.2.29.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<Reference Include="System.Net.Http.Extensions">
<HintPath>..\packages\Microsoft.Net.Http.2.2.29\lib\net45\System.Net.Http.Extensions.dll</HintPath>
</Reference>
<Reference Include="System.Net.Http.Primitives, Version=4.2.29.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<Reference Include="System.Net.Http.Primitives">
<HintPath>..\packages\Microsoft.Net.Http.2.2.29\lib\net45\System.Net.Http.Primitives.dll</HintPath>
</Reference>
<Reference Include="System.Net.Http.WebRequest" />
</ItemGroup>
<ItemGroup>
<Compile Include="..\Grpc.Core\Version.cs">
<Link>Version.cs</Link>
</Compile>
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="OAuth2Interceptors.cs" />
<Compile Include="AuthInterceptors.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -1,115 +0,0 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Security.Cryptography.X509Certificates;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Google.Apis.Auth.OAuth2;
using Google.Apis.Util;
using Grpc.Core;
using Grpc.Core.Utils;
namespace Grpc.Auth
{
public static class OAuth2Interceptors
{
/// <summary>
/// Creates OAuth2 interceptor that will obtain access token from GoogleCredentials.
/// </summary>
public static MetadataInterceptorDelegate FromCredential(GoogleCredential googleCredential)
{
var interceptor = new OAuth2Interceptor(googleCredential, SystemClock.Default);
return new MetadataInterceptorDelegate(interceptor.InterceptHeaders);
}
/// <summary>
/// Creates OAuth2 interceptor that will use given OAuth2 token.
/// </summary>
/// <param name="oauth2Token"></param>
/// <returns></returns>
public static MetadataInterceptorDelegate FromAccessToken(string oauth2Token)
{
Preconditions.CheckNotNull(oauth2Token);
return new MetadataInterceptorDelegate((authUri, metadata) =>
{
metadata.Add(OAuth2Interceptor.CreateBearerTokenHeader(oauth2Token));
});
}
/// <summary>
/// Injects OAuth2 authorization header into initial metadata (= request headers).
/// </summary>
private class OAuth2Interceptor
{
private const string AuthorizationHeader = "Authorization";
private const string Schema = "Bearer";
private ITokenAccess credential;
private IClock clock;
public OAuth2Interceptor(ITokenAccess credential, IClock clock)
{
this.credential = credential;
this.clock = clock;
}
/// <summary>
/// Gets access token and requests refreshing it if is going to expire soon.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public string GetAccessToken(string authUri, CancellationToken cancellationToken)
{
// TODO(jtattermusch): Rethink synchronous wait to obtain the result.
return credential.GetAccessTokenForRequestAsync(authUri, cancellationToken: cancellationToken).GetAwaiter().GetResult();
}
public void InterceptHeaders(string authUri, Metadata metadata)
{
var accessToken = GetAccessToken(authUri, CancellationToken.None);
metadata.Add(CreateBearerTokenHeader(accessToken));
}
public static Metadata.Entry CreateBearerTokenHeader(string accessToken)
{
return new Metadata.Entry(AuthorizationHeader, Schema + " " + accessToken);
}
}
}
}

@ -88,6 +88,24 @@ namespace Grpc.Core
return responseAsync.GetAwaiter();
}
/// <summary>
/// Gets the call status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
}
/// <summary>
/// Gets the call trailing metadata if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
}
/// <summary>
/// Provides means to cleanup after the call.
/// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything.

@ -32,8 +32,6 @@
#endregion
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace Grpc.Core
{

@ -32,8 +32,6 @@
#endregion
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace Grpc.Core
{

@ -71,7 +71,7 @@ namespace Grpc.Core
/// Creates a channel option with an integer value.
/// </summary>
/// <param name="name">Name.</param>
/// <param name="stringValue">String value.</param>
/// <param name="intValue">Integer value.</param>
public ChannelOption(string name, int intValue)
{
this.type = OptionType.Integer;

@ -32,15 +32,15 @@
#endregion
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using System.Threading.Tasks;
namespace Grpc.Core
{
public delegate void MetadataInterceptorDelegate(string authUri, Metadata metadata);
/// <summary>
/// Interceptor for call headers.
/// </summary>
public delegate void HeaderInterceptor(IMethod method, string authUri, Metadata metadata);
/// <summary>
/// Base class for client-side stubs.
@ -60,10 +60,10 @@ namespace Grpc.Core
}
/// <summary>
/// Can be used to register a custom header (initial metadata) interceptor.
/// The delegate each time before a new call on this client is started.
/// Can be used to register a custom header (request metadata) interceptor.
/// The interceptor is invoked each time a new call on this client is started.
/// </summary>
public MetadataInterceptorDelegate HeaderInterceptor
public HeaderInterceptor HeaderInterceptor
{
get;
set;
@ -107,7 +107,7 @@ namespace Grpc.Core
options = options.WithHeaders(new Metadata());
}
var authUri = authUriBase != null ? authUriBase + method.ServiceName : null;
interceptor(authUri, options.Headers);
interceptor(method, authUri, options.Headers);
}
return new CallInvocationDetails<TRequest, TResponse>(channel, method, Host, options);
}

@ -54,10 +54,37 @@ namespace Grpc.Core
DuplexStreaming
}
/// <summary>
/// A non-generic representation of a remote method.
/// </summary>
public interface IMethod
{
/// <summary>
/// Gets the type of the method.
/// </summary>
MethodType Type { get; }
/// <summary>
/// Gets the name of the service to which this method belongs.
/// </summary>
string ServiceName { get; }
/// <summary>
/// Gets the unqualified name of the method.
/// </summary>
string Name { get; }
/// <summary>
/// Gets the fully qualified name of the method. On the server side, methods are dispatched
/// based on this name.
/// </summary>
string FullName { get; }
}
/// <summary>
/// A description of a remote method.
/// </summary>
public class Method<TRequest, TResponse>
public class Method<TRequest, TResponse> : IMethod
{
readonly MethodType type;
readonly string serviceName;

@ -308,7 +308,7 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("running service_account_creds");
var credential = await GoogleCredential.GetApplicationDefaultAsync();
credential = credential.CreateScoped(new[] { AuthScope });
client.HeaderInterceptor = OAuth2Interceptors.FromCredential(credential);
client.HeaderInterceptor = AuthInterceptors.FromCredential(credential);
var request = SimpleRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
@ -332,7 +332,7 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("running compute_engine_creds");
var credential = await GoogleCredential.GetApplicationDefaultAsync();
Assert.IsFalse(credential.IsCreateScopedRequired);
client.HeaderInterceptor = OAuth2Interceptors.FromCredential(credential);
client.HeaderInterceptor = AuthInterceptors.FromCredential(credential);
var request = SimpleRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
@ -357,7 +357,7 @@ namespace Grpc.IntegrationTesting
var credential = await GoogleCredential.GetApplicationDefaultAsync();
// check this a credential with scope support, but don't add the scope.
Assert.IsTrue(credential.IsCreateScopedRequired);
client.HeaderInterceptor = OAuth2Interceptors.FromCredential(credential);
client.HeaderInterceptor = AuthInterceptors.FromCredential(credential);
var request = SimpleRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
@ -381,7 +381,7 @@ namespace Grpc.IntegrationTesting
ITokenAccess credential = (await GoogleCredential.GetApplicationDefaultAsync()).CreateScoped(new[] { AuthScope });
string oauth2Token = await credential.GetAccessTokenForRequestAsync();
client.HeaderInterceptor = OAuth2Interceptors.FromAccessToken(oauth2Token);
client.HeaderInterceptor = AuthInterceptors.FromAccessToken(oauth2Token);
var request = SimpleRequest.CreateBuilder()
.SetFillUsername(true)
@ -401,7 +401,7 @@ namespace Grpc.IntegrationTesting
ITokenAccess credential = (await GoogleCredential.GetApplicationDefaultAsync()).CreateScoped(new[] { AuthScope });
string oauth2Token = await credential.GetAccessTokenForRequestAsync();
var headerInterceptor = OAuth2Interceptors.FromAccessToken(oauth2Token);
var headerInterceptor = AuthInterceptors.FromAccessToken(oauth2Token);
var request = SimpleRequest.CreateBuilder()
.SetFillUsername(true)
@ -409,7 +409,7 @@ namespace Grpc.IntegrationTesting
.Build();
var headers = new Metadata();
headerInterceptor("", headers);
headerInterceptor(null, "", headers);
var response = client.UnaryCall(request, headers: headers);
Assert.AreEqual(AuthScopeResponse, response.OauthScope);

@ -510,22 +510,27 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = 0;
ops[0].reserved = NULL;
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
ops[1].flags = write_flags;
ops[1].reserved = NULL;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[2].flags = 0;
ops[2].reserved = NULL;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[3].flags = 0;
ops[3].reserved = NULL;
ops[4].op = GRPC_OP_RECV_MESSAGE;
ops[4].data.recv_message = &(ctx->recv_message);
ops[4].flags = 0;
ops[4].reserved = NULL;
ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[5].data.recv_status_on_client.trailing_metadata =
@ -538,6 +543,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
ops[5].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
ops[5].flags = 0;
ops[5].reserved = NULL;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
@ -556,14 +562,17 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = 0;
ops[0].reserved = NULL;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[1].flags = 0;
ops[1].reserved = NULL;
ops[2].op = GRPC_OP_RECV_MESSAGE;
ops[2].data.recv_message = &(ctx->recv_message);
ops[2].flags = 0;
ops[2].reserved = NULL;
ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[3].data.recv_status_on_client.trailing_metadata =
@ -576,6 +585,7 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
ops[3].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
ops[3].flags = 0;
ops[3].reserved = NULL;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
@ -593,18 +603,22 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = 0;
ops[0].reserved = NULL;
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
ops[1].flags = write_flags;
ops[1].reserved = NULL;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[2].flags = 0;
ops[2].reserved = NULL;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[3].flags = 0;
ops[3].reserved = NULL;
ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[4].data.recv_status_on_client.trailing_metadata =
@ -617,6 +631,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
ops[4].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
ops[4].flags = 0;
ops[4].reserved = NULL;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
@ -635,10 +650,12 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = 0;
ops[0].reserved = NULL;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[1].flags = 0;
ops[1].reserved = NULL;
ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[2].data.recv_status_on_client.trailing_metadata =
@ -651,6 +668,7 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
ops[2].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
ops[2].flags = 0;
ops[2].reserved = NULL;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
@ -668,10 +686,12 @@ grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
ops[0].flags = write_flags;
ops[0].reserved = NULL;
ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[1].data.send_initial_metadata.count = 0;
ops[1].data.send_initial_metadata.metadata = NULL;
ops[1].flags = 0;
ops[1].reserved = NULL;
return grpc_call_start_batch(call, ops, nops, ctx, NULL);
}
@ -683,6 +703,7 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
grpc_op ops[1];
ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[0].flags = 0;
ops[0].reserved = NULL;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
@ -706,10 +727,12 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ops[0].data.send_status_from_server.trailing_metadata =
ctx->send_status_from_server.trailing_metadata.metadata;
ops[0].flags = 0;
ops[0].reserved = NULL;
ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[1].data.send_initial_metadata.count = 0;
ops[1].data.send_initial_metadata.metadata = NULL;
ops[1].flags = 0;
ops[1].reserved = NULL;
return grpc_call_start_batch(call, ops, nops, ctx, NULL);
}
@ -721,6 +744,7 @@ grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) {
ops[0].op = GRPC_OP_RECV_MESSAGE;
ops[0].data.recv_message = &(ctx->recv_message);
ops[0].flags = 0;
ops[0].reserved = NULL;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
}
@ -733,6 +757,7 @@ grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) {
ops[0].data.recv_close_on_server.cancelled =
(&ctx->recv_close_on_server_cancelled);
ops[0].flags = 0;
ops[0].reserved = NULL;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
@ -751,6 +776,7 @@ grpcsharp_call_send_initial_metadata(grpc_call *call,
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = 0;
ops[0].reserved = NULL;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);

@ -207,6 +207,13 @@ class SendMessageOp : public Op {
if (!::node::Buffer::HasInstance(value)) {
return false;
}
Handle<Object> object_value = value->ToObject();
if (object_value->HasOwnProperty(NanNew("grpcWriteFlags"))) {
Handle<Value> flag_value = object_value->Get(NanNew("grpcWriteFlags"));
if (flag_value->IsUint32()) {
out->flags = flag_value->Uint32Value() & GRPC_WRITE_USED_MASK;
}
}
out->data.send_message = BufferToByteBuffer(value);
Persistent<Value> *handle = new Persistent<Value>();
NanAssignPersistent(*handle, value);
@ -457,10 +464,6 @@ void Call::Init(Handle<Object> exports) {
NanNew<FunctionTemplate>(GetPeer)->GetFunction());
NanAssignPersistent(fun_tpl, tpl);
Handle<Function> ctr = tpl->GetFunction();
ctr->Set(NanNew("WRITE_BUFFER_HINT"),
NanNew<Uint32, uint32_t>(GRPC_WRITE_BUFFER_HINT));
ctr->Set(NanNew("WRITE_NO_COMPRESS"),
NanNew<Uint32, uint32_t>(GRPC_WRITE_NO_COMPRESS));
exports->Set(NanNew("Call"), ctr);
constructor = new NanCallback(ctr);
}
@ -581,6 +584,7 @@ NAN_METHOD(Call::StartBatch) {
uint32_t type = keys->Get(i)->Uint32Value();
ops[i].op = static_cast<grpc_op_type>(type);
ops[i].flags = 0;
ops[i].reserved = NULL;
switch (type) {
case GRPC_OP_SEND_INITIAL_METADATA:
op.reset(new SendMetadataOp());
@ -619,7 +623,7 @@ NAN_METHOD(Call::StartBatch) {
call->wrapped_call, &ops[0], nops, new struct tag(
callback, op_vector.release(), resources), NULL);
if (error != GRPC_CALL_OK) {
return NanThrowError("startBatch failed", error);
return NanThrowError(nanErrorWithCode("startBatch failed", error));
}
CompletionQueueAsyncWorker::Next();
NanReturnUndefined();
@ -633,7 +637,7 @@ NAN_METHOD(Call::Cancel) {
Call *call = ObjectWrap::Unwrap<Call>(args.This());
grpc_call_error error = grpc_call_cancel(call->wrapped_call, NULL);
if (error != GRPC_CALL_OK) {
return NanThrowError("cancel failed", error);
return NanThrowError(nanErrorWithCode("cancel failed", error));
}
NanReturnUndefined();
}

@ -51,6 +51,19 @@ namespace node {
using std::unique_ptr;
using std::shared_ptr;
/**
* Helper function for throwing errors with a grpc_call_error value.
* Modified from the answer by Gus Goose to
* http://stackoverflow.com/questions/31794200.
*/
inline v8::Local<v8::Value> nanErrorWithCode(const char *msg,
grpc_call_error code) {
NanEscapableScope();
v8::Local<v8::Object> err = NanError(msg).As<v8::Object>();
err->Set(NanNew("code"), NanNew<v8::Uint32>(code));
return NanEscapeScope(err);
}
v8::Handle<v8::Value> ParseMetadata(const grpc_metadata_array *metadata_array);
class PersistentHolder {

@ -196,6 +196,16 @@ void InitConnectivityStateConstants(Handle<Object> exports) {
channel_state->Set(NanNew("FATAL_FAILURE"), FATAL_FAILURE);
}
void InitWriteFlags(Handle<Object> exports) {
NanScope();
Handle<Object> write_flags = NanNew<Object>();
exports->Set(NanNew("writeFlags"), write_flags);
Handle<Value> BUFFER_HINT(NanNew<Uint32, uint32_t>(GRPC_WRITE_BUFFER_HINT));
write_flags->Set(NanNew("BUFFER_HINT"), BUFFER_HINT);
Handle<Value> NO_COMPRESS(NanNew<Uint32, uint32_t>(GRPC_WRITE_NO_COMPRESS));
write_flags->Set(NanNew("NO_COMPRESS"), NO_COMPRESS);
}
void init(Handle<Object> exports) {
NanScope();
grpc_init();
@ -204,6 +214,7 @@ void init(Handle<Object> exports) {
InitOpTypeConstants(exports);
InitPropagateConstants(exports);
InitConnectivityStateConstants(exports);
InitWriteFlags(exports);
grpc::node::Call::Init(exports);
grpc::node::Channel::Init(exports);

@ -235,7 +235,7 @@ NAN_METHOD(Server::RequestCall) {
new struct tag(new NanCallback(args[0].As<Function>()), ops.release(),
shared_ptr<Resources>(nullptr)));
if (error != GRPC_CALL_OK) {
return NanThrowError("requestCall failed", error);
return NanThrowError(nanErrorWithCode("requestCall failed", error));
}
CompletionQueueAsyncWorker::Next();
NanReturnUndefined();

@ -144,6 +144,11 @@ exports.propagate = grpc.propagate;
*/
exports.callError = grpc.callError;
/**
* Write flag name to code number mapping
*/
exports.writeFlags = grpc.writeFlags;
/**
* Credentials factories
*/

@ -79,13 +79,19 @@ function ClientWritableStream(call, serialize) {
* implementation of a method needed for implementing stream.Writable.
* @access private
* @param {Buffer} chunk The chunk to write
* @param {string} encoding Ignored
* @param {string} encoding Used to pass write flags
* @param {function(Error=)} callback Called when the write is complete
*/
function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
var message = this.serialize(chunk);
if (_.isFinite(encoding)) {
/* Attach the encoding if it is a finite number. This is the closest we
* can get to checking that it is valid flags */
message.grpcWriteFlags = encoding;
}
batch[grpc.opType.SEND_MESSAGE] = message;
this.call.startBatch(batch, function(err, event) {
if (err) {
// Something has gone wrong. Stop writing by failing to call callback
@ -273,8 +279,10 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
return;
}
var client_batch = {};
var message = serialize(argument);
message.grpcWriteFlags = options.flags;
client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
client_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
client_batch[grpc.opType.SEND_MESSAGE] = message;
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
client_batch[grpc.opType.RECV_MESSAGE] = true;
@ -407,9 +415,11 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
return;
}
var start_batch = {};
var message = serialize(argument);
message.grpcWriteFlags = options.flags;
start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
start_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
start_batch[grpc.opType.SEND_MESSAGE] = message;
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {

@ -115,8 +115,10 @@ function waitForCancel(call, emitter) {
* @param {function(*):Buffer=} serialize Serialization function for the
* response
* @param {Object=} metadata Optional trailing metadata to send with status
* @param {number=} flags Flags for modifying how the message is sent.
* Defaults to 0.
*/
function sendUnaryResponse(call, value, serialize, metadata) {
function sendUnaryResponse(call, value, serialize, metadata, flags) {
var end_batch = {};
var status = {
code: grpc.status.OK,
@ -130,7 +132,9 @@ function sendUnaryResponse(call, value, serialize, metadata) {
end_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
call.metadataSent = true;
}
end_batch[grpc.opType.SEND_MESSAGE] = serialize(value);
var message = serialize(value);
message.grpcWriteFlags = flags;
end_batch[grpc.opType.SEND_MESSAGE] = message;
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(end_batch, function (){});
}
@ -254,7 +258,7 @@ function ServerWritableStream(call, serialize) {
* for implementing stream.Writable.
* @access private
* @param {Buffer} chunk The chunk of data to write
* @param {string} encoding Ignored
* @param {string} encoding Used to pass write flags
* @param {function(Error=)} callback Callback to indicate that the write is
* complete
*/
@ -265,7 +269,13 @@ function _write(chunk, encoding, callback) {
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
this.call.metadataSent = true;
}
batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
var message = this.serialize(chunk);
if (_.isFinite(encoding)) {
/* Attach the encoding if it is a finite number. This is the closest we
* can get to checking that it is valid flags */
message.grpcWriteFlags = encoding;
}
batch[grpc.opType.SEND_MESSAGE] = message;
this.call.startBatch(batch, function(err, value) {
if (err) {
this.emit('error', err);
@ -450,14 +460,14 @@ function handleUnary(call, handler, metadata) {
if (emitter.cancelled) {
return;
}
handler.func(emitter, function sendUnaryData(err, value, trailer) {
handler.func(emitter, function sendUnaryData(err, value, trailer, flags) {
if (err) {
if (trailer) {
err.metadata = trailer;
}
handleError(call, err);
} else {
sendUnaryResponse(call, value, handler.serialize, trailer);
sendUnaryResponse(call, value, handler.serialize, trailer, flags);
}
});
});
@ -514,7 +524,7 @@ function handleClientStreaming(call, handler, metadata) {
});
waitForCancel(call, stream);
stream.metadata = metadata;
handler.func(stream, function(err, value, trailer) {
handler.func(stream, function(err, value, trailer, flags) {
stream.terminate();
if (err) {
if (trailer) {
@ -522,7 +532,7 @@ function handleClientStreaming(call, handler, metadata) {
}
handleError(call, err);
} else {
sendUnaryResponse(call, value, handler.serialize, trailer);
sendUnaryResponse(call, value, handler.serialize, trailer, flags);
}
});
}

@ -398,6 +398,7 @@ PHP_METHOD(Call, startBatch) {
}
ops[op_num].op = (grpc_op_type)index;
ops[op_num].flags = 0;
ops[op_num].reserved = NULL;
op_num++;
}
error = grpc_call_start_batch(call->wrapped, ops, op_num, call->wrapped,

@ -184,6 +184,7 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
return 0;
}
c_op.op = type;
c_op.reserved = NULL;
c_op.flags = PyInt_AsLong(PyTuple_GET_ITEM(op, WRITE_FLAGS_INDEX));
if (PyErr_Occurred()) {
return 0;

@ -41,13 +41,15 @@ from grpc.framework.base import util as _base_utilities
from grpc.framework.face import implementations as _face_implementations
from grpc.framework.foundation import logging_pool
_THREAD_POOL_SIZE = 8
_DEFAULT_THREAD_POOL_SIZE = 8
_ONE_DAY_IN_SECONDS = 24 * 60 * 60
class _Server(interfaces.Server):
def __init__(self, breakdown, port, private_key, certificate_chain):
def __init__(
self, breakdown, port, private_key, certificate_chain,
thread_pool_size=_DEFAULT_THREAD_POOL_SIZE):
self._lock = threading.Lock()
self._breakdown = breakdown
self._port = port
@ -56,6 +58,7 @@ class _Server(interfaces.Server):
else:
self._key_chain_pairs = ((private_key, certificate_chain),)
self._pool_size = thread_pool_size
self._pool = None
self._back = None
self._fore_link = None
@ -63,7 +66,7 @@ class _Server(interfaces.Server):
def _start(self):
with self._lock:
if self._pool is None:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
self._pool = logging_pool.pool(self._pool_size)
servicer = _face_implementations.servicer(
self._pool, self._breakdown.implementations, None)
self._back = _base_implementations.back_link(
@ -114,7 +117,8 @@ class _Stub(interfaces.Stub):
def __init__(
self, breakdown, host, port, secure, root_certificates, private_key,
certificate_chain, metadata_transformer=None, server_host_override=None):
certificate_chain, metadata_transformer=None, server_host_override=None,
thread_pool_size=_DEFAULT_THREAD_POOL_SIZE):
self._lock = threading.Lock()
self._breakdown = breakdown
self._host = host
@ -126,6 +130,7 @@ class _Stub(interfaces.Stub):
self._metadata_transformer = metadata_transformer
self._server_host_override = server_host_override
self._pool_size = thread_pool_size
self._pool = None
self._front = None
self._rear_link = None
@ -134,7 +139,7 @@ class _Stub(interfaces.Stub):
def __enter__(self):
with self._lock:
if self._pool is None:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
self._pool = logging_pool.pool(self._pool_size)
self._front = _base_implementations.front_link(
self._pool, self._pool, self._pool)
self._rear_link = _rear.RearLink(
@ -193,7 +198,7 @@ class _Stub(interfaces.Stub):
def stub(
service_name, methods, host, port, metadata_transformer=None, secure=False,
root_certificates=None, private_key=None, certificate_chain=None,
server_host_override=None):
server_host_override=None, thread_pool_size=_DEFAULT_THREAD_POOL_SIZE):
"""Constructs an interfaces.Stub.
Args:
@ -216,6 +221,8 @@ def stub(
certificate chain should be used.
server_host_override: (For testing only) the target name used for SSL
host name checking.
thread_pool_size: The maximum number of threads to allow in the backing
thread pool.
Returns:
An interfaces.Stub affording RPC invocation.
@ -224,11 +231,13 @@ def stub(
return _Stub(
breakdown, host, port, secure, root_certificates, private_key,
certificate_chain, server_host_override=server_host_override,
metadata_transformer=metadata_transformer)
metadata_transformer=metadata_transformer,
thread_pool_size=thread_pool_size)
def server(
service_name, methods, port, private_key=None, certificate_chain=None):
service_name, methods, port, private_key=None, certificate_chain=None,
thread_pool_size=_DEFAULT_THREAD_POOL_SIZE):
"""Constructs an interfaces.Server.
Args:
@ -242,9 +251,12 @@ def server(
private_key: A pem-encoded private key, or None for an insecure server.
certificate_chain: A pem-encoded certificate chain, or None for an insecure
server.
thread_pool_size: The maximum number of threads to allow in the backing
thread pool.
Returns:
An interfaces.Server that will serve secure traffic.
"""
breakdown = _face_utilities.break_down_service(service_name, methods)
return _Server(breakdown, port, private_key, certificate_chain)
return _Server(breakdown, port, private_key, certificate_chain,
thread_pool_size=thread_pool_size)

@ -526,6 +526,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
};
st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op);
st->ops[st->op_num].flags = 0;
st->ops[st->op_num].reserved = NULL;
st->op_num++;
}
}

@ -54,5 +54,6 @@ module GRPC
LOGGER = NoopLogger.new
end
include DefaultLogger unless method_defined?(:logger)
# Inject the noop #logger if no module-level logger method has been injected.
extend DefaultLogger unless methods.include?(:logger)
end

@ -174,6 +174,7 @@ static void on_p2s_recv_initial_metadata(void *arg, int success) {
if (!pc->proxy->shutdown) {
op.op = GRPC_OP_SEND_INITIAL_METADATA;
op.flags = 0;
op.reserved = NULL;
op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count;
op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata;
refpc(pc, "on_c2p_sent_initial_metadata");
@ -201,6 +202,7 @@ static void on_p2s_sent_message(void *arg, int success) {
if (!pc->proxy->shutdown && success) {
op.op = GRPC_OP_RECV_MESSAGE;
op.flags = 0;
op.reserved = NULL;
op.data.recv_message = &pc->c2p_msg;
refpc(pc, "on_c2p_recv_msg");
err = grpc_call_start_batch(pc->c2p, &op, 1,
@ -225,6 +227,7 @@ static void on_c2p_recv_msg(void *arg, int success) {
if (pc->c2p_msg != NULL) {
op.op = GRPC_OP_SEND_MESSAGE;
op.flags = 0;
op.reserved = NULL;
op.data.send_message = pc->c2p_msg;
refpc(pc, "on_p2s_sent_message");
err = grpc_call_start_batch(pc->p2s, &op, 1,
@ -233,6 +236,7 @@ static void on_c2p_recv_msg(void *arg, int success) {
} else {
op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op.flags = 0;
op.reserved = NULL;
refpc(pc, "on_p2s_sent_close");
err = grpc_call_start_batch(pc->p2s, &op, 1,
new_closure(on_p2s_sent_close, pc), NULL);
@ -254,6 +258,7 @@ static void on_c2p_sent_message(void *arg, int success) {
if (!pc->proxy->shutdown && success) {
op.op = GRPC_OP_RECV_MESSAGE;
op.flags = 0;
op.reserved = NULL;
op.data.recv_message = &pc->p2s_msg;
refpc(pc, "on_p2s_recv_msg");
err = grpc_call_start_batch(pc->p2s, &op, 1,
@ -272,6 +277,7 @@ static void on_p2s_recv_msg(void *arg, int success) {
if (!pc->proxy->shutdown && success && pc->p2s_msg) {
op.op = GRPC_OP_SEND_MESSAGE;
op.flags = 0;
op.reserved = NULL;
op.data.send_message = pc->p2s_msg;
refpc(pc, "on_c2p_sent_message");
err = grpc_call_start_batch(pc->c2p, &op, 1,
@ -295,6 +301,7 @@ static void on_p2s_status(void *arg, int success) {
GPR_ASSERT(success);
op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op.flags = 0;
op.reserved = NULL;
op.data.send_status_from_server.trailing_metadata_count =
pc->p2s_trailing_metadata.count;
op.data.send_status_from_server.trailing_metadata =
@ -334,6 +341,7 @@ static void on_new_call(void *arg, int success) {
gpr_ref_init(&pc->refs, 1);
op.flags = 0;
op.reserved = NULL;
op.op = GRPC_OP_RECV_INITIAL_METADATA;
op.data.recv_initial_metadata = &pc->p2s_initial_metadata;

@ -135,13 +135,16 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &initial_metadata_recv;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
@ -149,6 +152,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
op->data.recv_status_on_client.status_details = &details;
op->data.recv_status_on_client.status_details_capacity = &details_capacity;
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(c, ops, op - ops, tag(1), NULL);
GPR_ASSERT(error == GRPC_CALL_OK);
@ -173,16 +177,19 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
op->data.send_status_from_server.status_details = "xyz";
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(s, ops, op - ops, tag(102), NULL);
GPR_ASSERT(error == GRPC_CALL_OK);

@ -438,25 +438,31 @@ static void test_request_with_server_rejecting_client_creds(
op->data.recv_status_on_client.status_details = &details;
op->data.recv_status_on_client.status_details_capacity = &details_capacity;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = request_payload;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &initial_metadata_recv;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &response_payload_recv;
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(c, ops, op - ops, tag(1), NULL);
GPR_ASSERT(error == GRPC_CALL_OK);

@ -41,6 +41,7 @@
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/credentials.h>
@ -271,18 +272,18 @@ void InteropClient::DoLargeUnary() {
void InteropClient::DoLargeCompressedUnary() {
const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
for (const auto payload_type : payload_types) {
for (const auto compression_type : compression_types) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
char* log_suffix;
gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
CompressionType_Name(compression_type).c_str(),
PayloadType_Name(payload_type).c_str());
CompressionType_Name(compression_types[j]).c_str(),
PayloadType_Name(payload_types[i]).c_str());
gpr_log(GPR_INFO, "Sending a large compressed unary rpc %s.", log_suffix);
SimpleRequest request;
SimpleResponse response;
request.set_response_type(payload_type);
request.set_response_compression(compression_type);
request.set_response_type(payload_types[i]);
request.set_response_compression(compression_types[j]);
PerformLargeUnary(&request, &response);
gpr_log(GPR_INFO, "Large compressed unary done %s.", log_suffix);
gpr_free(log_suffix);
@ -347,21 +348,21 @@ void InteropClient::DoResponseCompressedStreaming() {
const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
for (const auto payload_type : payload_types) {
for (const auto compression_type : compression_types) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
ClientContext context;
InteropClientContextInspector inspector(context);
StreamingOutputCallRequest request;
char* log_suffix;
gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
CompressionType_Name(compression_type).c_str(),
PayloadType_Name(payload_type).c_str());
CompressionType_Name(compression_types[j]).c_str(),
PayloadType_Name(payload_types[i]).c_str());
gpr_log(GPR_INFO, "Receiving response steaming rpc %s.", log_suffix);
request.set_response_type(payload_type);
request.set_response_compression(compression_type);
request.set_response_type(payload_types[i]);
request.set_response_compression(compression_types[j]);
for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
ResponseParameters* response_parameter =

@ -143,7 +143,10 @@ class CLanguage(object):
binary = 'vsprojects/test_bin/%s.exe' % (target['name'])
else:
binary = 'bins/%s/%s' % (config.build_config, target['name'])
out.append(config.job_spec([binary], [binary]))
if os.path.isfile(binary):
out.append(config.job_spec([binary], [binary]))
else:
print "\nWARNING: binary not found, skipping", binary
return sorted(out)
def make_targets(self):
@ -482,12 +485,6 @@ build_steps.extend(set(
for cfg in build_configs
for l in languages
for cmdline in l.build_steps()))
one_run = set(
spec
for config in run_configs
for language in languages
for spec in language.test_specs(config, args.travis)
if re.search(args.regex, spec.shortname))
runs_per_test = args.runs_per_test
forever = args.forever
@ -583,6 +580,12 @@ def _build_and_run(
_start_port_server(port_server_port)
try:
infinite_runs = runs_per_test == 0
one_run = set(
spec
for config in run_configs
for language in languages
for spec in language.test_specs(config, args.travis)
if re.search(args.regex, spec.shortname))
# When running on travis, we want out test runs to be as similar as possible
# for reproducibility purposes.
if travis:

Loading…
Cancel
Save