Merge branch 'master' of https://github.com/grpc/grpc into ct-you-complete-me

pull/1731/head
Masood Malekghassemi 10 years ago
commit a3697b451b
  1. 122
      doc/connectivity-semantics-and-api.md
  2. 4
      include/grpc/grpc.h
  3. 5
      include/grpc/support/slice.h
  4. 11
      src/compiler/cpp_generator.cc
  5. 3
      src/core/support/log_win32.c
  6. 8
      src/core/transport/stream_op.h
  7. 105
      src/csharp/Grpc.Core.Tests/ChannelOptionsTest.cs
  8. 6
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  9. 75
      src/csharp/Grpc.Core.Tests/Internal/ChannelArgsSafeHandleTest.cs
  10. 40
      src/csharp/Grpc.Core/Channel.cs
  11. 115
      src/csharp/Grpc.Core/ChannelArgs.cs
  12. 178
      src/csharp/Grpc.Core/ChannelOptions.cs
  13. 6
      src/csharp/Grpc.Core/Grpc.Core.csproj
  14. 8
      src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs
  15. 4
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  16. 11
      src/csharp/Grpc.Core/Server.cs
  17. 10
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs
  18. 9
      src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
  19. 18
      src/csharp/ext/grpc_csharp_ext.c
  20. 137
      src/php/README.md
  21. 4
      src/php/bin/run_gen_code_test.sh
  22. 7
      src/php/composer.json
  23. 2
      src/php/tests/generated_code/AbstractGeneratedCodeTest.php
  24. 2
      src/php/tests/interop/interop_client.php
  25. 6
      src/python/README.md
  26. 22
      src/python/src/README.rst
  27. 14
      src/python/src/grpc/_adapter/_c/utility.c
  28. 292
      test/compiler/python_plugin_test.py
  29. 2
      test/cpp/qps/client_async.cc
  30. 55
      test/cpp/qps/server_async.cc
  31. 28
      tools/jenkins/run_jenkins.sh
  32. 2
      tools/run_tests/build_python.sh
  33. 16
      tools/run_tests/jobset.py
  34. 9
      tools/run_tests/run_tests.py

@ -0,0 +1,122 @@
gRPC Connectivity Semantics and API
===================================
This document describes the connectivity semantics for gRPC channels and the
corresponding impact on RPCs. We then discuss an API.
States of Connectivity
----------------------
gRPC Channels provide the abstraction over which clients can communicate with
servers.The client-side channel object can be constructed using little more
than a DNS name. Channels encapsulate a range of functionality including name
resolution, establishing a TCP connection (with retries and backoff) and TLS
handshakes. Channels can also handle errors on established connections and
reconnect, or in the case of HTTP/2 GO_AWAY, re-resolve the name and reconnect.
To hide the details of all this activity from the user of the gRPC API (i.e.,
application code) while exposing meaningful information about the state of a
channel, we use a state machine with four states, defined below:
CONNECTING: The channel is trying to establish a connection and is waiting to
make progress on one of the steps involved in name resolution, TCP connection
establishment or TLS handshake. This is the initial state for all channels upon
creation.
READY: The channel has successfully established a connection all the way
through TLS handshake (or equivalent) and all subsequent attempt to communicate
have succeeded (or are pending without any known failure ).
TRANSIENT_FAILURE: There has been some transient failure (such as a TCP 3-way
handshake timing out or a socket error). Channels in this state will eventually
switch to the CONNECTING state and try to establish a connection again. Since
retries are done with exponential backoff, channels that fail to connect will
start out spending very little time in this state but as the attempts fail
repeatedly, the channel will spend increasingly large amounts of time in this
state. For many non-fatal failures (e.g., TCP connection attempts timing out
because the server is not yet available), the channel may be stuck in this
state for an indefinitely large amount of time.
FATAL_FAILURE: There has been a fatal failure and the channel will never
attempt to establish a connection again. (e.g., a server presenting an invalid
TLS certificate)
Channels that enter this state never leave this state.
The following table lists the legal transitions from one state to another and
corresponding reasons. Empty cells denote disallowed transitions.
<table style='border: 1px solid black'>
<tr>
<th>From/To</th>
<th>CONNECTING</th>
<th>READY</th>
<th>TRANSIENT_FAILURE</th>
<th>FATAL_FAILURE</th>
</tr>
<tr>
<th>CONNECTING</th>
<td>Incremental progress during connection establishment</td>
<td>All steps needed to establish a connection succeeded</td>
<td>Any failure in any of the steps needed to establish connection</td>
<td>Fatal failure encountered while attempting a connection.</td>
</tr>
<tr>
<th>READY</th>
<td></td>
<td>Incremental successful communication on established channel.</td>
<td>Any failure encountered while expecting successful communication on
established channel.</td>
<td></td>
</tr>
<tr>
<th>TRANSIENT_FAILURE</th>
<td>Wait time required to implement (exponential) backoff is over.</td>
<td></td>
<td></td>
<td></td>
</tr>
<tr>
<th>FATAL_FAILURE</th>
<td></td>
<td></td>
<td></td>
<td></td>
</tr>
</table>
Channel State API
-----------------
All gRPC libraries will expose a channel-level API method to poll the current
state of a channel. In C++, this method is called GetCurrentState and returns
an enum for one of the four legal states.
All libraries should also expose an API that enables the application (user of
the gRPC API) to be notified when the channel state changes. Since state
changes can be rapid and race with any such notification, the notification
should just inform the user that some state change has happened, leaving it to
the user to poll the channel for the current state.
The synchronous version of this API is:
```cpp
bool WaitForStateChange(gpr_timespec deadline, ChannelState source_state);
```
which returns true when the state changes to something other than the
source_state and false if the deadline expires. Asynchronous and futures based
APIs should have a corresponding method that allows the application to be
notified when the state of a channel changes.
Note that a notification is delivered every time there is a transition from any
state to any *other* state. On the other hand the rules for legal state
transition, require a transition from CONNECTING to TRANSIENT_FAILURE and back
to CONNECTING for every recoverable failure, even if the corresponding
exponential backoff requires no wait before retry. The combined effect is that
the application may receive state change notifications that appear spurious.
e.g., an application waiting for state changes on a channel that is CONNECTING
may receive a state change notification but find the channel in the same
CONNECTING state on polling for current state because the channel may have
spent infinitesimally small amount of time in the TRANSIENT_FAILURE state.

@ -221,7 +221,7 @@ typedef enum {
GRPC_OP_SEND_INITIAL_METADATA = 0,
/* Send a message: 0 or more of these operations can occur for each call */
GRPC_OP_SEND_MESSAGE,
/* Send a close from the server: one and only one instance MUST be sent from
/* Send a close from the client: one and only one instance MUST be sent from
the client,
unless the call was cancelled - in which case this can be skipped */
GRPC_OP_SEND_CLOSE_FROM_CLIENT,
@ -240,7 +240,7 @@ typedef enum {
the status will indicate some failure.
*/
GRPC_OP_RECV_STATUS_ON_CLIENT,
/* Receive status on the server: one and only one must be made on the server
/* Receive close on the server: one and only one must be made on the server
*/
GRPC_OP_RECV_CLOSE_ON_SERVER
} grpc_op_type;

@ -110,8 +110,9 @@ gpr_slice gpr_slice_ref(gpr_slice s);
/* Decrement the ref count of s. If the ref count of s reaches zero, all
slices sharing the ref count are destroyed, and considered no longer
initialized. If s is ultimately derived from a call to gpr_slice_new(start,
len, dest) where dest!=NULL , then (*dest)(start, len) is called. Requires
s initialized. */
len, dest) where dest!=NULL , then (*dest)(start) is called, else if s is
ultimately derived from a call to gpr_slice_new_with_len(start, len, dest)
where dest!=NULL , then (*dest)(start, len). Requires s initialized. */
void gpr_slice_unref(gpr_slice s);
/* Create a slice pointing at some data. Calls malloc to allocate a refcount

@ -849,6 +849,9 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
"::grpc::Status $ns$$Service$::Service::$Method$("
"::grpc::ServerContext* context, "
"const $Request$* request, $Response$* response) {\n");
printer->Print(" (void) context;\n");
printer->Print(" (void) request;\n");
printer->Print(" (void) response;\n");
printer->Print(
" return ::grpc::Status("
"::grpc::StatusCode::UNIMPLEMENTED);\n");
@ -859,6 +862,9 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ServerContext* context, "
"::grpc::ServerReader< $Request$>* reader, "
"$Response$* response) {\n");
printer->Print(" (void) context;\n");
printer->Print(" (void) reader;\n");
printer->Print(" (void) response;\n");
printer->Print(
" return ::grpc::Status("
"::grpc::StatusCode::UNIMPLEMENTED);\n");
@ -869,6 +875,9 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ServerContext* context, "
"const $Request$* request, "
"::grpc::ServerWriter< $Response$>* writer) {\n");
printer->Print(" (void) context;\n");
printer->Print(" (void) request;\n");
printer->Print(" (void) writer;\n");
printer->Print(
" return ::grpc::Status("
"::grpc::StatusCode::UNIMPLEMENTED);\n");
@ -879,6 +888,8 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ServerContext* context, "
"::grpc::ServerReaderWriter< $Response$, $Request$>* "
"stream) {\n");
printer->Print(" (void) context;\n");
printer->Print(" (void) stream;\n");
printer->Print(
" return ::grpc::Status("
"::grpc::StatusCode::UNIMPLEMENTED);\n");

@ -42,6 +42,7 @@
#include <grpc/support/log_win32.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc/support/string_util.h>
#include "src/core/support/string.h"
#include "src/core/support/string_win32.h"
@ -106,7 +107,7 @@ char *gpr_format_message(DWORD messageid) {
NULL, messageid,
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(LPTSTR)(&tmessage), 0, NULL);
if (status == 0) return gpr_strdup("Unable to retreive error string");
if (status == 0) return gpr_strdup("Unable to retrieve error string");
message = gpr_tchar_to_char(tmessage);
LocalFree(tmessage);
return message;

@ -58,7 +58,7 @@ typedef enum grpc_stream_op_code {
GRPC_OP_SLICE
} grpc_stream_op_code;
/* Arguments for GRPC_OP_BEGIN */
/* Arguments for GRPC_OP_BEGIN_MESSAGE */
typedef struct grpc_begin_message {
/* How many bytes of data will this message contain */
gpr_uint32 length;
@ -126,10 +126,8 @@ typedef struct grpc_stream_op {
} data;
} grpc_stream_op;
/* A stream op buffer is a wrapper around stream operations that is dynamically
extendable.
TODO(ctiller): inline a few elements into the struct, to avoid common case
per-call allocations. */
/** A stream op buffer is a wrapper around stream operations that is
* dynamically extendable. */
typedef struct grpc_stream_op_buffer {
grpc_stream_op *ops;
size_t nops;

@ -0,0 +1,105 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
public class ChannelOptionsTest
{
[Test]
public void IntOption()
{
var option = new ChannelOption("somename", 1);
Assert.AreEqual(ChannelOption.OptionType.Integer, option.Type);
Assert.AreEqual("somename", option.Name);
Assert.AreEqual(1, option.IntValue);
Assert.Throws(typeof(InvalidOperationException), () => { var s = option.StringValue; });
}
[Test]
public void StringOption()
{
var option = new ChannelOption("somename", "ABCDEF");
Assert.AreEqual(ChannelOption.OptionType.String, option.Type);
Assert.AreEqual("somename", option.Name);
Assert.AreEqual("ABCDEF", option.StringValue);
Assert.Throws(typeof(InvalidOperationException), () => { var s = option.IntValue; });
}
[Test]
public void ConstructorPreconditions()
{
Assert.Throws(typeof(NullReferenceException), () => { new ChannelOption(null, "abc"); });
Assert.Throws(typeof(NullReferenceException), () => { new ChannelOption(null, 1); });
Assert.Throws(typeof(NullReferenceException), () => { new ChannelOption("abc", null); });
}
[Test]
public void CreateChannelArgsNull()
{
var channelArgs = ChannelOptions.CreateChannelArgs(null);
Assert.IsTrue(channelArgs.IsInvalid);
}
[Test]
public void CreateChannelArgsEmpty()
{
var options = new List<ChannelOption>();
var channelArgs = ChannelOptions.CreateChannelArgs(options);
channelArgs.Dispose();
}
[Test]
public void CreateChannelArgs()
{
var options = new List<ChannelOption>
{
new ChannelOption("ABC", "XYZ"),
new ChannelOption("somename", "IJKLM"),
new ChannelOption("intoption", 12345),
new ChannelOption("GHIJK", 12345),
};
var channelArgs = ChannelOptions.CreateChannelArgs(options);
channelArgs.Dispose();
}
}
}

@ -3,8 +3,6 @@
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProductVersion>8.0.30703</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{86EC5CB4-4EA2-40A2-8057-86542A0353BB}</ProjectGuid>
<OutputType>Library</OutputType>
<RootNamespace>Grpc.Core.Tests</RootNamespace>
@ -48,6 +46,8 @@
<Compile Include="Internal\MetadataArraySafeHandleTest.cs" />
<Compile Include="Internal\CompletionQueueSafeHandleTest.cs" />
<Compile Include="Internal\CompletionQueueEventTest.cs" />
<Compile Include="Internal\ChannelArgsSafeHandleTest.cs" />
<Compile Include="ChannelOptionsTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
@ -63,4 +63,4 @@
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
<ItemGroup />
</Project>
</Project>

@ -0,0 +1,75 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
public class ChannelArgsSafeHandleTest
{
[Test]
public void CreateEmptyAndDestroy()
{
var channelArgs = ChannelArgsSafeHandle.Create(0);
channelArgs.Dispose();
}
[Test]
public void CreateNonEmptyAndDestroy()
{
var channelArgs = ChannelArgsSafeHandle.Create(5);
channelArgs.Dispose();
}
[Test]
public void CreateNullAndDestroy()
{
var channelArgs = ChannelArgsSafeHandle.CreateNull();
channelArgs.Dispose();
}
[Test]
public void CreateFillAndDestroy()
{
var channelArgs = ChannelArgsSafeHandle.Create(3);
channelArgs.SetInteger(0, "somekey", 12345);
channelArgs.SetString(1, "somekey", "abcdefghijkl");
channelArgs.SetString(2, "somekey", "XYZ");
channelArgs.Dispose();
}
}
}

@ -29,6 +29,7 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
@ -50,10 +51,10 @@ namespace Grpc.Core
/// </summary>
/// <param name="host">The DNS name of IP address of the host.</param>
/// <param name="credentials">Optional credentials to create a secure channel.</param>
/// <param name="channelArgs">Optional channel arguments.</param>
public Channel(string host, Credentials credentials = null, ChannelArgs channelArgs = null)
/// <param name="options">Channel options.</param>
public Channel(string host, Credentials credentials = null, IEnumerable<ChannelOption> options = null)
{
using (ChannelArgsSafeHandle nativeChannelArgs = CreateNativeChannelArgs(channelArgs))
using (ChannelArgsSafeHandle nativeChannelArgs = ChannelOptions.CreateChannelArgs(options))
{
if (credentials != null)
{
@ -67,7 +68,7 @@ namespace Grpc.Core
this.handle = ChannelSafeHandle.Create(host, nativeChannelArgs);
}
}
this.target = GetOverridenTarget(host, channelArgs);
this.target = GetOverridenTarget(host, options);
}
/// <summary>
@ -76,9 +77,9 @@ namespace Grpc.Core
/// <param name="host">DNS name or IP address</param>
/// <param name="port">the port</param>
/// <param name="credentials">Optional credentials to create a secure channel.</param>
/// <param name="channelArgs">Optional channel arguments.</param>
public Channel(string host, int port, Credentials credentials = null, ChannelArgs channelArgs = null) :
this(string.Format("{0}:{1}", host, port), credentials, channelArgs)
/// <param name="options">Channel options.</param>
public Channel(string host, int port, Credentials credentials = null, IEnumerable<ChannelOption> options = null) :
this(string.Format("{0}:{1}", host, port), credentials, options)
{
}
@ -112,22 +113,25 @@ namespace Grpc.Core
}
}
private static string GetOverridenTarget(string target, ChannelArgs args)
/// <summary>
/// Look for SslTargetNameOverride option and return its value instead of originalTarget
/// if found.
/// </summary>
private static string GetOverridenTarget(string originalTarget, IEnumerable<ChannelOption> options)
{
if (args != null && !string.IsNullOrEmpty(args.GetSslTargetNameOverride()))
if (options == null)
{
return args.GetSslTargetNameOverride();
return originalTarget;
}
return target;
}
private static ChannelArgsSafeHandle CreateNativeChannelArgs(ChannelArgs args)
{
if (args == null)
foreach (var option in options)
{
return ChannelArgsSafeHandle.CreateNull();
if (option.Type == ChannelOption.OptionType.String
&& option.Name == ChannelOptions.SslTargetNameOverride)
{
return option.StringValue;
}
}
return args.ToNativeChannelArgs();
return originalTarget;
}
}
}

@ -1,115 +0,0 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
namespace Grpc.Core
{
/// <summary>
/// gRPC channel options.
/// </summary>
public class ChannelArgs
{
public const string SslTargetNameOverrideKey = "grpc.ssl_target_name_override";
readonly ImmutableDictionary<string, string> stringArgs;
private ChannelArgs(ImmutableDictionary<string, string> stringArgs)
{
this.stringArgs = stringArgs;
}
public string GetSslTargetNameOverride()
{
string result;
if (stringArgs.TryGetValue(SslTargetNameOverrideKey, out result))
{
return result;
}
return null;
}
public static Builder CreateBuilder()
{
return new Builder();
}
public class Builder
{
readonly Dictionary<string, string> stringArgs = new Dictionary<string, string>();
// TODO: AddInteger not supported yet.
public Builder AddString(string key, string value)
{
stringArgs.Add(key, value);
return this;
}
public ChannelArgs Build()
{
return new ChannelArgs(stringArgs.ToImmutableDictionary());
}
}
/// <summary>
/// Creates native object for the channel arguments.
/// </summary>
/// <returns>The native channel arguments.</returns>
internal ChannelArgsSafeHandle ToNativeChannelArgs()
{
ChannelArgsSafeHandle nativeArgs = null;
try
{
nativeArgs = ChannelArgsSafeHandle.Create(stringArgs.Count);
int i = 0;
foreach (var entry in stringArgs)
{
nativeArgs.SetString(i, entry.Key, entry.Value);
i++;
}
return nativeArgs;
}
catch (Exception)
{
if (nativeArgs != null)
{
nativeArgs.Dispose();
}
throw;
}
}
}
}

@ -0,0 +1,178 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core
{
/// <summary>
/// Channel option specified when creating a channel.
/// Corresponds to grpc_channel_args from grpc/grpc.h.
/// </summary>
public sealed class ChannelOption
{
public enum OptionType
{
Integer,
String
}
private readonly OptionType type;
private readonly string name;
private readonly int intValue;
private readonly string stringValue;
/// <summary>
/// Creates a channel option with a string value.
/// </summary>
/// <param name="name">Name.</param>
/// <param name="stringValue">String value.</param>
public ChannelOption(string name, string stringValue)
{
this.type = OptionType.String;
this.name = Preconditions.CheckNotNull(name);
this.stringValue = Preconditions.CheckNotNull(stringValue);
}
/// <summary>
/// Creates a channel option with an integer value.
/// </summary>
/// <param name="name">Name.</param>
/// <param name="stringValue">String value.</param>
public ChannelOption(string name, int intValue)
{
this.type = OptionType.Integer;
this.name = Preconditions.CheckNotNull(name);
this.intValue = intValue;
}
public OptionType Type
{
get
{
return type;
}
}
public string Name
{
get
{
return name;
}
}
public int IntValue
{
get
{
Preconditions.CheckState(type == OptionType.Integer);
return intValue;
}
}
public string StringValue
{
get
{
Preconditions.CheckState(type == OptionType.String);
return stringValue;
}
}
}
public static class ChannelOptions
{
// Override SSL target check. Only to be used for testing.
public const string SslTargetNameOverride = "grpc.ssl_target_name_override";
// Enable census for tracing and stats collection
public const string Census = "grpc.census";
// Maximum number of concurrent incoming streams to allow on a http2 connection
public const string MaxConcurrentStreams = "grpc.max_concurrent_streams";
// Maximum message length that the channel can receive
public const string MaxMessageLength = "grpc.max_message_length";
// Initial sequence number for http2 transports
public const string Http2InitialSequenceNumber = "grpc.http2.initial_sequence_number";
/// <summary>
/// Creates native object for a collection of channel options.
/// </summary>
/// <returns>The native channel arguments.</returns>
internal static ChannelArgsSafeHandle CreateChannelArgs(IEnumerable<ChannelOption> options)
{
if (options == null)
{
return ChannelArgsSafeHandle.CreateNull();
}
var optionList = new List<ChannelOption>(options); // It's better to do defensive copy
ChannelArgsSafeHandle nativeArgs = null;
try
{
nativeArgs = ChannelArgsSafeHandle.Create(optionList.Count);
for (int i = 0; i < optionList.Count; i++)
{
var option = optionList[i];
if (option.Type == ChannelOption.OptionType.Integer)
{
nativeArgs.SetInteger(i, option.Name, option.IntValue);
}
else if (option.Type == ChannelOption.OptionType.String)
{
nativeArgs.SetString(i, option.Name, option.StringValue);
}
else
{
throw new InvalidOperationException("Unknown option type");
}
}
return nativeArgs;
}
catch (Exception)
{
if (nativeArgs != null)
{
nativeArgs.Dispose();
}
throw;
}
}
}
}

@ -5,8 +5,6 @@
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProductVersion>8.0.30703</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}</ProjectGuid>
<OutputType>Library</OutputType>
<RootNamespace>Grpc.Core</RootNamespace>
@ -78,7 +76,6 @@
<Compile Include="Internal\CredentialsSafeHandle.cs" />
<Compile Include="Credentials.cs" />
<Compile Include="Internal\ChannelArgsSafeHandle.cs" />
<Compile Include="ChannelArgs.cs" />
<Compile Include="Internal\AsyncCompletion.cs" />
<Compile Include="Internal\AsyncCallBase.cs" />
<Compile Include="Internal\AsyncCallServer.cs" />
@ -103,6 +100,7 @@
<Compile Include="Internal\CompletionQueueEvent.cs" />
<Compile Include="Internal\CompletionRegistry.cs" />
<Compile Include="Internal\BatchContextSafeHandle.cs" />
<Compile Include="ChannelOptions.cs" />
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
@ -132,4 +130,4 @@
</Target>
<Import Project="..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets" Condition="Exists('..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets')" />
<Import Project="..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets" Condition="Exists('..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets')" />
</Project>
</Project>

@ -45,6 +45,9 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)]
static extern void grpcsharp_channel_args_set_string(ChannelArgsSafeHandle args, UIntPtr index, string key, string value);
[DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)]
static extern void grpcsharp_channel_args_set_integer(ChannelArgsSafeHandle args, UIntPtr index, string key, int value);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_channel_args_destroy(IntPtr args);
@ -67,6 +70,11 @@ namespace Grpc.Core.Internal
grpcsharp_channel_args_set_string(this, new UIntPtr((uint)index), key, value);
}
public void SetInteger(int index, string key, int value)
{
grpcsharp_channel_args_set_integer(this, new UIntPtr((uint)index), key, value);
}
protected override bool ReleaseHandle()
{
grpcsharp_channel_args_destroy(handle);

@ -45,7 +45,7 @@ namespace Grpc.Core.Internal
internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr);
@ -72,7 +72,7 @@ namespace Grpc.Core.Internal
{
}
public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, IntPtr args)
public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args)
{
return grpcsharp_server_create(cq, args);
}

@ -61,9 +61,16 @@ namespace Grpc.Core
bool startRequested;
bool shutdownRequested;
public Server()
/// <summary>
/// Create a new server.
/// </summary>
/// <param name="options">Channel options.</param>
public Server(IEnumerable<ChannelOption> options = null)
{
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
using (var channelArgs = ChannelOptions.CreateChannelArgs(options))
{
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), channelArgs);
}
}
/// <summary>

@ -110,14 +110,16 @@ namespace Grpc.IntegrationTesting
credentials = TestCredentials.CreateTestClientCredentials(options.useTestCa);
}
ChannelArgs channelArgs = null;
List<ChannelOption> channelOptions = null;
if (!string.IsNullOrEmpty(options.serverHostOverride))
{
channelArgs = ChannelArgs.CreateBuilder()
.AddString(ChannelArgs.SslTargetNameOverrideKey, options.serverHostOverride).Build();
channelOptions = new List<ChannelOption>
{
new ChannelOption(ChannelOptions.SslTargetNameOverride, options.serverHostOverride)
};
}
using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelArgs))
using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions))
{
var stubConfig = StubConfiguration.Default;
if (options.testCase == "service_account_creds" || options.testCase == "compute_engine_creds")

@ -62,10 +62,11 @@ namespace Grpc.IntegrationTesting
int port = server.AddListeningPort(host, Server.PickUnusedPort, TestCredentials.CreateTestServerCredentials());
server.Start();
var channelArgs = ChannelArgs.CreateBuilder()
.AddString(ChannelArgs.SslTargetNameOverrideKey, TestCredentials.DefaultHostOverride).Build();
channel = new Channel(host, port, TestCredentials.CreateTestClientCredentials(true), channelArgs);
var options = new List<ChannelOption>
{
new ChannelOption(ChannelOptions.SslTargetNameOverride, TestCredentials.DefaultHostOverride)
};
channel = new Channel(host, port, TestCredentials.CreateTestClientCredentials(true), options);
client = TestService.NewStub(channel);
}

@ -65,8 +65,6 @@ grpc_byte_buffer *string_to_byte_buffer(const char *buffer, size_t len) {
return bb;
}
typedef void(GPR_CALLTYPE *callback_funcptr)(gpr_int32 success, void *batch_context);
/*
* Helper to maintain lifetime of batch op inputs and store batch op outputs.
*/
@ -354,6 +352,16 @@ grpcsharp_channel_args_set_string(grpc_channel_args *args, size_t index,
args->args[index].value.string = gpr_strdup(value);
}
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_channel_args_set_integer(grpc_channel_args *args, size_t index,
const char *key, int value) {
GPR_ASSERT(args);
GPR_ASSERT(index < args->num_args);
args->args[index].type = GRPC_ARG_INTEGER;
args->args[index].key = gpr_strdup(key);
args->args[index].value.integer = value;
}
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_channel_args_destroy(grpc_channel_args *args) {
size_t i;
@ -722,10 +730,12 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_redirect_log(grpcsharp_log_func func) {
gpr_set_log_function(grpcsharp_log_handler);
}
typedef void(GPR_CALLTYPE *test_callback_funcptr)(gpr_int32 success);
/* For testing */
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_test_callback(callback_funcptr callback) {
callback(1, NULL);
grpcsharp_test_callback(test_callback_funcptr callback) {
callback(1);
}
/* For testing */

@ -7,51 +7,122 @@ This directory contains source code for PHP implementation of gRPC layered on sh
Pre-Alpha : This gRPC PHP implementation is work-in-progress and is not expected to work yet.
## LAYOUT
Directory structure is as generated by the PHP utility
[ext_skel](http://php.net/manual/en/internals2.buildsys.skeleton.php)
## ENVIRONMENT
Install `php5` and `php5-dev`.
To run the tests, additionally install `php5-readline` and `phpunit`.
To run the tests, additionally install `phpunit`.
Alternatively, build and install PHP 5.5 or later from source with standard
configuration options.
To also download and install protoc and the PHP code generator.
## Build from Homebrew
On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][]. Run the following command to
install gRPC.
```sh
$ curl -fsSL https://goo.gl/getgrpc | bash -s php
```
This will download and run the [gRPC install script][] and compile the gRPC PHP extension.
## Build from Source
Clone this repository
```
$ git clone https://github.com/grpc/grpc.git
```
Build and install the Protocol Buffers compiler (protoc)
```
$ cd grpc
$ git pull --recurse-submodules && git submodule update --init --recursive
$ cd third_party/protobuf
$ ./autogen.sh
$ ./configure
$ make
$ make check
$ sudo make install
```
Build and install the gRPC C core
```sh
$ cd grpc
$ make
$ sudo make install
```
Build the gRPC PHP extension
```bash
apt-get install -y procps
curl -sSL https://get.rvm.io | sudo bash -s stable --ruby
git clone git@github.com:google/protobuf.git
cd protobuf
./configure
make
make install
git clone git@github.com:murgatroid99/Protobuf-PHP.git
cd Protobuf-PHP
rake pear:package version=1.0
pear install Protobuf-1.0.tgz
```sh
$ cd grpc/src/php/ext/grpc
$ phpize
$ ./configure
$ make
$ sudo make install
```
## BUILDING
In your php.ini file, add the line `extension=grpc.so` to load the extension
at PHP startup.
1. In ./ext/grpc, run the command `phpize` (distributed with PHP)
2. Run `./ext/grpc/configure`
3. In ./ext/grpc, run `make` and `sudo make install`
4. In your php.ini file, add the line `extension=grpc.so` to load the
extension at PHP startup.
Install Composer
## PHPUnit
```sh
$ cd grpc/src/php
$ curl -sS https://getcomposer.org/installer | php
$ php composer.phar install
```
## Unit Tests
Run unit tests
```sh
$ cd grpc/src/php
$ ./bin/run_tests.sh
```
## Generated Code Tests
Install `protoc-gen-php`
```sh
$ cd grpc/src/php/vendor/datto/protobuf-php
$ gem install rake ronn
$ rake pear:package version=1.0
$ sudo pear install Protobuf-1.0.tgz
```
Generate client stub code
```sh
$ cd grpc/src/php
$ ./bin/generate_proto_php.sh
```
Run a local server serving the math services
- Please see [Node][] on how to run an example server
```sh
$ cd grpc/src/node
$ npm install
$ nodejs examples/math_server.js
```
Run the generated code tests
```sh
$ cd grpc/src/php
$ ./bin/run_gen_code_test.sh
```
This repo now has PHPUnit tests, which can by run by executing
`./bin/run_tests.sh` after building.
[homebrew]:http://brew.sh
[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation
[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
[Node]:https://github.com/grpc/grpc/tree/master/src/node/examples
There is also a generated code test (`./bin/run_gen_code_test.sh`), which tests
the stub `./tests/generated_code/math.php` against a running localhost server
serving the math service. That stub is generated from
`./tests/generated_code/math.proto`.

@ -30,8 +30,8 @@
cd $(dirname $0)
GRPC_TEST_HOST=localhost:50051 php -d extension_dir=../ext/grpc/modules/ \
-d extension=grpc.so /usr/local/bin/phpunit -v --debug --strict \
-d extension=grpc.so `which phpunit` -v --debug --strict \
../tests/generated_code/GeneratedCodeTest.php
GRPC_TEST_HOST=localhost:50051 php -d extension_dir=../ext/grpc/modules/ \
-d extension=grpc.so /usr/local/bin/phpunit -v --debug --strict \
-d extension=grpc.so `which phpunit` -v --debug --strict \
../tests/generated_code/GeneratedCodeWithCallbackTest.php

@ -4,8 +4,15 @@
"version": "0.5.0",
"homepage": "http://grpc.io",
"license": "BSD-3-Clause",
"repositories": [
{
"type": "vcs",
"url": "https://github.com/stanley-cheung/Protobuf-PHP"
}
],
"require": {
"php": ">=5.5.0",
"datto/protobuf-php": "dev-master",
"google/auth": "dev-master"
},
"autoload": {

@ -32,8 +32,6 @@
*
*/
require_once realpath(dirname(__FILE__) . '/../../vendor/autoload.php');
require 'DrSlump/Protobuf.php';
\DrSlump\Protobuf::autoload();
require 'math.php';
abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase {
/* These tests require that a server exporting the math service must be

@ -32,8 +32,6 @@
*
*/
require_once realpath(dirname(__FILE__) . '/../../vendor/autoload.php');
require 'DrSlump/Protobuf.php';
\DrSlump\Protobuf::autoload();
require 'empty.php';
require 'message_set.php';
require 'messages.php';

@ -20,6 +20,10 @@ $ curl -fsSL https://goo.gl/getgrpc | bash -s python
```
This will download and run the [gRPC install script][], then install the latest version of the gRPC Python package. It also installs the Protocol Buffers compiler (_protoc_) and the gRPC _protoc_ plugin for python.
EXAMPLES
--------
Please read our online documentation for a [Quick Start][] and a [detailed example][]
BUILDING FROM SOURCE
---------------------
- Clone this repository
@ -58,3 +62,5 @@ $ ../../tools/distrib/python/submit.py
[homebrew]:http://brew.sh
[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation
[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
[Quick Start]:http://www.grpc.io/docs/tutorials/basic/python.html
[detailed example]:http://www.grpc.io/docs/installation/python.html

@ -6,22 +6,18 @@ Package for GRPC Python.
Dependencies
------------
Ensure that you have installed GRPC core.
On debian linux systems, install from our released deb package:
Ensure you have installed the gRPC core. On Mac OS X, install homebrew_. On Linux, install linuxbrew_.
Run the following command to install gRPC Python.
::
$ wget https://github.com/grpc/grpc/releases/download/release-0_5_0/libgrpc_0.5.0_amd64.deb
$ wget https://github.com/grpc/grpc/releases/download/release-0_5_0/libgrpc-dev_0.5.0_amd64.deb
$ sudo dpkg -i libgrpc_0.5.0_amd64.deb libgrpc-dev_0.5.0_amd64.deb
Otherwise, install from source:
$ curl -fsSL https://goo.gl/getgrpc | bash -s python
::
This will download and run the [gRPC install script][] to install grpc core. The script then uses pip to install this package. It also installs the Protocol Buffers compiler (_protoc_) and the gRPC _protoc_ plugin for python.
git clone https://github.com/grpc/grpc.git
cd grpc
./configure
make && make install
Otherwise, `install from source`_
.. _`install from source`: https://github.com/grpc/grpc/blob/master/src/python/README.md#building-from-source
.. _homebrew: http://brew.sh
.. _linuxbrew: https://github.com/Homebrew/linuxbrew#installation
.. _`gRPC install script`: https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install

@ -40,7 +40,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/slice.h>
#include <grpc/support/time.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "grpc/_adapter/_c/types.h"
@ -158,9 +158,10 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
return 0;
}
if (PyTuple_Size(op) != OP_TUPLE_SIZE) {
char buf[64];
snprintf(buf, sizeof(buf), "expected tuple op of length %d", OP_TUPLE_SIZE);
char *buf;
gpr_asprintf(&buf, "expected tuple op of length %d", OP_TUPLE_SIZE);
PyErr_SetString(PyExc_ValueError, buf);
gpr_free(buf);
return 0;
}
type = PyInt_AsLong(PyTuple_GET_ITEM(op, TYPE_INDEX));
@ -355,9 +356,14 @@ double pygrpc_cast_gpr_timespec_to_double(gpr_timespec timespec) {
return timespec.tv_sec + 1e-9*timespec.tv_nsec;
}
/* Because C89 doesn't have a way to check for infinity... */
static int pygrpc_isinf(double x) {
return x * 0 != 0;
}
gpr_timespec pygrpc_cast_double_to_gpr_timespec(double seconds) {
gpr_timespec result;
if isinf(seconds) {
if (pygrpc_isinf(seconds)) {
result = seconds > 0.0 ? gpr_inf_future : gpr_inf_past;
} else {
result.tv_sec = (time_t)seconds;

@ -36,6 +36,7 @@ import shutil
import subprocess
import sys
import tempfile
import threading
import time
import unittest
@ -49,13 +50,13 @@ STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub'
SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server'
STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub'
# Timeouts and delays.
SHORT_TIMEOUT = 0.1
NORMAL_TIMEOUT = 1
LONG_TIMEOUT = 2
DOES_NOT_MATTER_DELAY = 0
# The timeout used in tests of RPCs that are supposed to expire.
SHORT_TIMEOUT = 2
# The timeout used in tests of RPCs that are not supposed to expire. The
# absurdly large value doesn't matter since no passing execution of this test
# module will ever wait the duration.
LONG_TIMEOUT = 600
NO_DELAY = 0
LONG_DELAY = 1
# Build mode environment variable set by tools/run_tests/run_tests.py.
_build_mode = os.environ['CONFIG']
@ -64,47 +65,54 @@ _build_mode = os.environ['CONFIG']
class _ServicerMethods(object):
def __init__(self, test_pb2, delay):
self._condition = threading.Condition()
self._delay = delay
self._paused = False
self._failed = False
self.test_pb2 = test_pb2
self.delay = delay
self._fail = False
self._test_pb2 = test_pb2
@contextlib.contextmanager
def pause(self): # pylint: disable=invalid-name
self._paused = True
with self._condition:
self._paused = True
yield
self._paused = False
with self._condition:
self._paused = False
self._condition.notify_all()
@contextlib.contextmanager
def fail(self): # pylint: disable=invalid-name
self._failed = True
with self._condition:
self._fail = True
yield
self._failed = False
with self._condition:
self._fail = False
def _control(self): # pylint: disable=invalid-name
if self._failed:
raise ValueError()
time.sleep(self.delay)
while self._paused:
time.sleep(0)
def UnaryCall(self, request, unused_context):
response = self.test_pb2.SimpleResponse()
response.payload.payload_type = self.test_pb2.COMPRESSABLE
with self._condition:
if self._fail:
raise ValueError()
while self._paused:
self._condition.wait()
time.sleep(self._delay)
def UnaryCall(self, request, unused_rpc_context):
response = self._test_pb2.SimpleResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * request.response_size
self._control()
return response
def StreamingOutputCall(self, request, unused_context):
def StreamingOutputCall(self, request, unused_rpc_context):
for parameter in request.response_parameters:
response = self.test_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self.test_pb2.COMPRESSABLE
response = self._test_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
def StreamingInputCall(self, request_iter, unused_context):
response = self.test_pb2.StreamingInputCallResponse()
def StreamingInputCall(self, request_iter, unused_rpc_context):
response = self._test_pb2.StreamingInputCallResponse()
aggregated_payload_size = 0
for request in request_iter:
aggregated_payload_size += len(request.payload.payload_compressable)
@ -112,21 +120,21 @@ class _ServicerMethods(object):
self._control()
return response
def FullDuplexCall(self, request_iter, unused_context):
def FullDuplexCall(self, request_iter, unused_rpc_context):
for request in request_iter:
for parameter in request.response_parameters:
response = self.test_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self.test_pb2.COMPRESSABLE
response = self._test_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
def HalfDuplexCall(self, request_iter, unused_context):
def HalfDuplexCall(self, request_iter, unused_rpc_context):
responses = []
for request in request_iter:
for parameter in request.response_parameters:
response = self.test_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self.test_pb2.COMPRESSABLE
response = self._test_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
responses.append(response)
@ -147,12 +155,11 @@ def _CreateService(test_pb2, delay):
waiting for the service.
Args:
test_pb2: the test_pb2 module generated by this test
delay: delay in seconds per response from the servicer
timeout: how long the stub will wait for the servicer by default.
test_pb2: The test_pb2 module generated by this test.
delay: Delay in seconds per response from the servicer.
Yields:
A three-tuple (servicer_methods, servicer, stub), where the servicer is
A (servicer_methods, servicer, stub) three-tuple where servicer_methods is
the back-end of the service bound to the stub and the server and stub
are both activated and ready for use.
"""
@ -185,7 +192,7 @@ def _CreateService(test_pb2, delay):
yield servicer_methods, stub, server
def StreamingInputRequest(test_pb2):
def _streaming_input_request_iterator(test_pb2):
for _ in range(3):
request = test_pb2.StreamingInputCallRequest()
request.payload.payload_type = test_pb2.COMPRESSABLE
@ -193,7 +200,7 @@ def StreamingInputRequest(test_pb2):
yield request
def StreamingOutputRequest(test_pb2):
def _streaming_output_request(test_pb2):
request = test_pb2.StreamingOutputCallRequest()
sizes = [1, 2, 3]
request.response_parameters.add(size=sizes[0], interval_us=0)
@ -202,7 +209,7 @@ def StreamingOutputRequest(test_pb2):
return request
def FullDuplexRequest(test_pb2):
def _full_duplex_request_iterator(test_pb2):
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
@ -250,7 +257,7 @@ class PythonPluginTest(unittest.TestCase):
if exc.errno != errno.ENOENT:
raise
# TODO(atash): Figure out which of theses tests is hanging flakily with small
# TODO(atash): Figure out which of these tests is hanging flakily with small
# probability.
def testImportAttributes(self):
@ -265,37 +272,36 @@ class PythonPluginTest(unittest.TestCase):
def testUpDown(self):
import test_pb2
with _CreateService(
test_pb2, DOES_NOT_MATTER_DELAY) as (servicer, stub, unused_server):
test_pb2, NO_DELAY) as (servicer, stub, unused_server):
request = test_pb2.SimpleRequest(response_size=13)
def testUnaryCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods.
request = test_pb2.SimpleRequest(response_size=13)
response = stub.UnaryCall(request, NORMAL_TIMEOUT)
expected_response = servicer.UnaryCall(request, None)
response = stub.UnaryCall(request, timeout)
expected_response = methods.UnaryCall(request, 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testUnaryCallAsync(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(test_pb2, LONG_DELAY) as (
servicer, stub, unused_server):
start_time = time.clock()
response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
# Check that we didn't block on the asynchronous call.
self.assertGreater(LONG_DELAY, time.clock() - start_time)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
# Check that the call does not block waiting for the server to respond.
with methods.pause():
response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
response = response_future.result()
expected_response = servicer.UnaryCall(request, None)
expected_response = methods.UnaryCall(request, 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testUnaryCallAsyncExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
# set the timeout super low...
with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
servicer, stub, unused_server):
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
request = test_pb2.SimpleRequest(response_size=13)
with servicer.pause():
with methods.pause():
response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
response_future.result()
@ -305,9 +311,9 @@ class PythonPluginTest(unittest.TestCase):
def testUnaryCallAsyncCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
servicer, stub, unused_server):
with servicer.pause():
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
response_future = stub.UnaryCall.async(request, 1)
response_future.cancel()
self.assertTrue(response_future.cancelled())
@ -315,30 +321,31 @@ class PythonPluginTest(unittest.TestCase):
def testUnaryCallAsyncFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
servicer, stub, unused_server):
with servicer.fail():
response_future = stub.UnaryCall.async(request, NORMAL_TIMEOUT)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.fail():
response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testStreamingOutputCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = StreamingOutputRequest(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
responses = stub.StreamingOutputCall(request, NORMAL_TIMEOUT)
expected_responses = servicer.StreamingOutputCall(request, None)
for check in itertools.izip_longest(expected_responses, responses):
expected_response, response = check
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
responses = stub.StreamingOutputCall(request, LONG_TIMEOUT)
expected_responses = methods.StreamingOutputCall(
request, 'not a real RpcContext!')
for expected_response, response in itertools.izip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testStreamingOutputCallExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = StreamingOutputRequest(test_pb2)
with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
servicer, stub, unused_server):
with servicer.pause():
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(responses)
@ -347,9 +354,9 @@ class PythonPluginTest(unittest.TestCase):
'forever and fix.')
def testStreamingOutputCallCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = StreamingOutputRequest(test_pb2)
with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
unused_servicer, stub, unused_server):
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (
unused_methods, stub, unused_server):
responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
next(responses)
responses.cancel()
@ -360,10 +367,10 @@ class PythonPluginTest(unittest.TestCase):
'instead of raising the proper error.')
def testStreamingOutputCallFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = StreamingOutputRequest(test_pb2)
with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
servicer, stub, unused_server):
with servicer.fail():
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.fail():
responses = stub.StreamingOutputCall(request, 1)
self.assertIsNotNone(responses)
with self.assertRaises(exceptions.ServicerError):
@ -373,34 +380,32 @@ class PythonPluginTest(unittest.TestCase):
'forever and fix.')
def testStreamingInputCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
response = stub.StreamingInputCall(StreamingInputRequest(test_pb2),
NORMAL_TIMEOUT)
expected_response = servicer.StreamingInputCall(
StreamingInputRequest(test_pb2), None)
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
response = stub.StreamingInputCall(
_streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)
expected_response = methods.StreamingInputCall(
_streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallAsync(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, LONG_DELAY) as (
servicer, stub, unused_server):
start_time = time.clock()
response_future = stub.StreamingInputCall.async(
StreamingInputRequest(test_pb2), LONG_TIMEOUT)
self.assertGreater(LONG_DELAY, time.clock() - start_time)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
response_future = stub.StreamingInputCall.async(
_streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)
response = response_future.result()
expected_response = servicer.StreamingInputCall(
StreamingInputRequest(test_pb2), None)
expected_response = methods.StreamingInputCall(
_streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallAsyncExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
# set the timeout super low...
with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
servicer, stub, unused_server):
with servicer.pause():
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
response_future = stub.StreamingInputCall.async(
StreamingInputRequest(test_pb2), SHORT_TIMEOUT)
_streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
response_future.result()
self.assertIsInstance(
@ -408,11 +413,12 @@ class PythonPluginTest(unittest.TestCase):
def testStreamingInputCallAsyncCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
servicer, stub, unused_server):
with servicer.pause():
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods.
response_future = stub.StreamingInputCall.async(
StreamingInputRequest(test_pb2), NORMAL_TIMEOUT)
_streaming_input_request_iterator(test_pb2), timeout)
response_future.cancel()
self.assertTrue(response_future.cancelled())
with self.assertRaises(future.CancelledError):
@ -420,33 +426,33 @@ class PythonPluginTest(unittest.TestCase):
def testStreamingInputCallAsyncFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
servicer, stub, unused_server):
with servicer.fail():
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.fail():
response_future = stub.StreamingInputCall.async(
StreamingInputRequest(test_pb2), SHORT_TIMEOUT)
_streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testFullDuplexCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
responses = stub.FullDuplexCall(FullDuplexRequest(test_pb2),
NORMAL_TIMEOUT)
expected_responses = servicer.FullDuplexCall(FullDuplexRequest(test_pb2),
None)
for check in itertools.izip_longest(expected_responses, responses):
expected_response, response = check
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
responses = stub.FullDuplexCall(
_full_duplex_request_iterator(test_pb2), LONG_TIMEOUT)
expected_responses = methods.FullDuplexCall(
_full_duplex_request_iterator(test_pb2), 'not a real RpcContext!')
for expected_response, response in itertools.izip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testFullDuplexCallExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = FullDuplexRequest(test_pb2)
with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
servicer, stub, unused_server):
with servicer.pause():
responses = stub.FullDuplexCall(request, SHORT_TIMEOUT)
request_iterator = _full_duplex_request_iterator(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(responses)
@ -454,9 +460,9 @@ class PythonPluginTest(unittest.TestCase):
'forever and fix.')
def testFullDuplexCallCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
request = FullDuplexRequest(test_pb2)
responses = stub.FullDuplexCall(request, NORMAL_TIMEOUT)
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
request_iterator = _full_duplex_request_iterator(test_pb2)
responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)
next(responses)
responses.cancel()
with self.assertRaises(future.CancelledError):
@ -466,11 +472,11 @@ class PythonPluginTest(unittest.TestCase):
'and fix.')
def testFullDuplexCallFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = FullDuplexRequest(test_pb2)
with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
servicer, stub, unused_server):
with servicer.fail():
responses = stub.FullDuplexCall(request, NORMAL_TIMEOUT)
request_iterator = _full_duplex_request_iterator(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.fail():
responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)
self.assertIsNotNone(responses)
with self.assertRaises(exceptions.ServicerError):
next(responses)
@ -479,9 +485,9 @@ class PythonPluginTest(unittest.TestCase):
'forever and fix.')
def testHalfDuplexCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
servicer, stub, unused_server):
def HalfDuplexRequest():
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
def half_duplex_request_iterator():
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
@ -489,30 +495,38 @@ class PythonPluginTest(unittest.TestCase):
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
responses = stub.HalfDuplexCall(HalfDuplexRequest(), NORMAL_TIMEOUT)
expected_responses = servicer.HalfDuplexCall(HalfDuplexRequest(), None)
responses = stub.HalfDuplexCall(
half_duplex_request_iterator(), LONG_TIMEOUT)
expected_responses = methods.HalfDuplexCall(
half_duplex_request_iterator(), 'not a real RpcContext!')
for check in itertools.izip_longest(expected_responses, responses):
expected_response, response = check
self.assertEqual(expected_response, response)
def testHalfDuplexCallWedged(self):
import test_pb2 # pylint: disable=g-import-not-at-top
wait_flag = [False]
condition = threading.Condition()
wait_cell = [False]
@contextlib.contextmanager
def wait(): # pylint: disable=invalid-name
# Where's Python 3's 'nonlocal' statement when you need it?
wait_flag[0] = True
with condition:
wait_cell[0] = True
yield
wait_flag[0] = False
def HalfDuplexRequest():
with condition:
wait_cell[0] = False
condition.notify_all()
def half_duplex_request_iterator():
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
while wait_flag[0]:
time.sleep(0.1)
with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
with condition:
while wait_cell[0]:
condition.wait()
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
with wait():
responses = stub.HalfDuplexCall(HalfDuplexRequest(), NORMAL_TIMEOUT)
responses = stub.HalfDuplexCall(
half_duplex_request_iterator(), SHORT_TIMEOUT)
# half-duplex waits for the client to send all info
with self.assertRaises(exceptions.ExpirationError):
next(responses)

@ -62,7 +62,7 @@ typedef std::list<grpc_time> deadline_list;
class ClientRpcContext {
public:
ClientRpcContext(int ch) : channel_id_(ch) {}
explicit ClientRpcContext(int ch) : channel_id_(ch) {}
virtual ~ClientRpcContext() {}
// next state, return false if done. Collect stats when appropriate
virtual bool RunNextState(bool, Histogram* hist) = 0;

@ -73,31 +73,35 @@ class AsyncQpsServerTest : public Server {
gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
srv_cq_ = builder.AddCompletionQueue();
for (int i = 0; i < config.threads(); i++) {
srv_cqs_.emplace_back(std::move(builder.AddCompletionQueue()));
}
server_ = builder.BuildAndStart();
using namespace std::placeholders;
request_unary_ =
std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_,
_1, _2, _3, srv_cq_.get(), srv_cq_.get(), _4);
request_streaming_ =
std::bind(&TestService::AsyncService::RequestStreamingCall,
&async_service_, _1, _2, srv_cq_.get(), srv_cq_.get(), _3);
for (int i = 0; i < 100; i++) {
contexts_.push_front(
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
request_unary_, ProcessRPC));
contexts_.push_front(
new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
request_streaming_, ProcessRPC));
for (int i = 0; i < 10; i++) {
for (int j = 0; j < config.threads(); j++) {
auto request_unary = std::bind(
&TestService::AsyncService::RequestUnaryCall, &async_service_, _1,
_2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4);
auto request_streaming = std::bind(
&TestService::AsyncService::RequestStreamingCall, &async_service_,
_1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3);
contexts_.push_front(
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
request_unary, ProcessRPC));
contexts_.push_front(
new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
request_streaming, ProcessRPC));
}
}
for (int i = 0; i < config.threads(); i++) {
threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
while (srv_cq_->Next(&got_tag, &ok)) {
while (srv_cqs_[i]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
bool still_going = ctx->RunNextState(ok);
@ -125,11 +129,13 @@ class AsyncQpsServerTest : public Server {
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
srv_cq_->Shutdown();
bool ok;
void *got_tag;
while (srv_cq_->Next(&got_tag, &ok))
;
for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
(*cq)->Shutdown();
bool ok;
void *got_tag;
while ((*cq)->Next(&got_tag, &ok))
;
}
while (!contexts_.empty()) {
delete contexts_.front();
contexts_.pop_front();
@ -306,15 +312,8 @@ class AsyncQpsServerTest : public Server {
}
std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_;
std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
TestService::AsyncService async_service_;
std::function<void(ServerContext *, SimpleRequest *,
grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>
request_unary_;
std::function<void(
ServerContext *,
grpc::ServerAsyncReaderWriter<SimpleResponse, SimpleRequest> *, void *)>
request_streaming_;
std::forward_list<ServerRpcContext *> contexts_;
std::mutex shutdown_mutex_;

@ -41,13 +41,35 @@ if [ "$platform" == "linux" ]
then
echo "building $language on Linux"
if [ "$ghprbPullId" != "" ]
then
# if we are building a pull request, grab corresponding refs.
FETCH_PULL_REQUEST_CMD="&& git fetch $GIT_URL refs/pull/$ghprbPullId/merge refs/pull/$ghprbPullId/head"
fi
# Make sure the CID file is gone.
rm -f docker.cid
# Run tests inside docker
docker run grpc/grpc_jenkins_slave bash -c -l "git clone --recursive $GIT_URL /var/local/git/grpc \
&& cd /var/local/git/grpc && git checkout -f $GIT_COMMIT \
docker run --cidfile=docker.cid grpc/grpc_jenkins_slave bash -c -l "git clone --recursive $GIT_URL /var/local/git/grpc \
&& cd /var/local/git/grpc \
$FETCH_PULL_REQUEST_CMD \
&& git checkout -f $GIT_COMMIT \
&& git submodule update \
&& nvm use 0.12 \
&& rvm use ruby-2.1 \
&& tools/run_tests/run_tests.py -t -l $language"
&& tools/run_tests/run_tests.py -t -l $language" || DOCKER_FAILED="true"
DOCKER_CID=`cat docker.cid`
if [ "$DOCKER_FAILED" == "" ]
then
echo "Docker finished successfully, deleting the container $DOCKER_CID"
docker rm $DOCKER_CID
else
echo "Docker exited with failure, keeping container $DOCKER_CID."
echo "You can SSH to the worker and use 'docker start CID' and 'docker exec -i -t CID bash' to debug the problem."
fi
elif [ "$platform" == "windows" ]
then
echo "building $language on Windows"

@ -38,5 +38,5 @@ rm -rf python2.7_virtual_environment
virtualenv -p /usr/bin/python2.7 python2.7_virtual_environment
source python2.7_virtual_environment/bin/activate
pip install -r src/python/requirements.txt
CFLAGS=-I$root/include LDFLAGS=-L$root/libs/$CONFIG pip install src/python/src
CFLAGS="-I$root/include -std=c89" LDFLAGS=-L$root/libs/$CONFIG pip install src/python/src
pip install src/python/interop

@ -223,6 +223,7 @@ class Jobset(object):
self._travis = travis
self._cache = cache
self._stop_on_failure = stop_on_failure
self._hashes = {}
def start(self, spec):
"""Start a job. Return True on success, False on failure."""
@ -231,11 +232,15 @@ class Jobset(object):
self.reap()
if self.cancelled(): return False
if spec.hash_targets:
bin_hash = hashlib.sha1()
for fn in spec.hash_targets:
with open(which(fn)) as f:
bin_hash.update(f.read())
bin_hash = bin_hash.hexdigest()
if spec.identity() in self._hashes:
bin_hash = self._hashes[spec.identity()]
else:
bin_hash = hashlib.sha1()
for fn in spec.hash_targets:
with open(which(fn)) as f:
bin_hash.update(f.read())
bin_hash = bin_hash.hexdigest()
self._hashes[spec.identity()] = bin_hash
should_run = self._cache.should_run(spec.identity(), bin_hash)
else:
bin_hash = None
@ -266,6 +271,7 @@ class Jobset(object):
for job in self._running:
job.kill()
dead.add(job)
break
for job in dead:
self._completed += 1
self._running.remove(job)

@ -313,7 +313,7 @@ _CONFIGS = {
'dbg': SimpleConfig('dbg'),
'opt': SimpleConfig('opt'),
'tsan': SimpleConfig('tsan', environ={
'TSAN_OPTIONS': 'suppressions=tools/tsan_suppressions.txt'}),
'TSAN_OPTIONS': 'suppressions=tools/tsan_suppressions.txt:halt_on_error=1'}),
'msan': SimpleConfig('msan'),
'ubsan': SimpleConfig('ubsan'),
'asan': SimpleConfig('asan', environ={
@ -449,6 +449,7 @@ class TestCache(object):
def __init__(self, use_cache_results):
self._last_successful_run = {}
self._use_cache_results = use_cache_results
self._last_save = time.time()
def should_run(self, cmdline, bin_hash):
if cmdline not in self._last_successful_run:
@ -461,7 +462,8 @@ class TestCache(object):
def finished(self, cmdline, bin_hash):
self._last_successful_run[cmdline] = bin_hash
self.save()
if time.time() - self._last_save > 1:
self.save()
def dump(self):
return [{'cmdline': k, 'hash': v}
@ -473,6 +475,7 @@ class TestCache(object):
def save(self):
with open('.run_tests_cache', 'w') as f:
f.write(json.dumps(self.dump()))
self._last_save = time.time()
def maybe_load(self):
if os.path.exists('.run_tests_cache'):
@ -515,6 +518,8 @@ def _build_and_run(check_cancelled, newline_on_success, travis, cache):
for antagonist in antagonists:
antagonist.kill()
if cache: cache.save()
return 0

Loading…
Cancel
Save