Merge github.com:grpc/grpc into reject-the-stuffs

pull/3021/head
Craig Tiller 9 years ago
commit c530d503d6
  1. 6
      doc/connection-backoff-interop-test-description.md
  2. 9
      doc/connection-backoff.md
  3. 33
      src/csharp/Grpc.Core.Tests/ChannelTest.cs
  4. 15
      src/csharp/Grpc.Core.Tests/ClientServerTest.cs
  5. 8
      src/csharp/Grpc.Core.Tests/CompressionTest.cs
  6. 8
      src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
  7. 1
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  8. 26
      src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
  9. 8
      src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
  10. 5
      src/csharp/Grpc.Core.Tests/ServerTest.cs
  11. 77
      src/csharp/Grpc.Core.Tests/ShutdownTest.cs
  12. 8
      src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
  13. 50
      src/csharp/Grpc.Core/Channel.cs
  14. 37
      src/csharp/Grpc.Core/GrpcEnvironment.cs
  15. 8
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  16. 6
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  17. 17
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  18. 16
      src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
  19. 7
      src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
  20. 14
      src/csharp/Grpc.Core/Internal/DebugStats.cs
  21. 3
      src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
  22. 10
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  23. 4
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  24. 14
      src/csharp/Grpc.Core/Logging/ConsoleLogger.cs
  25. 57
      src/csharp/Grpc.Core/Server.cs
  26. 20
      src/csharp/Grpc.Examples.MathClient/MathClient.cs
  27. 1
      src/csharp/Grpc.Examples.MathServer/MathServer.cs
  28. 3
      src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
  29. 3
      src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
  30. 36
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs
  31. 9
      src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
  32. 2
      src/csharp/Grpc.IntegrationTesting/InteropServer.cs
  33. 3
      src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs
  34. 63
      src/node/ext/server_credentials.cc
  35. 10
      src/node/health_check/health.js
  36. 5
      src/node/health_check/health.proto
  37. 4
      src/node/interop/interop_server.js
  38. 24
      src/node/test/health_test.js
  39. 4
      src/node/test/server_test.js
  40. 8
      src/php/tests/generated_code/AbstractGeneratedCodeTest.php
  41. 4
      src/php/tests/generated_code/GeneratedCodeTest.php
  42. 4
      src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php
  43. 30
      src/python/grpcio/grpc/framework/core/__init__.py
  44. 59
      src/python/grpcio/grpc/framework/core/_constants.py
  45. 92
      src/python/grpcio/grpc/framework/core/_context.py
  46. 97
      src/python/grpcio/grpc/framework/core/_emission.py
  47. 251
      src/python/grpcio/grpc/framework/core/_end.py
  48. 152
      src/python/grpcio/grpc/framework/core/_expiration.py
  49. 410
      src/python/grpcio/grpc/framework/core/_ingestion.py
  50. 308
      src/python/grpcio/grpc/framework/core/_interfaces.py
  51. 192
      src/python/grpcio/grpc/framework/core/_operation.py
  52. 137
      src/python/grpcio/grpc/framework/core/_reception.py
  53. 212
      src/python/grpcio/grpc/framework/core/_termination.py
  54. 294
      src/python/grpcio/grpc/framework/core/_transmission.py
  55. 46
      src/python/grpcio/grpc/framework/core/_utilities.py
  56. 62
      src/python/grpcio/grpc/framework/core/implementations.py
  57. 39
      src/python/grpcio/grpc/framework/interfaces/base/base.py
  58. 2
      src/python/grpcio/grpc/framework/interfaces/links/links.py
  59. 165
      src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
  60. 30
      src/python/grpcio_test/grpc_test/framework/core/__init__.py
  61. 96
      src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py
  62. 6
      src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
  63. 101
      src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
  64. 57
      src/ruby/ext/grpc/rb_call.c
  65. 5
      src/ruby/lib/grpc/generic/active_call.rb
  66. 8
      src/ruby/spec/call_spec.rb
  67. 28
      src/ruby/spec/generic/active_call_spec.rb
  68. 8
      tools/run_tests/run_python.sh

@ -31,9 +31,9 @@ Clients should accept these arguments:
* --server_retry_port=PORT
* The server port to connect to for testing backoffs. For example, "8081"
The client must connect to the control port without TLS. The client should
either assert on the server returned backoff status or check the returned
backoffs on its own.
The client must connect to the control port without TLS. The client must connect
to the retry port with TLS. The client should either assert on the server
returned backoff status or check the returned backoffs on its own.
Procedure of client:

@ -44,3 +44,12 @@ different jitter logic.
Alternate implementations must ensure that connection backoffs started at the
same time disperse, and must not attempt connections substantially more often
than the above algorithm.
## Reset Backoff
The back off should be reset to INITIAL_BACKOFF at some time point, so that the
reconnecting behavior is consistent no matter the connection is a newly started
one or a previously disconnected one.
We choose to reset the Backoff when the SETTINGS frame is received, at that time
point, we know for sure that this connection was accepted by the server.

@ -41,12 +41,6 @@ namespace Grpc.Core.Tests
{
public class ChannelTest
{
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public void Constructor_RejectsInvalidParams()
{
@ -56,36 +50,33 @@ namespace Grpc.Core.Tests
[Test]
public void State_IdleAfterCreation()
{
using (var channel = new Channel("localhost", Credentials.Insecure))
{
Assert.AreEqual(ChannelState.Idle, channel.State);
}
var channel = new Channel("localhost", Credentials.Insecure);
Assert.AreEqual(ChannelState.Idle, channel.State);
channel.ShutdownAsync().Wait();
}
[Test]
public void WaitForStateChangedAsync_InvalidArgument()
{
using (var channel = new Channel("localhost", Credentials.Insecure))
{
Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure));
}
var channel = new Channel("localhost", Credentials.Insecure);
Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure));
channel.ShutdownAsync().Wait();
}
[Test]
public void ResolvedTarget()
{
using (var channel = new Channel("127.0.0.1", Credentials.Insecure))
{
Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1"));
}
var channel = new Channel("127.0.0.1", Credentials.Insecure);
Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1"));
channel.ShutdownAsync().Wait();
}
[Test]
public void Dispose_IsIdempotent()
public void Shutdown_AllowedOnlyOnce()
{
var channel = new Channel("localhost", Credentials.Insecure);
channel.Dispose();
channel.Dispose();
channel.ShutdownAsync().Wait();
Assert.Throws(typeof(InvalidOperationException), () => channel.ShutdownAsync().GetAwaiter().GetResult());
}
}
}

@ -63,16 +63,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
channel.Dispose();
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public async Task UnaryCall()
{
@ -207,13 +201,6 @@ namespace Grpc.Core.Tests
CollectionAssert.AreEqual(headers[1].ValueBytes, trailers[1].ValueBytes);
}
[Test]
public void UnaryCall_DisposedChannel()
{
channel.Dispose();
Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
}
[Test]
public void UnaryCallPerformance()
{

@ -62,16 +62,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
channel.Dispose();
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public void WriteOptions_Unary()
{

@ -62,16 +62,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
channel.Dispose();
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public async Task PropagateCancellation()
{

@ -64,6 +64,7 @@
<Link>Version.cs</Link>
</Compile>
<Compile Include="ClientBaseTest.cs" />
<Compile Include="ShutdownTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="ClientServerTest.cs" />
<Compile Include="ServerTest.cs" />

@ -43,33 +43,39 @@ namespace Grpc.Core.Tests
[Test]
public void InitializeAndShutdownGrpcEnvironment()
{
var env = GrpcEnvironment.GetInstance();
var env = GrpcEnvironment.AddRef();
Assert.IsNotNull(env.CompletionQueue);
GrpcEnvironment.Shutdown();
GrpcEnvironment.Release();
}
[Test]
public void SubsequentInvocations()
{
var env1 = GrpcEnvironment.GetInstance();
var env2 = GrpcEnvironment.GetInstance();
var env1 = GrpcEnvironment.AddRef();
var env2 = GrpcEnvironment.AddRef();
Assert.IsTrue(object.ReferenceEquals(env1, env2));
GrpcEnvironment.Shutdown();
GrpcEnvironment.Shutdown();
GrpcEnvironment.Release();
GrpcEnvironment.Release();
}
[Test]
public void InitializeAfterShutdown()
{
var env1 = GrpcEnvironment.GetInstance();
GrpcEnvironment.Shutdown();
var env1 = GrpcEnvironment.AddRef();
GrpcEnvironment.Release();
var env2 = GrpcEnvironment.GetInstance();
GrpcEnvironment.Shutdown();
var env2 = GrpcEnvironment.AddRef();
GrpcEnvironment.Release();
Assert.IsFalse(object.ReferenceEquals(env1, env2));
}
[Test]
public void ReleaseWithoutAddRef()
{
Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release());
}
[Test]
public void GetCoreVersionString()
{

@ -69,16 +69,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
channel.Dispose();
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public void WriteResponseHeaders_NullNotAllowed()
{

@ -51,7 +51,6 @@ namespace Grpc.Core.Tests
};
server.Start();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
[Test]
@ -67,8 +66,7 @@ namespace Grpc.Core.Tests
Assert.Greater(boundPort.BoundPort, 0);
server.Start();
server.ShutdownAsync();
GrpcEnvironment.Shutdown();
server.ShutdownAsync().Wait();
}
[Test]
@ -83,7 +81,6 @@ namespace Grpc.Core.Tests
Assert.Throws(typeof(InvalidOperationException), () => server.Services.Add(ServerServiceDefinition.CreateBuilder("serviceName").Build()));
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
}
}

@ -0,0 +1,77 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class ShutdownTest
{
const string Host = "127.0.0.1";
MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
helper = new MockServiceHelper(Host);
server = helper.GetServer();
server.Start();
channel = helper.GetChannel();
}
[Test]
public async Task AbandonedCall()
{
helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
{
await requestStream.ToListAsync();
});
var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall(new CallOptions(deadline: DateTime.UtcNow.AddMilliseconds(1))));
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
}
}

@ -65,16 +65,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
channel.Dispose();
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public void InfiniteDeadline()
{

@ -45,14 +45,19 @@ namespace Grpc.Core
/// <summary>
/// gRPC Channel
/// </summary>
public class Channel : IDisposable
public class Channel
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>();
readonly object myLock = new object();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
readonly string target;
readonly GrpcEnvironment environment;
readonly ChannelSafeHandle handle;
readonly List<ChannelOption> options;
bool shutdownRequested;
bool disposed;
/// <summary>
@ -65,7 +70,7 @@ namespace Grpc.Core
public Channel(string target, Credentials credentials, IEnumerable<ChannelOption> options = null)
{
this.target = Preconditions.CheckNotNull(target, "target");
this.environment = GrpcEnvironment.GetInstance();
this.environment = GrpcEnvironment.AddRef();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
EnsureUserAgentChannelOption(this.options);
@ -172,12 +177,26 @@ namespace Grpc.Core
}
/// <summary>
/// Destroys the underlying channel.
/// Waits until there are no more active calls for this channel and then cleans up
/// resources used by this channel.
/// </summary>
public void Dispose()
public async Task ShutdownAsync()
{
Dispose(true);
GC.SuppressFinalize(this);
lock (myLock)
{
Preconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
var activeCallCount = activeCallCounter.Count;
if (activeCallCount > 0)
{
Logger.Warning("Channel shutdown was called but there are still {0} active calls for that channel.", activeCallCount);
}
handle.Dispose();
await Task.Run(() => GrpcEnvironment.Release());
}
internal ChannelSafeHandle Handle
@ -196,13 +215,20 @@ namespace Grpc.Core
}
}
protected virtual void Dispose(bool disposing)
internal void AddCallReference(object call)
{
if (disposing && handle != null && !disposed)
{
disposed = true;
handle.Dispose();
}
activeCallCounter.Increment();
bool success = false;
handle.DangerousAddRef(ref success);
Preconditions.CheckState(success);
}
internal void RemoveCallReference(object call)
{
handle.DangerousRelease();
activeCallCounter.Decrement();
}
private static void EnsureUserAgentChannelOption(List<ChannelOption> options)

@ -58,6 +58,7 @@ namespace Grpc.Core
static object staticLock = new object();
static GrpcEnvironment instance;
static int refCount;
static ILogger logger = new ConsoleLogger();
@ -67,13 +68,14 @@ namespace Grpc.Core
bool isClosed;
/// <summary>
/// Returns an instance of initialized gRPC environment.
/// Subsequent invocations return the same instance unless Shutdown has been called first.
/// Returns a reference-counted instance of initialized gRPC environment.
/// Subsequent invocations return the same instance unless reference count has dropped to zero previously.
/// </summary>
internal static GrpcEnvironment GetInstance()
internal static GrpcEnvironment AddRef()
{
lock (staticLock)
{
refCount++;
if (instance == null)
{
instance = new GrpcEnvironment();
@ -83,14 +85,16 @@ namespace Grpc.Core
}
/// <summary>
/// Shuts down the gRPC environment if it was initialized before.
/// Blocks until the environment has been fully shutdown.
/// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero.
/// (and blocks until the environment has been fully shutdown).
/// </summary>
public static void Shutdown()
internal static void Release()
{
lock (staticLock)
{
if (instance != null)
Preconditions.CheckState(refCount > 0);
refCount--;
if (refCount == 0)
{
instance.Close();
instance = null;
@ -125,12 +129,10 @@ namespace Grpc.Core
private GrpcEnvironment()
{
NativeLogRedirector.Redirect();
grpcsharp_init();
GrpcNativeInit();
completionRegistry = new CompletionRegistry(this);
threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE);
threadPool.Start();
// TODO: use proper logging here
Logger.Info("gRPC initialized.");
}
/// <summary>
@ -175,6 +177,17 @@ namespace Grpc.Core
return Marshal.PtrToStringAnsi(ptr);
}
internal static void GrpcNativeInit()
{
grpcsharp_init();
}
internal static void GrpcNativeShutdown()
{
grpcsharp_shutdown();
}
/// <summary>
/// Shuts down this environment.
/// </summary>
@ -185,12 +198,10 @@ namespace Grpc.Core
throw new InvalidOperationException("Close has already been called");
}
threadPool.Stop();
grpcsharp_shutdown();
GrpcNativeShutdown();
isClosed = true;
debugStats.CheckOK();
Logger.Info("gRPC shutdown.");
}
}
}

@ -311,9 +311,9 @@ namespace Grpc.Core.Internal
}
}
protected override void OnReleaseResources()
protected override void OnAfterReleaseResources()
{
details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
details.Channel.RemoveCallReference(this);
}
private void Initialize(CompletionQueueSafeHandle cq)
@ -323,7 +323,9 @@ namespace Grpc.Core.Internal
var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value));
details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
details.Channel.AddCallReference(this);
InitializeInternal(call);
RegisterCancellationCallback();
}

@ -189,15 +189,15 @@ namespace Grpc.Core.Internal
private void ReleaseResources()
{
OnReleaseResources();
if (call != null)
{
call.Dispose();
}
disposed = true;
OnAfterReleaseResources();
}
protected virtual void OnReleaseResources()
protected virtual void OnAfterReleaseResources()
{
}
@ -212,7 +212,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
protected void CheckReadingAllowed()
protected virtual void CheckReadingAllowed()
{
Preconditions.CheckState(started);
Preconditions.CheckState(!disposed);

@ -50,16 +50,19 @@ namespace Grpc.Core.Internal
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
readonly GrpcEnvironment environment;
readonly Server server;
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer)
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer)
{
this.environment = Preconditions.CheckNotNull(environment);
this.server = Preconditions.CheckNotNull(server);
}
public void Initialize(CallSafeHandle call)
{
call.SetCompletionRegistry(environment.CompletionRegistry);
environment.DebugStats.ActiveServerCalls.Increment();
server.AddCallReference(this);
InitializeInternal(call);
}
@ -168,9 +171,15 @@ namespace Grpc.Core.Internal
}
}
protected override void OnReleaseResources()
protected override void CheckReadingAllowed()
{
base.CheckReadingAllowed();
Preconditions.CheckArgument(!cancelRequested);
}
protected override void OnAfterReleaseResources()
{
environment.DebugStats.ActiveServerCalls.Decrement();
server.RemoveCallReference(this);
}
/// <summary>

@ -134,7 +134,7 @@ namespace Grpc.Core.Internal
}
// Gets data of server_rpc_new completion.
public ServerRpcNew GetServerRpcNew()
public ServerRpcNew GetServerRpcNew(Server server)
{
var call = grpcsharp_batch_context_server_rpc_new_call(this);
@ -145,7 +145,7 @@ namespace Grpc.Core.Internal
IntPtr metadataArrayPtr = grpcsharp_batch_context_server_rpc_new_request_metadata(this);
var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr);
return new ServerRpcNew(call, method, host, deadline, metadata);
return new ServerRpcNew(server, call, method, host, deadline, metadata);
}
// Gets data of receive_close_on_server completion.
@ -198,14 +198,16 @@ namespace Grpc.Core.Internal
/// </summary>
internal struct ServerRpcNew
{
readonly Server server;
readonly CallSafeHandle call;
readonly string method;
readonly string host;
readonly Timespec deadline;
readonly Metadata requestMetadata;
public ServerRpcNew(CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
{
this.server = server;
this.call = call;
this.method = method;
this.host = host;
@ -213,6 +215,14 @@ namespace Grpc.Core.Internal
this.requestMetadata = requestMetadata;
}
public Server Server
{
get
{
return this.server;
}
}
public CallSafeHandle Call
{
get

@ -68,11 +68,17 @@ namespace Grpc.Core.Internal
public static ChannelSafeHandle CreateInsecure(string target, ChannelArgsSafeHandle channelArgs)
{
// Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
// Doing so would make object finalizer crash if we end up abandoning the handle.
GrpcEnvironment.GrpcNativeInit();
return grpcsharp_insecure_channel_create(target, channelArgs);
}
public static ChannelSafeHandle CreateSecure(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs)
{
// Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
// Doing so would make object finalizer crash if we end up abandoning the handle.
GrpcEnvironment.GrpcNativeInit();
return grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
@ -107,6 +113,7 @@ namespace Grpc.Core.Internal
protected override bool ReleaseHandle()
{
grpcsharp_channel_destroy(handle);
GrpcEnvironment.GrpcNativeShutdown();
return true;
}
}

@ -38,10 +38,6 @@ namespace Grpc.Core.Internal
{
internal class DebugStats
{
public readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
public readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
public readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
/// <summary>
@ -49,16 +45,6 @@ namespace Grpc.Core.Internal
/// </summary>
public void CheckOK()
{
var remainingClientCalls = ActiveClientCalls.Count;
if (remainingClientCalls != 0)
{
DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
}
var remainingServerCalls = ActiveServerCalls.Count;
if (remainingServerCalls != 0)
{
DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
}
var pendingBatchCompletions = PendingBatchCompletions.Count;
if (pendingBatchCompletions != 0)
{

@ -83,8 +83,6 @@ namespace Grpc.Core.Internal
lock (myLock)
{
cq.Shutdown();
Logger.Info("Waiting for GRPC threads to finish.");
foreach (var thread in threads)
{
thread.Join();
@ -136,7 +134,6 @@ namespace Grpc.Core.Internal
}
}
while (ev.type != GRPCCompletionType.Shutdown);
Logger.Info("Completion queue has shutdown successfully, thread {0} exiting.", Thread.CurrentThread.Name);
}
}
}

@ -67,7 +67,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@ -123,7 +123,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@ -179,7 +179,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@ -239,7 +239,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@ -278,7 +278,7 @@ namespace Grpc.Core.Internal
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
(payload) => payload, (payload) => payload, environment);
(payload) => payload, (payload) => payload, environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();

@ -74,6 +74,9 @@ namespace Grpc.Core.Internal
public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args)
{
// Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
// Doing so would make object finalizer crash if we end up abandoning the handle.
GrpcEnvironment.GrpcNativeInit();
return grpcsharp_server_create(cq, args);
}
@ -109,6 +112,7 @@ namespace Grpc.Core.Internal
protected override bool ReleaseHandle()
{
grpcsharp_server_destroy(handle);
GrpcEnvironment.GrpcNativeShutdown();
return true;
}

@ -51,7 +51,19 @@ namespace Grpc.Core.Logging
private ConsoleLogger(Type forType)
{
this.forType = forType;
this.forTypeString = forType != null ? forType.FullName + " " : "";
if (forType != null)
{
var namespaceStr = forType.Namespace ?? "";
if (namespaceStr.Length > 0)
{
namespaceStr += ".";
}
this.forTypeString = namespaceStr + forType.Name + " ";
}
else
{
this.forTypeString = "";
}
}
/// <summary>

@ -50,6 +50,8 @@ namespace Grpc.Core
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
readonly ServiceDefinitionCollection serviceDefinitions;
readonly ServerPortCollection ports;
readonly GrpcEnvironment environment;
@ -73,7 +75,7 @@ namespace Grpc.Core
{
this.serviceDefinitions = new ServiceDefinitionCollection(this);
this.ports = new ServerPortCollection(this);
this.environment = GrpcEnvironment.GetInstance();
this.environment = GrpcEnvironment.AddRef();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
{
@ -105,6 +107,17 @@ namespace Grpc.Core
}
}
/// <summary>
/// To allow awaiting termination of the server.
/// </summary>
public Task ShutdownTask
{
get
{
return shutdownTcs.Task;
}
}
/// <summary>
/// Starts the server.
/// </summary>
@ -136,18 +149,9 @@ namespace Grpc.Core
handle.ShutdownAndNotify(HandleServerShutdown, environment);
await shutdownTcs.Task;
handle.Dispose();
}
DisposeHandle();
/// <summary>
/// To allow awaiting termination of the server.
/// </summary>
public Task ShutdownTask
{
get
{
return shutdownTcs.Task;
}
await Task.Run(() => GrpcEnvironment.Release());
}
/// <summary>
@ -166,7 +170,22 @@ namespace Grpc.Core
handle.ShutdownAndNotify(HandleServerShutdown, environment);
handle.CancelAllCalls();
await shutdownTcs.Task;
handle.Dispose();
DisposeHandle();
}
internal void AddCallReference(object call)
{
activeCallCounter.Increment();
bool success = false;
handle.DangerousAddRef(ref success);
Preconditions.CheckState(success);
}
internal void RemoveCallReference(object call)
{
handle.DangerousRelease();
activeCallCounter.Decrement();
}
/// <summary>
@ -227,6 +246,16 @@ namespace Grpc.Core
}
}
private void DisposeHandle()
{
var activeCallCount = activeCallCounter.Count;
if (activeCallCount > 0)
{
Logger.Warning("Server shutdown has finished but there are still {0} active calls for that server.", activeCallCount);
}
handle.Dispose();
}
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
@ -254,7 +283,7 @@ namespace Grpc.Core
{
if (success)
{
ServerRpcNew newRpc = ctx.GetServerRpcNew();
ServerRpcNew newRpc = ctx.GetServerRpcNew(this);
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)

@ -39,23 +39,21 @@ namespace math
{
public static void Main(string[] args)
{
using (Channel channel = new Channel("127.0.0.1", 23456, Credentials.Insecure))
{
Math.IMathClient client = new Math.MathClient(channel);
MathExamples.DivExample(client);
var channel = new Channel("127.0.0.1", 23456, Credentials.Insecure);
Math.IMathClient client = new Math.MathClient(channel);
MathExamples.DivExample(client);
MathExamples.DivAsyncExample(client).Wait();
MathExamples.DivAsyncExample(client).Wait();
MathExamples.FibExample(client).Wait();
MathExamples.FibExample(client).Wait();
MathExamples.SumExample(client).Wait();
MathExamples.SumExample(client).Wait();
MathExamples.DivManyExample(client).Wait();
MathExamples.DivManyExample(client).Wait();
MathExamples.DependendRequestsExample(client).Wait();
}
MathExamples.DependendRequestsExample(client).Wait();
GrpcEnvironment.Shutdown();
channel.ShutdownAsync().Wait();
}
}
}

@ -56,7 +56,6 @@ namespace math
Console.ReadKey();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
}
}

@ -68,9 +68,8 @@ namespace math.Tests
[TestFixtureTearDown]
public void Cleanup()
{
channel.Dispose();
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
[Test]

@ -71,10 +71,9 @@ namespace Grpc.HealthCheck.Tests
[TestFixtureTearDown]
public void Cleanup()
{
channel.Dispose();
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
[Test]

@ -120,12 +120,10 @@ namespace Grpc.IntegrationTesting
};
}
using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions))
{
TestService.TestServiceClient client = new TestService.TestServiceClient(channel);
await RunTestCaseAsync(options.testCase, client);
}
GrpcEnvironment.Shutdown();
var channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions);
TestService.TestServiceClient client = new TestService.TestServiceClient(channel);
await RunTestCaseAsync(options.testCase, client);
channel.ShutdownAsync().Wait();
}
private async Task RunTestCaseAsync(string testCase, TestService.TestServiceClient client)
@ -171,6 +169,9 @@ namespace Grpc.IntegrationTesting
case "cancel_after_first_response":
await RunCancelAfterFirstResponseAsync(client);
break;
case "timeout_on_sleeping_server":
await RunTimeoutOnSleepingServerAsync(client);
break;
case "benchmark_empty_unary":
RunBenchmarkEmptyUnary(client);
break;
@ -460,6 +461,29 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("Passed!");
}
public static async Task RunTimeoutOnSleepingServerAsync(TestService.ITestServiceClient client)
{
Console.WriteLine("running timeout_on_sleeping_server");
var deadline = DateTime.UtcNow.AddMilliseconds(1);
using (var call = client.FullDuplexCall(deadline: deadline))
{
try
{
await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder()
.SetPayload(CreateZerosPayload(27182)).Build());
}
catch (InvalidOperationException)
{
// Deadline was reached before write has started. Eat the exception and continue.
}
var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.MoveNext());
Assert.AreEqual(StatusCode.DeadlineExceeded, ex.Status.StatusCode);
}
Console.WriteLine("Passed!");
}
// This is not an official interop test, but it's useful.
public static void RunBenchmarkEmptyUnary(TestService.ITestServiceClient client)
{

@ -75,9 +75,8 @@ namespace Grpc.IntegrationTesting
[TestFixtureTearDown]
public void Cleanup()
{
channel.Dispose();
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
[Test]
@ -127,5 +126,11 @@ namespace Grpc.IntegrationTesting
{
await InteropClient.RunCancelAfterFirstResponseAsync(client);
}
[Test]
public async Task TimeoutOnSleepingServerAsync()
{
await InteropClient.RunTimeoutOnSleepingServerAsync(client);
}
}
}

@ -107,8 +107,6 @@ namespace Grpc.IntegrationTesting
server.Start();
server.ShutdownTask.Wait();
GrpcEnvironment.Shutdown();
}
private static ServerOptions ParseArguments(string[] args)

@ -85,9 +85,8 @@ namespace Grpc.IntegrationTesting
[TestFixtureTearDown]
public void Cleanup()
{
channel.Dispose();
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
[Test]

@ -41,6 +41,7 @@
namespace grpc {
namespace node {
using v8::Array;
using v8::Exception;
using v8::External;
using v8::Function;
@ -52,6 +53,7 @@ using v8::Local;
using v8::Object;
using v8::ObjectTemplate;
using v8::Persistent;
using v8::String;
using v8::Value;
NanCallback *ServerCredentials::constructor;
@ -122,25 +124,66 @@ NAN_METHOD(ServerCredentials::CreateSsl) {
// TODO: have the node API support multiple key/cert pairs.
NanScope();
char *root_certs = NULL;
grpc_ssl_pem_key_cert_pair key_cert_pair;
if (::node::Buffer::HasInstance(args[0])) {
root_certs = ::node::Buffer::Data(args[0]);
} else if (!(args[0]->IsNull() || args[0]->IsUndefined())) {
return NanThrowTypeError(
"createSSl's first argument must be a Buffer if provided");
}
if (!::node::Buffer::HasInstance(args[1])) {
return NanThrowTypeError("createSsl's second argument must be a Buffer");
if (!args[1]->IsArray()) {
return NanThrowTypeError(
"createSsl's second argument must be a list of objects");
}
int force_client_auth = 0;
if (args[2]->IsBoolean()) {
force_client_auth = (int)args[2]->BooleanValue();
} else if (!(args[2]->IsUndefined() || args[2]->IsNull())) {
return NanThrowTypeError(
"createSsl's third argument must be a boolean if provided");
}
key_cert_pair.private_key = ::node::Buffer::Data(args[1]);
if (!::node::Buffer::HasInstance(args[2])) {
return NanThrowTypeError("createSsl's third argument must be a Buffer");
Handle<Array> pair_list = Local<Array>::Cast(args[1]);
uint32_t key_cert_pair_count = pair_list->Length();
grpc_ssl_pem_key_cert_pair *key_cert_pairs = new grpc_ssl_pem_key_cert_pair[
key_cert_pair_count];
Handle<String> key_key = NanNew("private_key");
Handle<String> cert_key = NanNew("cert_chain");
for(uint32_t i = 0; i < key_cert_pair_count; i++) {
if (!pair_list->Get(i)->IsObject()) {
delete key_cert_pairs;
return NanThrowTypeError("Key/cert pairs must be objects");
}
Handle<Object> pair_obj = pair_list->Get(i)->ToObject();
if (!pair_obj->HasOwnProperty(key_key)) {
delete key_cert_pairs;
return NanThrowTypeError(
"Key/cert pairs must have a private_key and a cert_chain");
}
if (!pair_obj->HasOwnProperty(cert_key)) {
delete key_cert_pairs;
return NanThrowTypeError(
"Key/cert pairs must have a private_key and a cert_chain");
}
if (!::node::Buffer::HasInstance(pair_obj->Get(key_key))) {
delete key_cert_pairs;
return NanThrowTypeError("private_key must be a Buffer");
}
if (!::node::Buffer::HasInstance(pair_obj->Get(cert_key))) {
delete key_cert_pairs;
return NanThrowTypeError("cert_chain must be a Buffer");
}
key_cert_pairs[i].private_key = ::node::Buffer::Data(
pair_obj->Get(key_key));
key_cert_pairs[i].cert_chain = ::node::Buffer::Data(
pair_obj->Get(cert_key));
}
key_cert_pair.cert_chain = ::node::Buffer::Data(args[2]);
// TODO Add a force_client_auth parameter and pass it as the last parameter
// here.
grpc_server_credentials *creds =
grpc_ssl_server_credentials_create(root_certs, &key_cert_pair, 1, 0);
grpc_ssl_server_credentials_create(root_certs,
key_cert_pairs,
key_cert_pair_count,
force_client_auth);
delete key_cert_pairs;
if (creds == NULL) {
NanReturnNull();
}

@ -45,17 +45,13 @@ function HealthImplementation(statusMap) {
this.statusMap = _.clone(statusMap);
}
HealthImplementation.prototype.setStatus = function(host, service, status) {
if (!this.statusMap[host]) {
this.statusMap[host] = {};
}
this.statusMap[host][service] = status;
HealthImplementation.prototype.setStatus = function(service, status) {
this.statusMap[service] = status;
};
HealthImplementation.prototype.check = function(call, callback){
var host = call.request.host;
var service = call.request.service;
var status = _.get(this.statusMap, [host, service], null);
var status = _.get(this.statusMap, service, null);
if (status === null) {
callback({code:grpc.status.NOT_FOUND});
} else {

@ -32,8 +32,7 @@ syntax = "proto3";
package grpc.health.v1alpha;
message HealthCheckRequest {
string host = 1;
string service = 2;
string service = 1;
}
message HealthCheckResponse {
@ -47,4 +46,4 @@ message HealthCheckResponse {
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
}
}

@ -169,8 +169,8 @@ function getServer(port, tls) {
var key_data = fs.readFileSync(key_path);
var pem_data = fs.readFileSync(pem_path);
server_creds = grpc.ServerCredentials.createSsl(null,
key_data,
pem_data);
[{private_key: key_data,
cert_chain: pem_data}]);
} else {
server_creds = grpc.ServerCredentials.createInsecure();
}

@ -41,13 +41,9 @@ var grpc = require('../');
describe('Health Checking', function() {
var statusMap = {
'': {
'': 'SERVING',
'grpc.test.TestService': 'NOT_SERVING',
},
virtual_host: {
'grpc.test.TestService': 'SERVING'
}
'': 'SERVING',
'grpc.test.TestServiceNotServing': 'NOT_SERVING',
'grpc.test.TestServiceServing': 'SERVING'
};
var healthServer = new grpc.Server();
healthServer.addProtoService(health.service,
@ -71,15 +67,15 @@ describe('Health Checking', function() {
});
});
it('should say that a disabled service is NOT_SERVING', function(done) {
healthClient.check({service: 'grpc.test.TestService'},
healthClient.check({service: 'grpc.test.TestServiceNotServing'},
function(err, response) {
assert.ifError(err);
assert.strictEqual(response.status, 'NOT_SERVING');
done();
});
});
it('should say that a service on another host is SERVING', function(done) {
healthClient.check({host: 'virtual_host', service: 'grpc.test.TestService'},
it('should say that an enabled service is SERVING', function(done) {
healthClient.check({service: 'grpc.test.TestServiceServing'},
function(err, response) {
assert.ifError(err);
assert.strictEqual(response.status, 'SERVING');
@ -93,12 +89,4 @@ describe('Health Checking', function() {
done();
});
});
it('should get NOT_FOUND if the host is not registered', function(done) {
healthClient.check({host: 'wrong_host', service: 'grpc.test.TestService'},
function(err, response) {
assert(err);
assert.strictEqual(err.code, grpc.status.NOT_FOUND);
done();
});
});
});

@ -70,7 +70,9 @@ describe('server', function() {
var pem_path = path.join(__dirname, '../test/data/server1.pem');
var key_data = fs.readFileSync(key_path);
var pem_data = fs.readFileSync(pem_path);
var creds = grpc.ServerCredentials.createSsl(null, key_data, pem_data);
var creds = grpc.ServerCredentials.createSsl(null,
[{private_key: key_data,
cert_chain: pem_data}]);
assert.doesNotThrow(function() {
port = server.addHttp2Port('0.0.0.0:0', creds);
});

@ -39,6 +39,14 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase {
protected static $client;
protected static $timeout;
public function testWaitForNotReady() {
$this->assertFalse(self::$client->waitForReady(1));
}
public function testWaitForReady() {
$this->assertTrue(self::$client->waitForReady(250000));
}
public function testSimpleRequest() {
$div_arg = new math\DivArgs();
$div_arg->setDividend(7);

@ -35,7 +35,7 @@ require 'AbstractGeneratedCodeTest.php';
class GeneratedCodeTest extends AbstractGeneratedCodeTest {
public static function setUpBeforeClass() {
self::$client = new math\MathClient(new Grpc\BaseStub(
getenv('GRPC_TEST_HOST'), []));
self::$client = new math\MathClient(
getenv('GRPC_TEST_HOST'), []);
}
}

@ -35,13 +35,13 @@ require 'AbstractGeneratedCodeTest.php';
class GeneratedCodeWithCallbackTest extends AbstractGeneratedCodeTest {
public static function setUpBeforeClass() {
self::$client = new math\MathClient(new Grpc\BaseStub(
self::$client = new math\MathClient(
getenv('GRPC_TEST_HOST'), ['update_metadata' =>
function($a_hash,
$client = array()) {
$a_copy = $a_hash;
$a_copy['foo'] = ['bar'];
return $a_copy;
}]));
}]);
}
}

@ -0,0 +1,30 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,59 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Private constants for the package."""
from grpc.framework.interfaces.base import base
from grpc.framework.interfaces.links import links
TICKET_SUBSCRIPTION_FOR_BASE_SUBSCRIPTION_KIND = {
base.Subscription.Kind.NONE: links.Ticket.Subscription.NONE,
base.Subscription.Kind.TERMINATION_ONLY:
links.Ticket.Subscription.TERMINATION,
base.Subscription.Kind.FULL: links.Ticket.Subscription.FULL,
}
# Mapping from abortive operation outcome to ticket termination to be
# sent to the other side of the operation, or None to indicate that no
# ticket should be sent to the other side in the event of such an
# outcome.
ABORTION_OUTCOME_TO_TICKET_TERMINATION = {
base.Outcome.CANCELLED: links.Ticket.Termination.CANCELLATION,
base.Outcome.EXPIRED: links.Ticket.Termination.EXPIRATION,
base.Outcome.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN,
base.Outcome.REMOTE_SHUTDOWN: None,
base.Outcome.RECEPTION_FAILURE: links.Ticket.Termination.RECEPTION_FAILURE,
base.Outcome.TRANSMISSION_FAILURE: None,
base.Outcome.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE,
base.Outcome.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE,
}
INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Core) internal error! )-:'
TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE = (
'Exception calling termination callback!')

@ -0,0 +1,92 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for operation context."""
import time
# _interfaces is referenced from specification in this module.
from grpc.framework.core import _interfaces # pylint: disable=unused-import
from grpc.framework.interfaces.base import base
class OperationContext(base.OperationContext):
"""An implementation of interfaces.OperationContext."""
def __init__(
self, lock, termination_manager, transmission_manager,
expiration_manager):
"""Constructor.
Args:
lock: The operation-wide lock.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
"""
self._lock = lock
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._expiration_manager = expiration_manager
def _abort(self, outcome):
with self._lock:
if self._termination_manager.outcome is None:
self._termination_manager.abort(outcome)
self._transmission_manager.abort(outcome)
self._expiration_manager.terminate()
def outcome(self):
"""See base.OperationContext.outcome for specification."""
with self._lock:
return self._termination_manager.outcome
def add_termination_callback(self, callback):
"""See base.OperationContext.add_termination_callback."""
with self._lock:
if self._termination_manager.outcome is None:
self._termination_manager.add_callback(callback)
return None
else:
return self._termination_manager.outcome
def time_remaining(self):
"""See base.OperationContext.time_remaining for specification."""
with self._lock:
deadline = self._expiration_manager.deadline()
return max(0.0, deadline - time.time())
def cancel(self):
"""See base.OperationContext.cancel for specification."""
self._abort(base.Outcome.CANCELLED)
def fail(self, exception):
"""See base.OperationContext.fail for specification."""
self._abort(base.Outcome.LOCAL_FAILURE)

@ -0,0 +1,97 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for handling emitted values."""
from grpc.framework.core import _interfaces
from grpc.framework.interfaces.base import base
class EmissionManager(_interfaces.EmissionManager):
"""An EmissionManager implementation."""
def __init__(
self, lock, termination_manager, transmission_manager,
expiration_manager):
"""Constructor.
Args:
lock: The operation-wide lock.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
"""
self._lock = lock
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._expiration_manager = expiration_manager
self._ingestion_manager = None
self._initial_metadata_seen = False
self._payload_seen = False
self._completion_seen = False
def set_ingestion_manager(self, ingestion_manager):
"""Sets the ingestion manager with which this manager will cooperate.
Args:
ingestion_manager: The _interfaces.IngestionManager for the operation.
"""
self._ingestion_manager = ingestion_manager
def advance(
self, initial_metadata=None, payload=None, completion=None,
allowance=None):
initial_metadata_present = initial_metadata is not None
payload_present = payload is not None
completion_present = completion is not None
allowance_present = allowance is not None
with self._lock:
if self._termination_manager.outcome is None:
if (initial_metadata_present and (
self._initial_metadata_seen or self._payload_seen or
self._completion_seen) or
payload_present and self._completion_seen or
completion_present and self._completion_seen or
allowance_present and allowance <= 0):
self._termination_manager.abort(base.Outcome.LOCAL_FAILURE)
self._transmission_manager.abort(base.Outcome.LOCAL_FAILURE)
self._expiration_manager.terminate()
else:
self._initial_metadata_seen |= initial_metadata_present
self._payload_seen |= payload_present
self._completion_seen |= completion_present
if completion_present:
self._termination_manager.emission_complete()
self._ingestion_manager.local_emissions_done()
self._transmission_manager.advance(
initial_metadata, payload, completion, allowance)
if allowance_present:
self._ingestion_manager.add_local_allowance(allowance)

@ -0,0 +1,251 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Implementation of base.End."""
import abc
import enum
import threading
import uuid
from grpc.framework.core import _operation
from grpc.framework.core import _utilities
from grpc.framework.foundation import callable_util
from grpc.framework.foundation import later
from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.base import base
from grpc.framework.interfaces.links import links
from grpc.framework.interfaces.links import utilities
_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!'
class End(base.End, links.Link):
"""A bridge between base.End and links.Link.
Implementations of this interface translate arriving tickets into
calls on application objects implementing base interfaces and
translate calls from application objects implementing base interfaces
into tickets sent to a joined link.
"""
__metaclass__ = abc.ABCMeta
class _Cycle(object):
"""State for a single start-stop End lifecycle."""
def __init__(self, pool):
self.pool = pool
self.grace = False
self.futures = []
self.operations = {}
self.idle_actions = []
def _abort(operations):
for operation in operations:
operation.abort(base.Outcome.LOCAL_SHUTDOWN)
def _cancel_futures(futures):
for future in futures:
futures.cancel()
def _future_shutdown(lock, cycle, event):
def in_future():
with lock:
_abort(cycle.operations.values())
_cancel_futures(cycle.futures)
pool = cycle.pool
cycle.pool.shutdown(wait=True)
return in_future
def _termination_action(lock, stats, operation_id, cycle):
"""Constructs the termination action for a single operation.
Args:
lock: A lock to hold during the termination action.
states: A mapping from base.Outcome values to integers to increment with
the outcome given to the termination action.
operation_id: The operation ID for the termination action.
cycle: A _Cycle value to be updated during the termination action.
Returns:
A callable that takes an operation outcome as its sole parameter and that
should be used as the termination action for the operation associated
with the given operation ID.
"""
def termination_action(outcome):
with lock:
stats[outcome] += 1
cycle.operations.pop(operation_id, None)
if not cycle.operations:
for action in cycle.idle_actions:
cycle.pool.submit(action)
cycle.idle_actions = []
if cycle.grace:
_cancel_futures(cycle.futures)
return termination_action
class _End(End):
"""An End implementation."""
def __init__(self, servicer_package):
"""Constructor.
Args:
servicer_package: A _ServicerPackage for servicing operations or None if
this end will not be used to service operations.
"""
self._lock = threading.Condition()
self._servicer_package = servicer_package
self._stats = {outcome: 0 for outcome in base.Outcome}
self._mate = None
self._cycle = None
def start(self):
"""See base.End.start for specification."""
with self._lock:
if self._cycle is not None:
raise ValueError('Tried to start a not-stopped End!')
else:
self._cycle = _Cycle(logging_pool.pool(1))
def stop(self, grace):
"""See base.End.stop for specification."""
with self._lock:
if self._cycle is None:
event = threading.Event()
event.set()
return event
elif not self._cycle.operations:
event = threading.Event()
self._cycle.pool.submit(event.set)
self._cycle.pool.shutdown(wait=False)
self._cycle = None
return event
else:
self._cycle.grace = True
event = threading.Event()
self._cycle.idle_actions.append(event.set)
if 0 < grace:
future = later.later(
grace, _future_shutdown(self._lock, self._cycle, event))
self._cycle.futures.append(future)
else:
_abort(self._cycle.operations.values())
return event
def operate(
self, group, method, subscription, timeout, initial_metadata=None,
payload=None, completion=None):
"""See base.End.operate for specification."""
operation_id = uuid.uuid4()
with self._lock:
if self._cycle is None or self._cycle.grace:
raise ValueError('Can\'t operate on stopped or stopping End!')
termination_action = _termination_action(
self._lock, self._stats, operation_id, self._cycle)
operation = _operation.invocation_operate(
operation_id, group, method, subscription, timeout, initial_metadata,
payload, completion, self._mate.accept_ticket, termination_action,
self._cycle.pool)
self._cycle.operations[operation_id] = operation
return operation.context, operation.operator
def operation_stats(self):
"""See base.End.operation_stats for specification."""
with self._lock:
return dict(self._stats)
def add_idle_action(self, action):
"""See base.End.add_idle_action for specification."""
with self._lock:
if self._cycle is None:
raise ValueError('Can\'t add idle action to stopped End!')
action_with_exceptions_logged = callable_util.with_exceptions_logged(
action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE)
if self._cycle.operations:
self._cycle.idle_actions.append(action_with_exceptions_logged)
else:
self._cycle.pool.submit(action_with_exceptions_logged)
def accept_ticket(self, ticket):
"""See links.Link.accept_ticket for specification."""
with self._lock:
if self._cycle is not None and not self._cycle.grace:
operation = self._cycle.operations.get(ticket.operation_id)
if operation is not None:
operation.handle_ticket(ticket)
elif self._servicer_package is not None:
termination_action = _termination_action(
self._lock, self._stats, ticket.operation_id, self._cycle)
operation = _operation.service_operate(
self._servicer_package, ticket, self._mate.accept_ticket,
termination_action, self._cycle.pool)
if operation is not None:
self._cycle.operations[ticket.operation_id] = operation
def join_link(self, link):
"""See links.Link.join_link for specification."""
with self._lock:
self._mate = utilities.NULL_LINK if link is None else link
def serviceless_end_link():
"""Constructs an End usable only for invoking operations.
Returns:
An End usable for translating operations into ticket exchange.
"""
return _End(None)
def serviceful_end_link(servicer, default_timeout, maximum_timeout):
"""Constructs an End capable of servicing operations.
Args:
servicer: An interfaces.Servicer for servicing operations.
default_timeout: A length of time in seconds to be used as the default
time alloted for a single operation.
maximum_timeout: A length of time in seconds to be used as the maximum
time alloted for a single operation.
Returns:
An End capable of servicing the operations requested of it through ticket
exchange.
"""
return _End(
_utilities.ServicerPackage(servicer, default_timeout, maximum_timeout))

@ -0,0 +1,152 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for operation expiration."""
import time
from grpc.framework.core import _interfaces
from grpc.framework.foundation import later
from grpc.framework.interfaces.base import base
class _ExpirationManager(_interfaces.ExpirationManager):
"""An implementation of _interfaces.ExpirationManager."""
def __init__(
self, commencement, timeout, maximum_timeout, lock, termination_manager,
transmission_manager):
"""Constructor.
Args:
commencement: The time in seconds since the epoch at which the operation
began.
timeout: A length of time in seconds to allow for the operation to run.
maximum_timeout: The maximum length of time in seconds to allow for the
operation to run despite what is requested via this object's
change_timout method.
lock: The operation-wide lock.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
"""
self._lock = lock
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._commencement = commencement
self._maximum_timeout = maximum_timeout
self._timeout = timeout
self._deadline = commencement + timeout
self._index = None
self._future = None
def _expire(self, index):
def expire():
with self._lock:
if self._future is not None and index == self._index:
self._future = None
self._termination_manager.expire()
self._transmission_manager.abort(base.Outcome.EXPIRED)
return expire
def start(self):
self._index = 0
self._future = later.later(self._timeout, self._expire(0))
def change_timeout(self, timeout):
if self._future is not None and timeout != self._timeout:
self._future.cancel()
new_timeout = min(timeout, self._maximum_timeout)
new_index = self._index + 1
self._timeout = new_timeout
self._deadline = self._commencement + new_timeout
self._index = new_index
delay = self._deadline - time.time()
self._future = later.later(delay, self._expire(new_index))
if new_timeout != timeout:
self._transmission_manager.timeout(new_timeout)
def deadline(self):
return self._deadline
def terminate(self):
if self._future:
self._future.cancel()
self._future = None
self._deadline_index = None
def invocation_expiration_manager(
timeout, lock, termination_manager, transmission_manager):
"""Creates an _interfaces.ExpirationManager appropriate for front-side use.
Args:
timeout: A length of time in seconds to allow for the operation to run.
lock: The operation-wide lock.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
Returns:
An _interfaces.ExpirationManager appropriate for invocation-side use.
"""
expiration_manager = _ExpirationManager(
time.time(), timeout, timeout, lock, termination_manager,
transmission_manager)
expiration_manager.start()
return expiration_manager
def service_expiration_manager(
timeout, default_timeout, maximum_timeout, lock, termination_manager,
transmission_manager):
"""Creates an _interfaces.ExpirationManager appropriate for back-side use.
Args:
timeout: A length of time in seconds to allow for the operation to run. May
be None in which case default_timeout will be used.
default_timeout: The default length of time in seconds to allow for the
operation to run if the front-side customer has not specified such a value
(or if the value they specified is not yet known).
maximum_timeout: The maximum length of time in seconds to allow for the
operation to run.
lock: The operation-wide lock.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
Returns:
An _interfaces.ExpirationManager appropriate for service-side use.
"""
expiration_manager = _ExpirationManager(
time.time(), default_timeout if timeout is None else timeout,
maximum_timeout, lock, termination_manager, transmission_manager)
expiration_manager.start()
return expiration_manager

@ -0,0 +1,410 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for ingestion during an operation."""
import abc
import collections
from grpc.framework.core import _constants
from grpc.framework.core import _interfaces
from grpc.framework.foundation import abandonment
from grpc.framework.foundation import callable_util
from grpc.framework.interfaces.base import base
_CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!'
_INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!'
class _SubscriptionCreation(collections.namedtuple(
'_SubscriptionCreation', ('subscription', 'remote_error', 'abandoned'))):
"""A sum type for the outcome of ingestion initialization.
Either subscription will be non-None, remote_error will be True, or abandoned
will be True.
Attributes:
subscription: A base.Subscription describing the customer's interest in
operation values from the other side.
remote_error: A boolean indicating that the subscription could not be
created due to an error on the remote side of the operation.
abandoned: A boolean indicating that subscription creation was abandoned.
"""
class _SubscriptionCreator(object):
"""Common specification of subscription-creating behavior."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def create(self, group, method):
"""Creates the base.Subscription of the local customer.
Any exceptions raised by this method should be attributed to and treated as
defects in the customer code called by this method.
Args:
group: The group identifier of the operation.
method: The method identifier of the operation.
Returns:
A _SubscriptionCreation describing the result of subscription creation.
"""
raise NotImplementedError()
class _ServiceSubscriptionCreator(_SubscriptionCreator):
"""A _SubscriptionCreator appropriate for service-side use."""
def __init__(self, servicer, operation_context, output_operator):
"""Constructor.
Args:
servicer: The base.Servicer that will service the operation.
operation_context: A base.OperationContext for the operation to be passed
to the customer.
output_operator: A base.Operator for the operation to be passed to the
customer and to be called by the customer to accept operation data
emitted by the customer.
"""
self._servicer = servicer
self._operation_context = operation_context
self._output_operator = output_operator
def create(self, group, method):
try:
subscription = self._servicer.service(
group, method, self._operation_context, self._output_operator)
except base.NoSuchMethodError:
return _SubscriptionCreation(None, True, False)
except abandonment.Abandoned:
return _SubscriptionCreation(None, False, True)
else:
return _SubscriptionCreation(subscription, False, False)
def _wrap(behavior):
def wrapped(*args, **kwargs):
try:
behavior(*args, **kwargs)
except abandonment.Abandoned:
return False
else:
return True
return wrapped
class _IngestionManager(_interfaces.IngestionManager):
"""An implementation of _interfaces.IngestionManager."""
def __init__(
self, lock, pool, subscription, subscription_creator, termination_manager,
transmission_manager, expiration_manager):
"""Constructor.
Args:
lock: The operation-wide lock.
pool: A thread pool in which to execute customer code.
subscription: A base.Subscription describing the customer's interest in
operation values from the other side. May be None if
subscription_creator is not None.
subscription_creator: A _SubscriptionCreator wrapping the portion of
customer code that when called returns the base.Subscription describing
the customer's interest in operation values from the other side. May be
None if subscription is not None.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
"""
self._lock = lock
self._pool = pool
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._expiration_manager = expiration_manager
if subscription is None:
self._subscription_creator = subscription_creator
self._wrapped_operator = None
elif subscription.kind is base.Subscription.Kind.FULL:
self._subscription_creator = None
self._wrapped_operator = _wrap(subscription.operator.advance)
else:
# TODO(nathaniel): Support other subscriptions.
raise ValueError('Unsupported subscription "%s"!' % subscription.kind)
self._pending_initial_metadata = None
self._pending_payloads = []
self._pending_completion = None
self._local_allowance = 1
# A nonnegative integer or None, with None indicating that the local
# customer is done emitting anyway so there's no need to bother it by
# informing it that the remote customer has granted it further permission to
# emit.
self._remote_allowance = 0
self._processing = False
def _abort_internal_only(self):
self._subscription_creator = None
self._wrapped_operator = None
self._pending_initial_metadata = None
self._pending_payloads = None
self._pending_completion = None
def _abort_and_notify(self, outcome):
self._abort_internal_only()
self._termination_manager.abort(outcome)
self._transmission_manager.abort(outcome)
self._expiration_manager.terminate()
def _operator_next(self):
"""Computes the next step for full-subscription ingestion.
Returns:
An initial_metadata, payload, completion, allowance, continue quintet
indicating what operation values (if any) are available to pass into
customer code and whether or not there is anything immediately
actionable to call customer code to do.
"""
if self._wrapped_operator is None:
return None, None, None, None, False
else:
initial_metadata, payload, completion, allowance, action = [None] * 5
if self._pending_initial_metadata is not None:
initial_metadata = self._pending_initial_metadata
self._pending_initial_metadata = None
action = True
if self._pending_payloads and 0 < self._local_allowance:
payload = self._pending_payloads.pop(0)
self._local_allowance -= 1
action = True
if not self._pending_payloads and self._pending_completion is not None:
completion = self._pending_completion
self._pending_completion = None
action = True
if self._remote_allowance is not None and 0 < self._remote_allowance:
allowance = self._remote_allowance
self._remote_allowance = 0
action = True
return initial_metadata, payload, completion, allowance, bool(action)
def _operator_process(
self, wrapped_operator, initial_metadata, payload,
completion, allowance):
while True:
advance_outcome = callable_util.call_logging_exceptions(
wrapped_operator, _INGESTION_EXCEPTION_LOG_MESSAGE,
initial_metadata=initial_metadata, payload=payload,
completion=completion, allowance=allowance)
if advance_outcome.exception is None:
if advance_outcome.return_value:
with self._lock:
if self._termination_manager.outcome is not None:
return
if completion is not None:
self._termination_manager.ingestion_complete()
initial_metadata, payload, completion, allowance, moar = (
self._operator_next())
if not moar:
self._processing = False
return
else:
with self._lock:
if self._termination_manager.outcome is None:
self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
return
else:
with self._lock:
if self._termination_manager.outcome is None:
self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
return
def _operator_post_create(self, subscription):
wrapped_operator = _wrap(subscription.operator.advance)
with self._lock:
if self._termination_manager.outcome is not None:
return
self._wrapped_operator = wrapped_operator
self._subscription_creator = None
metadata, payload, completion, allowance, moar = self._operator_next()
if not moar:
self._processing = False
return
self._operator_process(
wrapped_operator, metadata, payload, completion, allowance)
def _create(self, subscription_creator, group, name):
outcome = callable_util.call_logging_exceptions(
subscription_creator.create, _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE,
group, name)
if outcome.return_value is None:
with self._lock:
if self._termination_manager.outcome is None:
self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
elif outcome.return_value.abandoned:
with self._lock:
if self._termination_manager.outcome is None:
self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
elif outcome.return_value.remote_error:
with self._lock:
if self._termination_manager.outcome is None:
self._abort_and_notify(base.Outcome.REMOTE_FAILURE)
elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL:
self._operator_post_create(outcome.return_value.subscription)
else:
# TODO(nathaniel): Support other subscriptions.
raise ValueError(
'Unsupported "%s"!' % outcome.return_value.subscription.kind)
def _store_advance(self, initial_metadata, payload, completion, allowance):
if initial_metadata is not None:
self._pending_initial_metadata = initial_metadata
if payload is not None:
self._pending_payloads.append(payload)
if completion is not None:
self._pending_completion = completion
if allowance is not None and self._remote_allowance is not None:
self._remote_allowance += allowance
def _operator_advance(self, initial_metadata, payload, completion, allowance):
if self._processing:
self._store_advance(initial_metadata, payload, completion, allowance)
else:
action = False
if initial_metadata is not None:
action = True
if payload is not None:
if 0 < self._local_allowance:
self._local_allowance -= 1
action = True
else:
self._pending_payloads.append(payload)
payload = False
if completion is not None:
if self._pending_payloads:
self._pending_completion = completion
else:
action = True
if allowance is not None and self._remote_allowance is not None:
allowance += self._remote_allowance
self._remote_allowance = 0
action = True
if action:
self._pool.submit(
callable_util.with_exceptions_logged(
self._operator_process, _constants.INTERNAL_ERROR_LOG_MESSAGE),
self._wrapped_operator, initial_metadata, payload, completion,
allowance)
def set_group_and_method(self, group, method):
"""See _interfaces.IngestionManager.set_group_and_method for spec."""
if self._subscription_creator is not None and not self._processing:
self._pool.submit(
callable_util.with_exceptions_logged(
self._create, _constants.INTERNAL_ERROR_LOG_MESSAGE),
self._subscription_creator, group, method)
self._processing = True
def add_local_allowance(self, allowance):
"""See _interfaces.IngestionManager.add_local_allowance for spec."""
if any((self._subscription_creator, self._wrapped_operator,)):
self._local_allowance += allowance
if not self._processing:
initial_metadata, payload, completion, allowance, moar = (
self._operator_next())
if moar:
self._pool.submit(
callable_util.with_exceptions_logged(
self._operator_process,
_constants.INTERNAL_ERROR_LOG_MESSAGE),
initial_metadata, payload, completion, allowance)
def local_emissions_done(self):
self._remote_allowance = None
def advance(self, initial_metadata, payload, completion, allowance):
"""See _interfaces.IngestionManager.advance for specification."""
if self._subscription_creator is not None:
self._store_advance(initial_metadata, payload, completion, allowance)
elif self._wrapped_operator is not None:
self._operator_advance(initial_metadata, payload, completion, allowance)
def invocation_ingestion_manager(
subscription, lock, pool, termination_manager, transmission_manager,
expiration_manager):
"""Creates an IngestionManager appropriate for invocation-side use.
Args:
subscription: A base.Subscription indicating the customer's interest in the
data and results from the service-side of the operation.
lock: The operation-wide lock.
pool: A thread pool in which to execute customer code.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
Returns:
An IngestionManager appropriate for invocation-side use.
"""
return _IngestionManager(
lock, pool, subscription, None, termination_manager, transmission_manager,
expiration_manager)
def service_ingestion_manager(
servicer, operation_context, output_operator, lock, pool,
termination_manager, transmission_manager, expiration_manager):
"""Creates an IngestionManager appropriate for service-side use.
The returned IngestionManager will require its set_group_and_name method to be
called before its advance method may be called.
Args:
servicer: A base.Servicer for servicing the operation.
operation_context: A base.OperationContext for the operation to be passed to
the customer.
output_operator: A base.Operator for the operation to be passed to the
customer and to be called by the customer to accept operation data output
by the customer.
lock: The operation-wide lock.
pool: A thread pool in which to execute customer code.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
Returns:
An IngestionManager appropriate for service-side use.
"""
subscription_creator = _ServiceSubscriptionCreator(
servicer, operation_context, output_operator)
return _IngestionManager(
lock, pool, None, subscription_creator, termination_manager,
transmission_manager, expiration_manager)

@ -0,0 +1,308 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Package-internal interfaces."""
import abc
from grpc.framework.interfaces.base import base
class TerminationManager(object):
"""An object responsible for handling the termination of an operation.
Attributes:
outcome: None if the operation is active or a base.Outcome value if it has
terminated.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def add_callback(self, callback):
"""Registers a callback to be called on operation termination.
If the operation has already terminated the callback will not be called.
Args:
callback: A callable that will be passed an interfaces.Outcome value.
Returns:
None if the operation has not yet terminated and the passed callback will
be called when it does, or a base.Outcome value describing the operation
termination if the operation has terminated and the callback will not be
called as a result of this method call.
"""
raise NotImplementedError()
@abc.abstractmethod
def emission_complete(self):
"""Indicates that emissions from customer code have completed."""
raise NotImplementedError()
@abc.abstractmethod
def transmission_complete(self):
"""Indicates that transmissions to the remote end are complete.
Returns:
True if the operation has terminated or False if the operation remains
ongoing.
"""
raise NotImplementedError()
@abc.abstractmethod
def reception_complete(self):
"""Indicates that reception from the other side is complete."""
raise NotImplementedError()
@abc.abstractmethod
def ingestion_complete(self):
"""Indicates that customer code ingestion of received values is complete."""
raise NotImplementedError()
@abc.abstractmethod
def expire(self):
"""Indicates that the operation must abort because it has taken too long."""
raise NotImplementedError()
@abc.abstractmethod
def abort(self, outcome):
"""Indicates that the operation must abort for the indicated reason.
Args:
outcome: An interfaces.Outcome indicating operation abortion.
"""
raise NotImplementedError()
class TransmissionManager(object):
"""A manager responsible for transmitting to the other end of an operation."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def kick_off(
self, group, method, timeout, initial_metadata, payload, completion,
allowance):
"""Transmits the values associated with operation invocation."""
raise NotImplementedError()
@abc.abstractmethod
def advance(self, initial_metadata, payload, completion, allowance):
"""Accepts values for transmission to the other end of the operation.
Args:
initial_metadata: An initial metadata value to be transmitted to the other
side of the operation. May only ever be non-None once.
payload: A payload value.
completion: A base.Completion value. May only ever be non-None in the last
transmission to be made to the other side.
allowance: A positive integer communicating the number of additional
payloads allowed to be transmitted from the other side to this side of
the operation, or None if no additional allowance is being granted in
this call.
"""
raise NotImplementedError()
@abc.abstractmethod
def timeout(self, timeout):
"""Accepts for transmission to the other side a new timeout value.
Args:
timeout: A positive float used as the new timeout value for the operation
to be transmitted to the other side.
"""
raise NotImplementedError()
@abc.abstractmethod
def allowance(self, allowance):
"""Indicates to this manager that the remote customer is allowing payloads.
Args:
allowance: A positive integer indicating the number of additional payloads
the remote customer is allowing to be transmitted from this side of the
operation.
"""
raise NotImplementedError()
@abc.abstractmethod
def remote_complete(self):
"""Indicates to this manager that data from the remote side is complete."""
raise NotImplementedError()
@abc.abstractmethod
def abort(self, outcome):
"""Indicates that the operation has aborted.
Args:
outcome: An interfaces.Outcome for the operation. If None, indicates that
the operation abortion should not be communicated to the other side of
the operation.
"""
raise NotImplementedError()
class ExpirationManager(object):
"""A manager responsible for aborting the operation if it runs out of time."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def change_timeout(self, timeout):
"""Changes the timeout allotted for the operation.
Operation duration is always measure from the beginning of the operation;
calling this method changes the operation's allotted time to timeout total
seconds, not timeout seconds from the time of this method call.
Args:
timeout: A length of time in seconds to allow for the operation.
"""
raise NotImplementedError()
@abc.abstractmethod
def deadline(self):
"""Returns the time until which the operation is allowed to run.
Returns:
The time (seconds since the epoch) at which the operation will expire.
"""
raise NotImplementedError()
@abc.abstractmethod
def terminate(self):
"""Indicates to this manager that the operation has terminated."""
raise NotImplementedError()
class EmissionManager(base.Operator):
"""A manager of values emitted by customer code."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def advance(
self, initial_metadata=None, payload=None, completion=None,
allowance=None):
"""Accepts a value emitted by customer code.
This method should only be called by customer code.
Args:
initial_metadata: An initial metadata value emitted by the local customer
to be sent to the other side of the operation.
payload: A payload value emitted by the local customer to be sent to the
other side of the operation.
completion: A Completion value emitted by the local customer to be sent to
the other side of the operation.
allowance: A positive integer indicating an additional number of payloads
that the local customer is willing to accept from the other side of the
operation.
"""
raise NotImplementedError()
class IngestionManager(object):
"""A manager responsible for executing customer code.
This name of this manager comes from its responsibility to pass successive
values from the other side of the operation into the code of the local
customer.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def set_group_and_method(self, group, method):
"""Communicates to this IngestionManager the operation group and method.
Args:
group: The group identifier of the operation.
method: The method identifier of the operation.
"""
raise NotImplementedError()
@abc.abstractmethod
def add_local_allowance(self, allowance):
"""Communicates to this IngestionManager that more payloads may be ingested.
Args:
allowance: A positive integer indicating an additional number of payloads
that the local customer is willing to ingest.
"""
raise NotImplementedError()
@abc.abstractmethod
def local_emissions_done(self):
"""Indicates to this manager that local emissions are done."""
raise NotImplementedError()
@abc.abstractmethod
def advance(self, initial_metadata, payload, completion, allowance):
"""Advances the operation by passing values to the local customer."""
raise NotImplementedError()
class ReceptionManager(object):
"""A manager responsible for receiving tickets from the other end."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def receive_ticket(self, ticket):
"""Handle a ticket from the other side of the operation.
Args:
ticket: An interfaces.BackToFrontTicket or interfaces.FrontToBackTicket
appropriate to this end of the operation and this object.
"""
raise NotImplementedError()
class Operation(object):
"""An ongoing operation.
Attributes:
context: A base.OperationContext object for the operation.
operator: A base.Operator object for the operation for use by the customer
of the operation.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def handle_ticket(self, ticket):
"""Handle a ticket from the other side of the operation.
Args:
ticket: A links.Ticket from the other side of the operation.
"""
raise NotImplementedError()
@abc.abstractmethod
def abort(self, outcome):
"""Aborts the operation.
Args:
outcome: A base.Outcome value indicating operation abortion.
"""
raise NotImplementedError()

@ -0,0 +1,192 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Implementation of operations."""
import threading
# _utilities is referenced from specification in this module.
from grpc.framework.core import _context
from grpc.framework.core import _emission
from grpc.framework.core import _expiration
from grpc.framework.core import _ingestion
from grpc.framework.core import _interfaces
from grpc.framework.core import _reception
from grpc.framework.core import _termination
from grpc.framework.core import _transmission
from grpc.framework.core import _utilities # pylint: disable=unused-import
class _EasyOperation(_interfaces.Operation):
"""A trivial implementation of interfaces.Operation."""
def __init__(
self, lock, termination_manager, transmission_manager, expiration_manager,
context, operator, reception_manager):
"""Constructor.
Args:
lock: The operation-wide lock.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
context: A base.OperationContext for use by the customer during the
operation.
operator: A base.Operator for use by the customer during the operation.
reception_manager: The _interfaces.ReceptionManager for the operation.
"""
self._lock = lock
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._expiration_manager = expiration_manager
self._reception_manager = reception_manager
self.context = context
self.operator = operator
def handle_ticket(self, ticket):
with self._lock:
self._reception_manager.receive_ticket(ticket)
def abort(self, outcome):
with self._lock:
if self._termination_manager.outcome is None:
self._termination_manager.abort(outcome)
self._transmission_manager.abort(outcome)
self._expiration_manager.terminate()
def invocation_operate(
operation_id, group, method, subscription, timeout, initial_metadata,
payload, completion, ticket_sink, termination_action, pool):
"""Constructs objects necessary for front-side operation management.
Args:
operation_id: An object identifying the operation.
group: The group identifier of the operation.
method: The method identifier of the operation.
subscription: A base.Subscription describing the customer's interest in the
results of the operation.
timeout: A length of time in seconds to allow for the operation.
initial_metadata: An initial metadata value to be sent to the other side of
the operation. May be None if the initial metadata will be passed later or
if there will be no initial metadata passed at all.
payload: The first payload value to be transmitted to the other side. May be
None if there is no such value or if the customer chose not to pass it at
operation invocation.
completion: A base.Completion value indicating the end of values passed to
the other side of the operation.
ticket_sink: A callable that accepts links.Tickets and delivers them to the
other side of the operation.
termination_action: A callable that accepts the outcome of the operation as
a base.Outcome value to be called on operation completion.
pool: A thread pool with which to do the work of the operation.
Returns:
An _interfaces.Operation for the operation.
"""
lock = threading.Lock()
with lock:
termination_manager = _termination.invocation_termination_manager(
termination_action, pool)
transmission_manager = _transmission.TransmissionManager(
operation_id, ticket_sink, lock, pool, termination_manager)
expiration_manager = _expiration.invocation_expiration_manager(
timeout, lock, termination_manager, transmission_manager)
operation_context = _context.OperationContext(
lock, termination_manager, transmission_manager, expiration_manager)
emission_manager = _emission.EmissionManager(
lock, termination_manager, transmission_manager, expiration_manager)
ingestion_manager = _ingestion.invocation_ingestion_manager(
subscription, lock, pool, termination_manager, transmission_manager,
expiration_manager)
reception_manager = _reception.ReceptionManager(
termination_manager, transmission_manager, expiration_manager,
ingestion_manager)
termination_manager.set_expiration_manager(expiration_manager)
transmission_manager.set_expiration_manager(expiration_manager)
emission_manager.set_ingestion_manager(ingestion_manager)
transmission_manager.kick_off(
group, method, timeout, initial_metadata, payload, completion, None)
return _EasyOperation(
lock, termination_manager, transmission_manager, expiration_manager,
operation_context, emission_manager, reception_manager)
def service_operate(
servicer_package, ticket, ticket_sink, termination_action, pool):
"""Constructs an Operation for service of an operation.
Args:
servicer_package: A _utilities.ServicerPackage to be used servicing the
operation.
ticket: The first links.Ticket received for the operation.
ticket_sink: A callable that accepts links.Tickets and delivers them to the
other side of the operation.
termination_action: A callable that accepts the outcome of the operation as
a base.Outcome value to be called on operation completion.
pool: A thread pool with which to do the work of the operation.
Returns:
An _interfaces.Operation for the operation.
"""
lock = threading.Lock()
with lock:
termination_manager = _termination.service_termination_manager(
termination_action, pool)
transmission_manager = _transmission.TransmissionManager(
ticket.operation_id, ticket_sink, lock, pool, termination_manager)
expiration_manager = _expiration.service_expiration_manager(
ticket.timeout, servicer_package.default_timeout,
servicer_package.maximum_timeout, lock, termination_manager,
transmission_manager)
operation_context = _context.OperationContext(
lock, termination_manager, transmission_manager, expiration_manager)
emission_manager = _emission.EmissionManager(
lock, termination_manager, transmission_manager, expiration_manager)
ingestion_manager = _ingestion.service_ingestion_manager(
servicer_package.servicer, operation_context, emission_manager, lock,
pool, termination_manager, transmission_manager, expiration_manager)
reception_manager = _reception.ReceptionManager(
termination_manager, transmission_manager, expiration_manager,
ingestion_manager)
termination_manager.set_expiration_manager(expiration_manager)
transmission_manager.set_expiration_manager(expiration_manager)
emission_manager.set_ingestion_manager(ingestion_manager)
reception_manager.receive_ticket(ticket)
return _EasyOperation(
lock, termination_manager, transmission_manager, expiration_manager,
operation_context, emission_manager, reception_manager)

@ -0,0 +1,137 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for ticket reception."""
from grpc.framework.core import _interfaces
from grpc.framework.interfaces.base import base
from grpc.framework.interfaces.base import utilities
from grpc.framework.interfaces.links import links
_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME = {
links.Ticket.Termination.CANCELLATION: base.Outcome.CANCELLED,
links.Ticket.Termination.EXPIRATION: base.Outcome.EXPIRED,
links.Ticket.Termination.SHUTDOWN: base.Outcome.REMOTE_SHUTDOWN,
links.Ticket.Termination.RECEPTION_FAILURE: base.Outcome.RECEPTION_FAILURE,
links.Ticket.Termination.TRANSMISSION_FAILURE:
base.Outcome.TRANSMISSION_FAILURE,
links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.REMOTE_FAILURE,
}
class ReceptionManager(_interfaces.ReceptionManager):
"""A ReceptionManager based around a _Receiver passed to it."""
def __init__(
self, termination_manager, transmission_manager, expiration_manager,
ingestion_manager):
"""Constructor.
Args:
termination_manager: The operation's _interfaces.TerminationManager.
transmission_manager: The operation's _interfaces.TransmissionManager.
expiration_manager: The operation's _interfaces.ExpirationManager.
ingestion_manager: The operation's _interfaces.IngestionManager.
"""
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._expiration_manager = expiration_manager
self._ingestion_manager = ingestion_manager
self._lowest_unseen_sequence_number = 0
self._out_of_sequence_tickets = {}
self._aborted = False
def _abort(self, outcome):
self._aborted = True
self._termination_manager.abort(outcome)
self._transmission_manager.abort(outcome)
self._expiration_manager.terminate()
def _sequence_failure(self, ticket):
"""Determines a just-arrived ticket's sequential legitimacy.
Args:
ticket: A just-arrived ticket.
Returns:
True if the ticket is sequentially legitimate; False otherwise.
"""
if ticket.sequence_number < self._lowest_unseen_sequence_number:
return True
elif ticket.sequence_number in self._out_of_sequence_tickets:
return True
else:
return False
def _process_one(self, ticket):
if ticket.sequence_number == 0:
self._ingestion_manager.set_group_and_method(ticket.group, ticket.method)
if ticket.timeout is not None:
self._expiration_manager.change_timeout(ticket.timeout)
if ticket.termination is None:
completion = None
else:
completion = utilities.completion(
ticket.terminal_metadata, ticket.code, ticket.message)
self._ingestion_manager.advance(
ticket.initial_metadata, ticket.payload, completion, ticket.allowance)
if ticket.allowance is not None:
self._transmission_manager.allowance(ticket.allowance)
def _process(self, ticket):
"""Process those tickets ready to be processed.
Args:
ticket: A just-arrived ticket the sequence number of which matches this
_ReceptionManager's _lowest_unseen_sequence_number field.
"""
while True:
self._process_one(ticket)
next_ticket = self._out_of_sequence_tickets.pop(
ticket.sequence_number + 1, None)
if next_ticket is None:
self._lowest_unseen_sequence_number = ticket.sequence_number + 1
return
else:
ticket = next_ticket
def receive_ticket(self, ticket):
"""See _interfaces.ReceptionManager.receive_ticket for specification."""
if self._aborted:
return
elif self._sequence_failure(ticket):
self._abort(base.Outcome.RECEPTION_FAILURE)
elif ticket.termination not in (None, links.Ticket.Termination.COMPLETION):
outcome = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME[ticket.termination]
self._abort(outcome)
elif ticket.sequence_number == self._lowest_unseen_sequence_number:
self._process(ticket)
else:
self._out_of_sequence_tickets[ticket.sequence_number] = ticket

@ -0,0 +1,212 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for operation termination."""
import abc
from grpc.framework.core import _constants
from grpc.framework.core import _interfaces
from grpc.framework.foundation import callable_util
from grpc.framework.interfaces.base import base
def _invocation_completion_predicate(
unused_emission_complete, unused_transmission_complete,
unused_reception_complete, ingestion_complete):
return ingestion_complete
def _service_completion_predicate(
unused_emission_complete, transmission_complete, unused_reception_complete,
unused_ingestion_complete):
return transmission_complete
class TerminationManager(_interfaces.TerminationManager):
"""A _interfaces.TransmissionManager on which another manager may be set."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def set_expiration_manager(self, expiration_manager):
"""Sets the expiration manager with which this manager will interact.
Args:
expiration_manager: The _interfaces.ExpirationManager associated with the
current operation.
"""
raise NotImplementedError()
class _TerminationManager(TerminationManager):
"""An implementation of TerminationManager."""
def __init__(self, predicate, action, pool):
"""Constructor.
Args:
predicate: One of _invocation_completion_predicate or
_service_completion_predicate to be used to determine when the operation
has completed.
action: A behavior to pass the operation outcome on operation termination.
pool: A thread pool.
"""
self._predicate = predicate
self._action = action
self._pool = pool
self._expiration_manager = None
self.outcome = None
self._callbacks = []
self._emission_complete = False
self._transmission_complete = False
self._reception_complete = False
self._ingestion_complete = False
def set_expiration_manager(self, expiration_manager):
self._expiration_manager = expiration_manager
def _terminate_internal_only(self, outcome):
"""Terminates the operation.
Args:
outcome: A base.Outcome describing the outcome of the operation.
"""
self.outcome = outcome
callbacks = list(self._callbacks)
self._callbacks = None
act = callable_util.with_exceptions_logged(
self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE)
if outcome is base.Outcome.LOCAL_FAILURE:
self._pool.submit(act, outcome)
else:
def call_callbacks_and_act(callbacks, outcome):
for callback in callbacks:
callback_outcome = callable_util.call_logging_exceptions(
callback, _constants.TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE,
outcome)
if callback_outcome.exception is not None:
outcome = base.Outcome.LOCAL_FAILURE
break
act(outcome)
self._pool.submit(
callable_util.with_exceptions_logged(
call_callbacks_and_act, _constants.INTERNAL_ERROR_LOG_MESSAGE),
callbacks, outcome)
def _terminate_and_notify(self, outcome):
self._terminate_internal_only(outcome)
self._expiration_manager.terminate()
def _perhaps_complete(self):
if self._predicate(
self._emission_complete, self._transmission_complete,
self._reception_complete, self._ingestion_complete):
self._terminate_and_notify(base.Outcome.COMPLETED)
return True
else:
return False
def is_active(self):
"""See _interfaces.TerminationManager.is_active for specification."""
return self.outcome is None
def add_callback(self, callback):
"""See _interfaces.TerminationManager.add_callback for specification."""
if self.outcome is None:
self._callbacks.append(callback)
return None
else:
return self.outcome
def emission_complete(self):
"""See superclass method for specification."""
if self.outcome is None:
self._emission_complete = True
self._perhaps_complete()
def transmission_complete(self):
"""See superclass method for specification."""
if self.outcome is None:
self._transmission_complete = True
return self._perhaps_complete()
else:
return False
def reception_complete(self):
"""See superclass method for specification."""
if self.outcome is None:
self._reception_complete = True
self._perhaps_complete()
def ingestion_complete(self):
"""See superclass method for specification."""
if self.outcome is None:
self._ingestion_complete = True
self._perhaps_complete()
def expire(self):
"""See _interfaces.TerminationManager.expire for specification."""
self._terminate_internal_only(base.Outcome.EXPIRED)
def abort(self, outcome):
"""See _interfaces.TerminationManager.abort for specification."""
self._terminate_and_notify(outcome)
def invocation_termination_manager(action, pool):
"""Creates a TerminationManager appropriate for invocation-side use.
Args:
action: An action to call on operation termination.
pool: A thread pool in which to execute the passed action and any
termination callbacks that are registered during the operation.
Returns:
A TerminationManager appropriate for invocation-side use.
"""
return _TerminationManager(_invocation_completion_predicate, action, pool)
def service_termination_manager(action, pool):
"""Creates a TerminationManager appropriate for service-side use.
Args:
action: An action to call on operation termination.
pool: A thread pool in which to execute the passed action and any
termination callbacks that are registered during the operation.
Returns:
A TerminationManager appropriate for service-side use.
"""
return _TerminationManager(_service_completion_predicate, action, pool)

@ -0,0 +1,294 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for ticket transmission during an operation."""
from grpc.framework.core import _constants
from grpc.framework.core import _interfaces
from grpc.framework.foundation import callable_util
from grpc.framework.interfaces.base import base
from grpc.framework.interfaces.links import links
_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
def _explode_completion(completion):
if completion is None:
return None, None, None, None
else:
return (
completion.terminal_metadata, completion.code, completion.message,
links.Ticket.Termination.COMPLETION)
class TransmissionManager(_interfaces.TransmissionManager):
"""An _interfaces.TransmissionManager that sends links.Tickets."""
def __init__(
self, operation_id, ticket_sink, lock, pool, termination_manager):
"""Constructor.
Args:
operation_id: The operation's ID.
ticket_sink: A callable that accepts tickets and sends them to the other
side of the operation.
lock: The operation-servicing-wide lock object.
pool: A thread pool in which the work of transmitting tickets will be
performed.
termination_manager: The _interfaces.TerminationManager associated with
this operation.
"""
self._lock = lock
self._pool = pool
self._ticket_sink = ticket_sink
self._operation_id = operation_id
self._termination_manager = termination_manager
self._expiration_manager = None
self._lowest_unused_sequence_number = 0
self._remote_allowance = 1
self._remote_complete = False
self._timeout = None
self._local_allowance = 0
self._initial_metadata = None
self._payloads = []
self._completion = None
self._aborted = False
self._abortion_outcome = None
self._transmitting = False
def set_expiration_manager(self, expiration_manager):
"""Sets the ExpirationManager with which this manager will cooperate."""
self._expiration_manager = expiration_manager
def _next_ticket(self):
"""Creates the next ticket to be transmitted.
Returns:
A links.Ticket to be sent to the other side of the operation or None if
there is nothing to be sent at this time.
"""
if self._aborted:
if self._abortion_outcome is None:
return None
else:
termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[
self._abortion_outcome]
if termination is None:
return None
else:
self._abortion_outcome = None
return links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None,
None, None, None, None, None, None, None, None, None,
termination)
action = False
# TODO(nathaniel): Support other subscriptions.
local_subscription = links.Ticket.Subscription.FULL
timeout = self._timeout
if timeout is not None:
self._timeout = None
action = True
if self._local_allowance <= 0:
allowance = None
else:
allowance = self._local_allowance
self._local_allowance = 0
action = True
initial_metadata = self._initial_metadata
if initial_metadata is not None:
self._initial_metadata = None
action = True
if not self._payloads or self._remote_allowance <= 0:
payload = None
else:
payload = self._payloads.pop(0)
self._remote_allowance -= 1
action = True
if self._completion is None or self._payloads:
terminal_metadata, code, message, termination = None, None, None, None
else:
terminal_metadata, code, message, termination = _explode_completion(
self._completion)
self._completion = None
action = True
if action:
ticket = links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None, None,
local_subscription, timeout, allowance, initial_metadata, payload,
terminal_metadata, code, message, termination)
self._lowest_unused_sequence_number += 1
return ticket
else:
return None
def _transmit(self, ticket):
"""Commences the transmission loop sending tickets.
Args:
ticket: A links.Ticket to be sent to the other side of the operation.
"""
def transmit(ticket):
while True:
transmission_outcome = callable_util.call_logging_exceptions(
self._ticket_sink, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket)
if transmission_outcome.exception is None:
with self._lock:
if ticket.termination is links.Ticket.Termination.COMPLETION:
self._termination_manager.transmission_complete()
ticket = self._next_ticket()
if ticket is None:
self._transmitting = False
return
else:
with self._lock:
if self._termination_manager.outcome is None:
self._termination_manager.abort(base.Outcome.TRANSMISSION_FAILURE)
self._expiration_manager.terminate()
return
self._pool.submit(callable_util.with_exceptions_logged(
transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket)
self._transmitting = True
def kick_off(
self, group, method, timeout, initial_metadata, payload, completion,
allowance):
"""See _interfaces.TransmissionManager.kickoff for specification."""
# TODO(nathaniel): Support other subscriptions.
subscription = links.Ticket.Subscription.FULL
terminal_metadata, code, message, termination = _explode_completion(
completion)
self._remote_allowance = 1 if payload is None else 0
ticket = links.Ticket(
self._operation_id, 0, group, method, subscription, timeout, allowance,
initial_metadata, payload, terminal_metadata, code, message,
termination)
self._lowest_unused_sequence_number = 1
self._transmit(ticket)
def advance(self, initial_metadata, payload, completion, allowance):
"""See _interfaces.TransmissionManager.advance for specification."""
effective_initial_metadata = initial_metadata
effective_payload = payload
effective_completion = completion
if allowance is not None and not self._remote_complete:
effective_allowance = allowance
else:
effective_allowance = None
if self._transmitting:
if effective_initial_metadata is not None:
self._initial_metadata = effective_initial_metadata
if effective_payload is not None:
self._payloads.append(effective_payload)
if effective_completion is not None:
self._completion = effective_completion
if effective_allowance is not None:
self._local_allowance += effective_allowance
else:
if effective_payload is not None:
if 0 < self._remote_allowance:
ticket_payload = effective_payload
self._remote_allowance -= 1
else:
self._payloads.append(effective_payload)
ticket_payload = None
else:
ticket_payload = None
if effective_completion is not None and not self._payloads:
ticket_completion = effective_completion
else:
self._completion = effective_completion
ticket_completion = None
if any(
(effective_initial_metadata, ticket_payload, ticket_completion,
effective_allowance)):
terminal_metadata, code, message, termination = _explode_completion(
completion)
ticket = links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None, None,
None, None, allowance, effective_initial_metadata, ticket_payload,
terminal_metadata, code, message, termination)
self._lowest_unused_sequence_number += 1
self._transmit(ticket)
def timeout(self, timeout):
"""See _interfaces.TransmissionManager.timeout for specification."""
if self._transmitting:
self._timeout = timeout
else:
ticket = links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None, None,
None, timeout, None, None, None, None, None, None, None)
self._lowest_unused_sequence_number += 1
self._transmit(ticket)
def allowance(self, allowance):
"""See _interfaces.TransmissionManager.allowance for specification."""
if self._transmitting or not self._payloads:
self._remote_allowance += allowance
else:
self._remote_allowance += allowance - 1
payload = self._payloads.pop(0)
if self._payloads:
completion = None
else:
completion = self._completion
self._completion = None
terminal_metadata, code, message, termination = _explode_completion(
completion)
ticket = links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None, None,
None, None, None, None, payload, terminal_metadata, code, message,
termination)
self._lowest_unused_sequence_number += 1
self._transmit(ticket)
def remote_complete(self):
"""See _interfaces.TransmissionManager.remote_complete for specification."""
self._remote_complete = True
self._local_allowance = 0
def abort(self, outcome):
"""See _interfaces.TransmissionManager.abort for specification."""
if self._transmitting:
self._aborted, self._abortion_outcome = True, outcome
else:
self._aborted = True
if outcome is not None:
termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[
outcome]
if termination is not None:
ticket = links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None,
None, None, None, None, None, None, None, None, None,
termination)
self._transmit(ticket)

@ -0,0 +1,46 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Package-internal utilities."""
import collections
class ServicerPackage(
collections.namedtuple(
'ServicerPackage', ('servicer', 'default_timeout', 'maximum_timeout'))):
"""A trivial bundle class.
Attributes:
servicer: A base.Servicer.
default_timeout: A float indicating the length of time in seconds to allow
for an operation invoked without a timeout.
maximum_timeout: A float indicating the maximum length of time in seconds to
allow for an operation.
"""

@ -0,0 +1,62 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Entry points into the ticket-exchange-based base layer implementation."""
# base and links are referenced from specification in this module.
from grpc.framework.core import _end
from grpc.framework.interfaces.base import base # pylint: disable=unused-import
from grpc.framework.interfaces.links import links # pylint: disable=unused-import
def invocation_end_link():
"""Creates a base.End-links.Link suitable for operation invocation.
Returns:
An object that is both a base.End and a links.Link, that supports operation
invocation, and that translates operation invocation into ticket exchange.
"""
return _end.serviceless_end_link()
def service_end_link(servicer, default_timeout, maximum_timeout):
"""Creates a base.End-links.Link suitable for operation service.
Args:
servicer: A base.Servicer for servicing operations.
default_timeout: A length of time in seconds to be used as the default
time alloted for a single operation.
maximum_timeout: A length of time in seconds to be used as the maximum
time alloted for a single operation.
Returns:
An object that is both a base.End and a links.Link and that services
operations that arrive at it through ticket exchange.
"""
return _end.serviceful_end_link(servicer, default_timeout, maximum_timeout)

@ -27,10 +27,20 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""The base interface of RPC Framework."""
"""The base interface of RPC Framework.
Implementations of this interface support the conduct of "operations":
exchanges between two distinct ends of an arbitrary number of data payloads
and metadata such as a name for the operation, initial and terminal metadata
in each direction, and flow control. These operations may be used for transfers
of data, remote procedure calls, status indication, or anything else
applications choose.
"""
# threading is referenced from specification in this module.
import abc
import enum
import threading
# abandonment is referenced from specification in this module.
from grpc.framework.foundation import abandonment # pylint: disable=unused-import
@ -208,19 +218,26 @@ class End(object):
raise NotImplementedError()
@abc.abstractmethod
def stop_gracefully(self):
"""Gracefully stops this object's service of operations.
def stop(self, grace):
"""Stops this object's service of operations.
Operations in progress will be allowed to complete, and this method blocks
until all of them have.
"""
raise NotImplementedError()
This object will refuse service of new operations as soon as this method is
called but operations under way at the time of the call may be given a
grace period during which they are allowed to finish.
@abc.abstractmethod
def stop_immediately(self):
"""Immediately stops this object's service of operations.
Args:
grace: A duration of time in seconds to allow ongoing operations to
terminate before being forcefully terminated by the stopping of this
End. May be zero to terminate all ongoing operations and immediately
stop.
Operations in progress will not be allowed to complete.
Returns:
A threading.Event that will be set to indicate all operations having
terminated and this End having completely stopped. The returned event
may not be set until after the full grace period (if some ongoing
operation continues for the full length of the period) or it may be set
much sooner (if for example this End had no operations in progress at
the time its stop method was called).
"""
raise NotImplementedError()

@ -98,7 +98,7 @@ class Ticket(
COMPLETION = 'completion'
CANCELLATION = 'cancellation'
EXPIRATION = 'expiration'
LOCAL_SHUTDOWN = 'local shutdown'
SHUTDOWN = 'shutdown'
RECEPTION_FAILURE = 'reception failure'
TRANSMISSION_FAILURE = 'transmission failure'
LOCAL_FAILURE = 'local failure'

@ -0,0 +1,165 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests the RPC Framework Core's implementation of the Base interface."""
import collections
import logging
import random
import time
import unittest
from grpc._adapter import _intermediary_low
from grpc._links import invocation
from grpc._links import service
from grpc.framework.core import implementations
from grpc.framework.interfaces.base import utilities
from grpc_test import test_common as grpc_test_common
from grpc_test.framework.common import test_constants
from grpc_test.framework.interfaces.base import test_cases
from grpc_test.framework.interfaces.base import test_interfaces
_INVOCATION_INITIAL_METADATA = ((b'0', b'abc'), (b'1', b'def'), (b'2', b'ghi'),)
_SERVICE_INITIAL_METADATA = ((b'3', b'jkl'), (b'4', b'mno'), (b'5', b'pqr'),)
_SERVICE_TERMINAL_METADATA = ((b'6', b'stu'), (b'7', b'vwx'), (b'8', b'yza'),)
_CODE = _intermediary_low.Code.OK
_MESSAGE = b'test message'
class _SerializationBehaviors(
collections.namedtuple(
'_SerializationBehaviors',
('request_serializers', 'request_deserializers', 'response_serializers',
'response_deserializers',))):
pass
class _Links(
collections.namedtuple(
'_Links',
('invocation_end_link', 'invocation_grpc_link', 'service_grpc_link',
'service_end_link'))):
pass
def _serialization_behaviors_from_serializations(serializations):
request_serializers = {}
request_deserializers = {}
response_serializers = {}
response_deserializers = {}
for (group, method), serialization in serializations.iteritems():
request_serializers[group, method] = serialization.serialize_request
request_deserializers[group, method] = serialization.deserialize_request
response_serializers[group, method] = serialization.serialize_response
response_deserializers[group, method] = serialization.deserialize_response
return _SerializationBehaviors(
request_serializers, request_deserializers, response_serializers,
response_deserializers)
class _Implementation(test_interfaces.Implementation):
def instantiate(self, serializations, servicer):
serialization_behaviors = _serialization_behaviors_from_serializations(
serializations)
invocation_end_link = implementations.invocation_end_link()
service_end_link = implementations.service_end_link(
servicer, test_constants.DEFAULT_TIMEOUT,
test_constants.MAXIMUM_TIMEOUT)
service_grpc_link = service.service_link(
serialization_behaviors.request_deserializers,
serialization_behaviors.response_serializers)
port = service_grpc_link.add_port(0, None)
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_grpc_link = invocation.invocation_link(
channel, b'localhost',
serialization_behaviors.request_serializers,
serialization_behaviors.response_deserializers)
invocation_end_link.join_link(invocation_grpc_link)
invocation_grpc_link.join_link(invocation_end_link)
service_end_link.join_link(service_grpc_link)
service_grpc_link.join_link(service_end_link)
invocation_grpc_link.start()
service_grpc_link.start()
return invocation_end_link, service_end_link, (
invocation_grpc_link, service_grpc_link)
def destantiate(self, memo):
invocation_grpc_link, service_grpc_link = memo
invocation_grpc_link.stop()
service_grpc_link.stop_gracefully()
def invocation_initial_metadata(self):
return _INVOCATION_INITIAL_METADATA
def service_initial_metadata(self):
return _SERVICE_INITIAL_METADATA
def invocation_completion(self):
return utilities.completion(None, None, None)
def service_completion(self):
return utilities.completion(_SERVICE_TERMINAL_METADATA, _CODE, _MESSAGE)
def metadata_transmitted(self, original_metadata, transmitted_metadata):
return original_metadata is None or grpc_test_common.metadata_transmitted(
original_metadata, transmitted_metadata)
def completion_transmitted(self, original_completion, transmitted_completion):
if (original_completion.terminal_metadata is not None and
not grpc_test_common.metadata_transmitted(
original_completion.terminal_metadata,
transmitted_completion.terminal_metadata)):
return False
elif original_completion.code is not transmitted_completion.code:
return False
elif original_completion.message != transmitted_completion.message:
return False
else:
return True
def setUpModule():
logging.warn('setUpModule!')
def tearDownModule():
logging.warn('tearDownModule!')
def load_tests(loader, tests, pattern):
return unittest.TestSuite(
tests=tuple(
loader.loadTestsFromTestCase(test_case_class)
for test_case_class in test_cases.test_cases(_Implementation())))
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -0,0 +1,30 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,96 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests the RPC Framework Core's implementation of the Base interface."""
import logging
import random
import time
import unittest
from grpc.framework.core import implementations
from grpc.framework.interfaces.base import utilities
from grpc_test.framework.common import test_constants
from grpc_test.framework.interfaces.base import test_cases
from grpc_test.framework.interfaces.base import test_interfaces
class _Implementation(test_interfaces.Implementation):
def __init__(self):
self._invocation_initial_metadata = object()
self._service_initial_metadata = object()
self._invocation_terminal_metadata = object()
self._service_terminal_metadata = object()
def instantiate(self, serializations, servicer):
invocation = implementations.invocation_end_link()
service = implementations.service_end_link(
servicer, test_constants.DEFAULT_TIMEOUT,
test_constants.MAXIMUM_TIMEOUT)
invocation.join_link(service)
service.join_link(invocation)
return invocation, service, None
def destantiate(self, memo):
pass
def invocation_initial_metadata(self):
return self._invocation_initial_metadata
def service_initial_metadata(self):
return self._service_initial_metadata
def invocation_completion(self):
return utilities.completion(self._invocation_terminal_metadata, None, None)
def service_completion(self):
return utilities.completion(self._service_terminal_metadata, None, None)
def metadata_transmitted(self, original_metadata, transmitted_metadata):
return transmitted_metadata is original_metadata
def completion_transmitted(self, original_completion, transmitted_completion):
return (
(original_completion.terminal_metadata is
transmitted_completion.terminal_metadata) and
original_completion.code is transmitted_completion.code and
original_completion.message is transmitted_completion.message
)
def load_tests(loader, tests, pattern):
return unittest.TestSuite(
tests=tuple(
loader.loadTestsFromTestCase(test_case_class)
for test_case_class in test_cases.test_cases(_Implementation())))
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -211,8 +211,10 @@ class _OperationTest(unittest.TestCase):
elif instruction.kind is _control.Instruction.Kind.CONCLUDE:
break
invocation_end.stop_gracefully()
service_end.stop_gracefully()
invocation_stop_event = invocation_end.stop(0)
service_stop_event = service_end.stop(0)
invocation_stop_event.wait()
service_stop_event.wait()
invocation_stats = invocation_end.operation_stats()
service_stats = service_end.operation_stats()

@ -29,9 +29,42 @@
"""State and behavior appropriate for use in tests."""
import logging
import threading
import time
from grpc.framework.interfaces.links import links
from grpc.framework.interfaces.links import utilities
# A more-or-less arbitrary limit on the length of raw data values to be logged.
_UNCOMFORTABLY_LONG = 48
def _safe_for_log_ticket(ticket):
"""Creates a safe-for-printing-to-the-log ticket for a given ticket.
Args:
ticket: Any links.Ticket.
Returns:
A links.Ticket that is as much as can be equal to the given ticket but
possibly features values like the string "<payload of length 972321>" in
place of the actual values of the given ticket.
"""
if isinstance(ticket.payload, (basestring,)):
payload_length = len(ticket.payload)
else:
payload_length = -1
if payload_length < _UNCOMFORTABLY_LONG:
return ticket
else:
return links.Ticket(
ticket.operation_id, ticket.sequence_number,
ticket.group, ticket.method, ticket.subscription, ticket.timeout,
ticket.allowance, ticket.initial_metadata,
'<payload of length {}>'.format(payload_length),
ticket.terminal_metadata, ticket.code, ticket.message,
ticket.termination)
class RecordingLink(links.Link):
@ -64,3 +97,71 @@ class RecordingLink(links.Link):
"""Returns a copy of the list of all tickets received by this Link."""
with self._condition:
return tuple(self._tickets)
class _Pipe(object):
"""A conduit that logs all tickets passed through it."""
def __init__(self, name):
self._lock = threading.Lock()
self._name = name
self._left_mate = utilities.NULL_LINK
self._right_mate = utilities.NULL_LINK
def accept_left_to_right_ticket(self, ticket):
with self._lock:
logging.warning(
'%s: moving left to right through %s: %s', time.time(), self._name,
_safe_for_log_ticket(ticket))
try:
self._right_mate.accept_ticket(ticket)
except Exception as e: # pylint: disable=broad-except
logging.exception(e)
def accept_right_to_left_ticket(self, ticket):
with self._lock:
logging.warning(
'%s: moving right to left through %s: %s', time.time(), self._name,
_safe_for_log_ticket(ticket))
try:
self._left_mate.accept_ticket(ticket)
except Exception as e: # pylint: disable=broad-except
logging.exception(e)
def join_left_mate(self, left_mate):
with self._lock:
self._left_mate = utilities.NULL_LINK if left_mate is None else left_mate
def join_right_mate(self, right_mate):
with self._lock:
self._right_mate = (
utilities.NULL_LINK if right_mate is None else right_mate)
class _Facade(links.Link):
def __init__(self, accept, join):
self._accept = accept
self._join = join
def accept_ticket(self, ticket):
self._accept(ticket)
def join_link(self, link):
self._join(link)
def logging_links(name):
"""Creates a conduit that logs all tickets passed through it.
Args:
name: A name to use for the conduit to identify itself in logging output.
Returns:
Two links.Links, the first of which is the "left" side of the conduit
and the second of which is the "right" side of the conduit.
"""
pipe = _Pipe(name)
left_facade = _Facade(pipe.accept_left_to_right_ticket, pipe.join_left_mate)
right_facade = _Facade(pipe.accept_right_to_left_ticket, pipe.join_right_mate)
return left_facade, right_facade

@ -82,6 +82,10 @@ static ID id_metadata;
* received by the call and subsequently saved on it. */
static ID id_status;
/* id_write_flag is name of the attribute used to access the write_flag
* saved on the call. */
static ID id_write_flag;
/* sym_* are the symbol for attributes of grpc_rb_sBatchResult. */
static VALUE sym_send_message;
static VALUE sym_send_metadata;
@ -240,6 +244,30 @@ static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
return rb_ivar_set(self, id_metadata, metadata);
}
/*
call-seq:
write_flag = call.write_flag
Gets the write_flag value saved the call. */
static VALUE grpc_rb_call_get_write_flag(VALUE self) {
return rb_ivar_get(self, id_write_flag);
}
/*
call-seq:
call.write_flag = write_flag
Saves the write_flag on the call. */
static VALUE grpc_rb_call_set_write_flag(VALUE self, VALUE write_flag) {
if (!NIL_P(write_flag) && TYPE(write_flag) != T_FIXNUM) {
rb_raise(rb_eTypeError, "bad write_flag: got:<%s> want: <Fixnum>",
rb_obj_classname(write_flag));
return Qnil;
}
return rb_ivar_set(self, id_write_flag, write_flag);
}
/* grpc_rb_md_ary_fill_hash_cb is the hash iteration callback used
to fill grpc_metadata_array.
@ -437,17 +465,19 @@ typedef struct run_batch_stack {
grpc_status_code recv_status;
char *recv_status_details;
size_t recv_status_details_capacity;
uint write_flag;
} run_batch_stack;
/* grpc_run_batch_stack_init ensures the run_batch_stack is properly
* initialized */
static void grpc_run_batch_stack_init(run_batch_stack *st) {
static void grpc_run_batch_stack_init(run_batch_stack *st, uint write_flag) {
MEMZERO(st, run_batch_stack, 1);
grpc_metadata_array_init(&st->send_metadata);
grpc_metadata_array_init(&st->send_trailing_metadata);
grpc_metadata_array_init(&st->recv_metadata);
grpc_metadata_array_init(&st->recv_trailing_metadata);
st->op_num = 0;
st->write_flag = write_flag;
}
/* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
@ -477,6 +507,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
this_op = rb_ary_entry(ops_ary, i);
this_value = rb_hash_aref(ops_hash, this_op);
st->ops[st->op_num].flags = 0;
switch (NUM2INT(this_op)) {
case GRPC_OP_SEND_INITIAL_METADATA:
/* N.B. later there is no need to explicitly delete the metadata keys
@ -490,6 +521,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
case GRPC_OP_SEND_MESSAGE:
st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer(
RSTRING_PTR(this_value), RSTRING_LEN(this_value));
st->ops[st->op_num].flags = st->write_flag;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
break;
@ -525,7 +557,6 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
NUM2INT(this_op));
};
st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op);
st->ops[st->op_num].flags = 0;
st->ops[st->op_num].reserved = NULL;
st->op_num++;
}
@ -604,6 +635,8 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
grpc_event ev;
grpc_call_error err;
VALUE result = Qnil;
VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
uint write_flag = 0;
TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
/* Validate the ops args, adding them to a ruby array */
@ -611,7 +644,10 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash");
return Qnil;
}
grpc_run_batch_stack_init(&st);
if (rb_write_flag != Qnil) {
write_flag = NUM2UINT(rb_write_flag);
}
grpc_run_batch_stack_init(&st, write_flag);
grpc_run_batch_stack_fill_ops(&st, ops_hash);
/* call grpc_call_start_batch, then wait for it to complete using
@ -638,6 +674,16 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
return result;
}
static void Init_grpc_write_flags() {
/* Constants representing the write flags in grpc.h */
VALUE grpc_rb_mWriteFlags =
rb_define_module_under(grpc_rb_mGrpcCore, "WriteFlags");
rb_define_const(grpc_rb_mWriteFlags, "BUFFER_HINT",
UINT2NUM(GRPC_WRITE_BUFFER_HINT));
rb_define_const(grpc_rb_mWriteFlags, "NO_COMPRESS",
UINT2NUM(GRPC_WRITE_NO_COMPRESS));
}
static void Init_grpc_error_codes() {
/* Constants representing the error codes of grpc_call_error in grpc.h */
VALUE grpc_rb_mRpcErrors =
@ -735,10 +781,14 @@ void Init_grpc_call() {
rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1);
rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0);
rb_define_method(grpc_rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1);
rb_define_method(grpc_rb_cCall, "write_flag", grpc_rb_call_get_write_flag, 0);
rb_define_method(grpc_rb_cCall, "write_flag=", grpc_rb_call_set_write_flag,
1);
/* Ids used to support call attributes */
id_metadata = rb_intern("metadata");
id_status = rb_intern("status");
id_write_flag = rb_intern("write_flag");
/* Ids used by the c wrapping internals. */
id_cq = rb_intern("__cq");
@ -766,6 +816,7 @@ void Init_grpc_call() {
Init_grpc_error_codes();
Init_grpc_op_codes();
Init_grpc_write_flags();
}
/* Gets the call from the ruby object */

@ -59,7 +59,7 @@ module GRPC
include Core::CallOps
extend Forwardable
attr_reader(:deadline)
def_delegators :@call, :cancel, :metadata
def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=
# client_invoke begins a client invocation.
#
@ -484,6 +484,7 @@ module GRPC
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
:metadata, :status, :start_call, :wait)
:metadata, :status, :start_call, :wait, :write_flag,
:write_flag=)
end
end

@ -31,6 +31,14 @@ require 'grpc'
include GRPC::Core::StatusCodes
describe GRPC::Core::WriteFlags do
it 'should define the known write flag values' do
m = GRPC::Core::WriteFlags
expect(m.const_get(:BUFFER_HINT)).to_not be_nil
expect(m.const_get(:NO_COMPRESS)).to_not be_nil
end
end
describe GRPC::Core::RpcErrors do
before(:each) do
@known_types = {

@ -35,6 +35,7 @@ describe GRPC::ActiveCall do
ActiveCall = GRPC::ActiveCall
Call = GRPC::Core::Call
CallOps = GRPC::Core::CallOps
WriteFlags = GRPC::Core::WriteFlags
before(:each) do
@pass_through = proc { |x| x }
@ -129,6 +130,31 @@ describe GRPC::ActiveCall do
@pass_through, deadline)
expect(server_call.remote_read).to eq('marshalled:' + msg)
end
TEST_WRITE_FLAGS = [WriteFlags::BUFFER_HINT, WriteFlags::NO_COMPRESS]
TEST_WRITE_FLAGS.each do |f|
it "successfully makes calls with write_flag set to #{f}" do
call = make_test_call
ActiveCall.client_invoke(call, @client_queue)
marshal = proc { |x| 'marshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, marshal,
@pass_through, deadline)
msg = 'message is a string'
client_call.write_flag = f
client_call.remote_send(msg)
# confirm that the message was marshalled
recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
recvd_call = recvd_rpc.call
server_ops = {
CallOps::SEND_INITIAL_METADATA => nil
}
recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
@pass_through, deadline)
expect(server_call.remote_read).to eq('marshalled:' + msg)
end
end
end
describe '#client_invoke' do
@ -261,7 +287,7 @@ describe GRPC::ActiveCall do
client_call.writes_done(false)
server_call = expect_server_to_receive(msg)
e = client_call.each_remote_read
n = 3 # arbitrary value > 1
n = 3 # arbitrary value > 1
n.times do
server_call.remote_send(reply)
expect(e.next).to eq(reply)

@ -39,4 +39,12 @@ export LD_LIBRARY_PATH=$ROOT/libs/$CONFIG
export DYLD_LIBRARY_PATH=$ROOT/libs/$CONFIG
export PATH=$ROOT/bins/$CONFIG:$ROOT/bins/$CONFIG/protobuf:$PATH
source "python"$PYVER"_virtual_environment"/bin/activate
# TODO(atash): These tests don't currently run under py.test and thus don't
# appear under the coverage report. Find a way to get these tests to work with
# py.test (or find another tool or *something*) that's acceptable to the rest of
# the team...
"python"$PYVER -m grpc_test._core_over_links_base_interface_test
"python"$PYVER -m grpc_test.framework.core._base_interface_test
"python"$PYVER $GRPCIO_TEST/setup.py test -a "-n8 --cov=grpc --junitxml=./report.xml"

Loading…
Cancel
Save