Merge branch 'master' of github.com:grpc/grpc into the-purge-2

Conflicts:
	include/grpc++/completion_queue.h
pull/1227/head
Nicolas "Pixel" Noble 10 years ago
commit fd2bf675f7
  1. 2
      BUILD
  2. 2
      Makefile
  3. 1
      build.json
  4. 3
      examples/pubsub/main.cc
  5. 2
      examples/pubsub/publisher_test.cc
  6. 2
      examples/pubsub/subscriber_test.cc
  7. 5
      include/grpc++/completion_queue.h
  8. 5
      include/grpc++/credentials.h
  9. 50
      include/grpc++/impl/grpc_library.h
  10. 4
      include/grpc++/server.h
  11. 4
      src/cpp/client/channel.h
  12. 3
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  13. 10
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  14. 176
      src/csharp/Grpc.Core/Server.cs
  15. 2
      src/csharp/Grpc.Examples.MathServer/MathServer.cs
  16. 4
      src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
  17. 6
      src/ruby/ext/grpc/rb_call.c
  18. 2
      src/ruby/grpc.gemspec
  19. 1
      src/ruby/lib/grpc.rb
  20. 2
      src/ruby/lib/grpc/generic/active_call.rb
  21. 3
      src/ruby/lib/grpc/generic/bidi_call.rb
  22. 27
      src/ruby/lib/grpc/generic/rpc_server.rb
  23. 60
      src/ruby/lib/grpc/notifier.rb
  24. 31
      src/ruby/spec/generic/client_stub_spec.rb
  25. 2
      test/cpp/client/credentials_test.cc
  26. 5
      test/cpp/end2end/async_end2end_test.cc
  27. 5
      test/cpp/end2end/end2end_test.cc
  28. 5
      test/cpp/end2end/generic_end2end_test.cc
  29. 4
      test/cpp/interop/client.cc
  30. 2
      test/cpp/interop/server.cc
  31. 2
      test/cpp/qps/qps_driver.cc
  32. 3
      test/cpp/qps/smoke_test.cc
  33. 4
      test/cpp/qps/worker.cc
  34. 5
      test/cpp/util/cli_call_test.cc
  35. 4
      test/cpp/util/grpc_cli.cc
  36. 1
      vsprojects/grpc++/grpc++.vcxproj
  37. 3
      vsprojects/grpc++/grpc++.vcxproj.filters

@ -620,6 +620,7 @@ cc_library(
"include/grpc++/generic_stub.h", "include/grpc++/generic_stub.h",
"include/grpc++/impl/call.h", "include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h", "include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/internal_stub.h", "include/grpc++/impl/internal_stub.h",
"include/grpc++/impl/rpc_method.h", "include/grpc++/impl/rpc_method.h",
"include/grpc++/impl/rpc_service_method.h", "include/grpc++/impl/rpc_service_method.h",
@ -698,6 +699,7 @@ cc_library(
"include/grpc++/generic_stub.h", "include/grpc++/generic_stub.h",
"include/grpc++/impl/call.h", "include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h", "include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/internal_stub.h", "include/grpc++/impl/internal_stub.h",
"include/grpc++/impl/rpc_method.h", "include/grpc++/impl/rpc_method.h",
"include/grpc++/impl/rpc_service_method.h", "include/grpc++/impl/rpc_service_method.h",

@ -3854,6 +3854,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/generic_stub.h \ include/grpc++/generic_stub.h \
include/grpc++/impl/call.h \ include/grpc++/impl/call.h \
include/grpc++/impl/client_unary_call.h \ include/grpc++/impl/client_unary_call.h \
include/grpc++/impl/grpc_library.h \
include/grpc++/impl/internal_stub.h \ include/grpc++/impl/internal_stub.h \
include/grpc++/impl/rpc_method.h \ include/grpc++/impl/rpc_method.h \
include/grpc++/impl/rpc_service_method.h \ include/grpc++/impl/rpc_service_method.h \
@ -4118,6 +4119,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/generic_stub.h \ include/grpc++/generic_stub.h \
include/grpc++/impl/call.h \ include/grpc++/impl/call.h \
include/grpc++/impl/client_unary_call.h \ include/grpc++/impl/client_unary_call.h \
include/grpc++/impl/grpc_library.h \
include/grpc++/impl/internal_stub.h \ include/grpc++/impl/internal_stub.h \
include/grpc++/impl/rpc_method.h \ include/grpc++/impl/rpc_method.h \
include/grpc++/impl/rpc_service_method.h \ include/grpc++/impl/rpc_service_method.h \

@ -28,6 +28,7 @@
"include/grpc++/generic_stub.h", "include/grpc++/generic_stub.h",
"include/grpc++/impl/call.h", "include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h", "include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/internal_stub.h", "include/grpc++/impl/internal_stub.h",
"include/grpc++/impl/rpc_method.h", "include/grpc++/impl/rpc_method.h",
"include/grpc++/impl/rpc_service_method.h", "include/grpc++/impl/rpc_service_method.h",

@ -64,7 +64,6 @@ const char kMessageData[] = "Test Data";
} // namespace } // namespace
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_init();
grpc::testing::InitTest(&argc, &argv, true); grpc::testing::InitTest(&argc, &argv, true);
gpr_log(GPR_INFO, "Start PUBSUB client"); gpr_log(GPR_INFO, "Start PUBSUB client");
@ -145,7 +144,5 @@ int main(int argc, char** argv) {
subscriber.Shutdown(); subscriber.Shutdown();
publisher.Shutdown(); publisher.Shutdown();
channel.reset();
grpc_shutdown();
return 0; return 0;
} }

@ -148,10 +148,8 @@ TEST_F(PublisherTest, TestPublisher) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_test_init(argc, argv); grpc_test_init(argc, argv);
grpc_init();
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
gpr_log(GPR_INFO, "Start test ..."); gpr_log(GPR_INFO, "Start test ...");
int result = RUN_ALL_TESTS(); int result = RUN_ALL_TESTS();
grpc_shutdown();
return result; return result;
} }

@ -147,10 +147,8 @@ TEST_F(SubscriberTest, TestSubscriber) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_test_init(argc, argv); grpc_test_init(argc, argv);
grpc_init();
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
gpr_log(GPR_INFO, "Start test ..."); gpr_log(GPR_INFO, "Start test ...");
int result = RUN_ALL_TESTS(); int result = RUN_ALL_TESTS();
grpc_shutdown();
return result; return result;
} }

@ -36,6 +36,7 @@
#include <grpc/support/time.h> #include <grpc/support/time.h>
#include <grpc++/impl/client_unary_call.h> #include <grpc++/impl/client_unary_call.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/time.h> #include <grpc++/time.h>
struct grpc_completion_queue; struct grpc_completion_queue;
@ -71,11 +72,11 @@ class CompletionQueueTag {
}; };
// grpc_completion_queue wrapper class // grpc_completion_queue wrapper class
class CompletionQueue { class CompletionQueue : public GrpcLibrary {
public: public:
CompletionQueue(); CompletionQueue();
explicit CompletionQueue(grpc_completion_queue* take); explicit CompletionQueue(grpc_completion_queue* take);
~CompletionQueue(); ~CompletionQueue() GRPC_OVERRIDE;
// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT // Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT
enum NextStatus { SHUTDOWN, GOT_EVENT, TIMEOUT }; enum NextStatus { SHUTDOWN, GOT_EVENT, TIMEOUT };

@ -37,15 +37,16 @@
#include <memory> #include <memory>
#include <grpc++/config.h> #include <grpc++/config.h>
#include <grpc++/impl/grpc_library.h>
namespace grpc { namespace grpc {
class ChannelArguments; class ChannelArguments;
class ChannelInterface; class ChannelInterface;
class SecureCredentials; class SecureCredentials;
class Credentials { class Credentials : public GrpcLibrary {
public: public:
virtual ~Credentials(); ~Credentials() GRPC_OVERRIDE;
protected: protected:
friend std::unique_ptr<Credentials> CompositeCredentials( friend std::unique_ptr<Credentials> CompositeCredentials(

@ -0,0 +1,50 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPCXX_IMPL_GRPC_LIBRARY_H
#define GRPCXX_IMPL_GRPC_LIBRARY_H
#include <grpc/grpc.h>
namespace grpc {
class GrpcLibrary {
public:
GrpcLibrary() { grpc_init(); }
virtual ~GrpcLibrary() { grpc_shutdown(); }
};
} // namespace grpc
#endif // GRPCXX_IMPL_GRPC_LIBRARY_H

@ -40,6 +40,7 @@
#include <grpc++/completion_queue.h> #include <grpc++/completion_queue.h>
#include <grpc++/config.h> #include <grpc++/config.h>
#include <grpc++/impl/call.h> #include <grpc++/impl/call.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/impl/service_type.h> #include <grpc++/impl/service_type.h>
#include <grpc++/impl/sync.h> #include <grpc++/impl/sync.h>
#include <grpc++/status.h> #include <grpc++/status.h>
@ -56,7 +57,8 @@ class ServerCredentials;
class ThreadPoolInterface; class ThreadPoolInterface;
// Currently it only supports handling rpcs in a single thread. // Currently it only supports handling rpcs in a single thread.
class Server GRPC_FINAL : private CallHook, class Server GRPC_FINAL : public GrpcLibrary,
private CallHook,
private AsynchronousService::DispatchImpl { private AsynchronousService::DispatchImpl {
public: public:
~Server(); ~Server();

@ -38,6 +38,7 @@
#include <grpc++/channel_interface.h> #include <grpc++/channel_interface.h>
#include <grpc++/config.h> #include <grpc++/config.h>
#include <grpc++/impl/grpc_library.h>
struct grpc_channel; struct grpc_channel;
@ -49,7 +50,8 @@ class CompletionQueue;
class Credentials; class Credentials;
class StreamContextInterface; class StreamContextInterface;
class Channel GRPC_FINAL : public ChannelInterface { class Channel GRPC_FINAL : public GrpcLibrary,
public ChannelInterface {
public: public:
Channel(const grpc::string& target, grpc_channel* c_channel); Channel(const grpc::string& target, grpc_channel* c_channel);
~Channel() GRPC_OVERRIDE; ~Channel() GRPC_OVERRIDE;

@ -33,6 +33,7 @@ using System;
using System.Diagnostics; using System.Diagnostics;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using Grpc.Core; using Grpc.Core;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal namespace Grpc.Core.Internal
{ {
@ -180,7 +181,7 @@ namespace Grpc.Core.Internal
private static void AssertCallOk(GRPCCallError callError) private static void AssertCallOk(GRPCCallError callError)
{ {
Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
} }
private static uint GetFlags(bool buffered) private static uint GetFlags(bool buffered)

@ -35,6 +35,7 @@ using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Diagnostics; using System.Diagnostics;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal namespace Grpc.Core.Internal
{ {
@ -105,9 +106,9 @@ namespace Grpc.Core.Internal
grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback); grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback);
} }
public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback) public void RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
{ {
return grpcsharp_server_request_call(this, cq, callback); AssertCallOk(grpcsharp_server_request_call(this, cq, callback));
} }
protected override bool ReleaseHandle() protected override bool ReleaseHandle()
@ -115,5 +116,10 @@ namespace Grpc.Core.Internal
grpcsharp_server_destroy(handle); grpcsharp_server_destroy(handle);
return true; return true;
} }
private static void AssertCallOk(GRPCCallError callError)
{
Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
}
} }
} }

@ -38,27 +38,29 @@ using System.Diagnostics;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using System.Threading.Tasks; using System.Threading.Tasks;
using Grpc.Core.Internal; using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core namespace Grpc.Core
{ {
/// <summary> /// <summary>
/// Server is implemented only to be able to do /// A gRPC server.
/// in-process testing.
/// </summary> /// </summary>
public class Server public class Server
{ {
// TODO: make sure the delegate doesn't get garbage collected while // TODO(jtattermusch) : make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue. // native callbacks are in the completion queue.
readonly ServerShutdownCallbackDelegate serverShutdownHandler; readonly ServerShutdownCallbackDelegate serverShutdownHandler;
readonly CompletionCallbackDelegate newServerRpcHandler; readonly CompletionCallbackDelegate newServerRpcHandler;
readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>();
readonly ServerSafeHandle handle; readonly ServerSafeHandle handle;
readonly object myLock = new object();
readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>(); readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>(); readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
bool startRequested;
bool shutdownRequested;
public Server() public Server()
{ {
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
@ -66,71 +68,81 @@ namespace Grpc.Core
this.serverShutdownHandler = HandleServerShutdown; this.serverShutdownHandler = HandleServerShutdown;
} }
// only call this before Start() /// <summary>
/// Adds a service definition to the server. This is how you register
/// handlers for a service with the server.
/// Only call this before Start().
/// </summary>
public void AddServiceDefinition(ServerServiceDefinition serviceDefinition) public void AddServiceDefinition(ServerServiceDefinition serviceDefinition)
{ {
foreach (var entry in serviceDefinition.CallHandlers) lock (myLock)
{ {
callHandlers.Add(entry.Key, entry.Value); Preconditions.CheckState(!startRequested);
foreach (var entry in serviceDefinition.CallHandlers)
{
callHandlers.Add(entry.Key, entry.Value);
}
} }
} }
// only call before Start() /// <summary>
/// Add a non-secure port on which server should listen.
/// Only call this before Start().
/// </summary>
public int AddListeningPort(string addr) public int AddListeningPort(string addr)
{ {
return handle.AddListeningPort(addr); lock (myLock)
}
// only call before Start()
public int AddListeningPort(string addr, ServerCredentials credentials)
{
using (var nativeCredentials = credentials.ToNativeCredentials())
{ {
return handle.AddListeningPort(addr, nativeCredentials); Preconditions.CheckState(!startRequested);
return handle.AddListeningPort(addr);
} }
} }
public void Start()
{
handle.Start();
// TODO: this basically means the server is single threaded....
StartHandlingRpcs();
}
/// <summary> /// <summary>
/// Requests and handles single RPC call. /// Add a secure port on which server should listen.
/// Only call this before Start().
/// </summary> /// </summary>
internal void RunRpc() public int AddListeningPort(string addr, ServerCredentials credentials)
{ {
AllowOneRpc(); lock (myLock)
try
{ {
var rpcInfo = newRpcQueue.Take(); Preconditions.CheckState(!startRequested);
using (var nativeCredentials = credentials.ToNativeCredentials())
// Console.WriteLine("Server received RPC " + rpcInfo.Method);
IServerCallHandler callHandler;
if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler))
{ {
callHandler = new NoSuchMethodCallHandler(); return handle.AddListeningPort(addr, nativeCredentials);
} }
callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue());
} }
catch (Exception e) }
/// <summary>
/// Starts the server.
/// </summary>
public void Start()
{
lock (myLock)
{ {
Console.WriteLine("Exception while handling RPC: " + e); Preconditions.CheckState(!startRequested);
startRequested = true;
handle.Start();
AllowOneRpc();
} }
} }
/// <summary> /// <summary>
/// Requests server shutdown and when there are no more calls being serviced, /// Requests server shutdown and when there are no more calls being serviced,
/// cleans up used resources. /// cleans up used resources. The returned task finishes when shutdown procedure
/// is complete.
/// </summary> /// </summary>
/// <returns>The async.</returns>
public async Task ShutdownAsync() public async Task ShutdownAsync()
{ {
lock (myLock)
{
Preconditions.CheckState(startRequested);
Preconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
handle.ShutdownAndNotify(serverShutdownHandler); handle.ShutdownAndNotify(serverShutdownHandler);
await shutdownTcs.Task; await shutdownTcs.Task;
handle.Dispose(); handle.Dispose();
@ -152,19 +164,43 @@ namespace Grpc.Core
handle.Dispose(); handle.Dispose();
} }
private async Task StartHandlingRpcs() /// <summary>
/// Allows one new RPC call to be received by server.
/// </summary>
private void AllowOneRpc()
{ {
while (true) lock (myLock)
{ {
await Task.Factory.StartNew(RunRpc); if (!shutdownRequested)
{
handle.RequestCall(GetCompletionQueue(), newServerRpcHandler);
}
} }
} }
private void AllowOneRpc() /// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
private void InvokeCallHandler(CallSafeHandle call, string method)
{ {
AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler)); try
{
IServerCallHandler callHandler;
if (!callHandlers.TryGetValue(method, out callHandler))
{
callHandler = new NoSuchMethodCallHandler();
}
callHandler.StartCall(method, call, GetCompletionQueue());
}
catch (Exception e)
{
Console.WriteLine("Exception while handling RPC: " + e);
}
} }
/// <summary>
/// Handles the native callback.
/// </summary>
private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr)
{ {
try try
@ -176,13 +212,16 @@ namespace Grpc.Core
// TODO: handle error // TODO: handle error
} }
var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod()); CallSafeHandle call = ctx.GetServerRpcNewCall();
string method = ctx.GetServerRpcNewMethod();
// after server shutdown, the callback returns with null call // after server shutdown, the callback returns with null call
if (!rpcInfo.Call.IsInvalid) if (!call.IsInvalid)
{ {
newRpcQueue.Add(rpcInfo); Task.Run(() => InvokeCallHandler(call, method));
} }
AllowOneRpc();
} }
catch (Exception e) catch (Exception e)
{ {
@ -190,6 +229,10 @@ namespace Grpc.Core
} }
} }
/// <summary>
/// Handles native callback.
/// </summary>
/// <param name="eventPtr"></param>
private void HandleServerShutdown(IntPtr eventPtr) private void HandleServerShutdown(IntPtr eventPtr)
{ {
try try
@ -202,42 +245,9 @@ namespace Grpc.Core
} }
} }
private static void AssertCallOk(GRPCCallError callError)
{
Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
}
private static CompletionQueueSafeHandle GetCompletionQueue() private static CompletionQueueSafeHandle GetCompletionQueue()
{ {
return GrpcEnvironment.ThreadPool.CompletionQueue; return GrpcEnvironment.ThreadPool.CompletionQueue;
} }
private struct NewRpcInfo
{
private CallSafeHandle call;
private string method;
public NewRpcInfo(CallSafeHandle call, string method)
{
this.call = call;
this.method = method;
}
public CallSafeHandle Call
{
get
{
return this.call;
}
}
public string Method
{
get
{
return this.method;
}
}
}
} }
} }

@ -40,7 +40,7 @@ namespace math
{ {
public static void Main(string[] args) public static void Main(string[] args)
{ {
String host = "0.0.0.0"; string host = "0.0.0.0";
GrpcEnvironment.Initialize(); GrpcEnvironment.Initialize();

@ -80,5 +80,7 @@
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None> </None>
</ItemGroup> </ItemGroup>
<ItemGroup /> <ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
</Project> </Project>

@ -607,19 +607,19 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
rb_raise(grpc_rb_eCallError, rb_raise(grpc_rb_eCallError,
"grpc_call_start_batch failed with %s (code=%d)", "grpc_call_start_batch failed with %s (code=%d)",
grpc_call_error_detail_of(err), err); grpc_call_error_detail_of(err), err);
return; return Qnil;
} }
ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout); ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout);
if (ev == NULL) { if (ev == NULL) {
grpc_run_batch_stack_cleanup(&st); grpc_run_batch_stack_cleanup(&st);
rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out"); rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out");
return; return Qnil;
} }
if (ev->data.op_complete != GRPC_OP_OK) { if (ev->data.op_complete != GRPC_OP_OK) {
grpc_run_batch_stack_cleanup(&st); grpc_run_batch_stack_cleanup(&st);
rb_raise(grpc_rb_eCallError, "start_batch completion failed, (code=%d)", rb_raise(grpc_rb_eCallError, "start_batch completion failed, (code=%d)",
ev->data.op_complete); ev->data.op_complete);
return; return Qnil;
} }
/* Build and return the BatchResult struct result */ /* Build and return the BatchResult struct result */

@ -26,7 +26,7 @@ Gem::Specification.new do |s|
s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1' s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1'
s.add_dependency 'googleauth', '~> 0.4' # reqd for interop tests s.add_dependency 'googleauth', '~> 0.4' # reqd for interop tests
s.add_dependency 'logging', '~> 1.8' s.add_dependency 'logging', '~> 2.0'
s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests
s.add_development_dependency 'simplecov', '~> 0.9' s.add_development_dependency 'simplecov', '~> 0.9'

@ -30,6 +30,7 @@
require 'grpc/errors' require 'grpc/errors'
require 'grpc/grpc' require 'grpc/grpc'
require 'grpc/logconfig' require 'grpc/logconfig'
require 'grpc/notifier'
require 'grpc/version' require 'grpc/version'
require 'grpc/core/time_consts' require 'grpc/core/time_consts'
require 'grpc/generic/active_call' require 'grpc/generic/active_call'

@ -188,7 +188,7 @@ module GRPC
# @param marshalled [false, true] indicates if the object is already # @param marshalled [false, true] indicates if the object is already
# marshalled. # marshalled.
def remote_send(req, marshalled = false) def remote_send(req, marshalled = false)
logger.debug("sending #{req.inspect}, marshalled? #{marshalled}") logger.debug("sending #{req}, marshalled? #{marshalled}")
if marshalled if marshalled
payload = req payload = req
else else

@ -123,8 +123,7 @@ module GRPC
break if req.equal?(END_OF_READS) break if req.equal?(END_OF_READS)
yield req yield req
end end
@loop_th.join @enq_th.join if @enq_th.alive?
@enq_th.join
end end
# during bidi-streaming, read the requests to send from a separate thread # during bidi-streaming, read the requests to send from a separate thread

@ -54,6 +54,18 @@ module GRPC
end end
module_function :handle_signals module_function :handle_signals
# Sets up a signal handler that adds signals to the signal handling global.
#
# Signal handlers should do as little as humanly possible.
# Here, they just add themselves to $grpc_signals
#
# RpcServer (and later other parts of gRPC) monitors the signals
# $grpc_signals in its own non-signal context.
def trap_signals
%w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } }
end
module_function :trap_signals
# Pool is a simple thread pool. # Pool is a simple thread pool.
class Pool class Pool
# Default keep alive period is 1s # Default keep alive period is 1s
@ -172,17 +184,6 @@ module GRPC
# Signal check period is 0.25s # Signal check period is 0.25s
SIGNAL_CHECK_PERIOD = 0.25 SIGNAL_CHECK_PERIOD = 0.25
# Sets up a signal handler that adds signals to the signal handling global.
#
# Signal handlers should do as little as humanly possible.
# Here, they just add themselves to $grpc_signals
#
# RpcServer (and later other parts of gRPC) monitors the signals
# $grpc_signals in its own non-signal context.
def self.trap_signals
%w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } }
end
# setup_cq is used by #initialize to constuct a Core::CompletionQueue from # setup_cq is used by #initialize to constuct a Core::CompletionQueue from
# its arguments. # its arguments.
def self.setup_cq(alt_cq) def self.setup_cq(alt_cq)
@ -299,12 +300,12 @@ module GRPC
# Runs the server in its own thread, then waits for signal INT or TERM on # Runs the server in its own thread, then waits for signal INT or TERM on
# the current thread to terminate it. # the current thread to terminate it.
def run_till_terminated def run_till_terminated
self.class.trap_signals GRPC.trap_signals
t = Thread.new { run } t = Thread.new { run }
wait_till_running wait_till_running
loop do loop do
sleep SIGNAL_CHECK_PERIOD sleep SIGNAL_CHECK_PERIOD
break unless handle_signals break unless GRPC.handle_signals
end end
stop stop
t.join t.join

@ -0,0 +1,60 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# GRPC contains the General RPC module.
module GRPC
# Notifier is useful high-level synchronization primitive.
class Notifier
attr_reader :payload, :notified
alias_method :notified?, :notified
def initialize
@mutex = Mutex.new
@cvar = ConditionVariable.new
@notified = false
@payload = nil
end
def wait
@mutex.synchronize do
@cvar.wait(@mutex) until notified?
end
end
def notify(payload)
@mutex.synchronize do
return Error.new('already notified') if notified?
@payload = payload
@notified = true
@cvar.signal
return nil
end
end
end
end

@ -29,37 +29,8 @@
require 'grpc' require 'grpc'
# Notifier is useful high-level synchronization primitive.
class Notifier
attr_reader :payload, :notified
alias_method :notified?, :notified
def initialize
@mutex = Mutex.new
@cvar = ConditionVariable.new
@notified = false
@payload = nil
end
def wait
@mutex.synchronize do
@cvar.wait(@mutex) until notified?
end
end
def notify(payload)
@mutex.synchronize do
return Error.new('already notified') if notified?
@payload = payload
@notified = true
@cvar.signal
return nil
end
end
end
def wakey_thread(&blk) def wakey_thread(&blk)
n = Notifier.new n = GRPC::Notifier.new
t = Thread.new do t = Thread.new do
blk.call(n) blk.call(n)
end end

@ -56,8 +56,6 @@ TEST_F(CredentialsTest, InvalidServiceAccountCreds) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
grpc_init();
int ret = RUN_ALL_TESTS(); int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret; return ret;
} }

@ -594,9 +594,6 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_test_init(argc, argv); grpc_test_init(argc, argv);
grpc_init();
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
int result = RUN_ALL_TESTS(); return RUN_ALL_TESTS();
grpc_shutdown();
return result;
} }

@ -561,9 +561,6 @@ TEST_F(End2endTest, ClientCancelsBidi) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_test_init(argc, argv); grpc_test_init(argc, argv);
grpc_init();
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
int result = RUN_ALL_TESTS(); return RUN_ALL_TESTS();
grpc_shutdown();
return result;
} }

@ -279,9 +279,6 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_test_init(argc, argv); grpc_test_init(argc, argv);
grpc_init();
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
int result = RUN_ALL_TESTS(); return RUN_ALL_TESTS();
grpc_shutdown();
return result;
} }

@ -76,8 +76,6 @@ using grpc::testing::CreateChannelForTestCase;
using grpc::testing::GetServiceAccountJsonKey; using grpc::testing::GetServiceAccountJsonKey;
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_init();
grpc::testing::InitTest(&argc, &argv, true); grpc::testing::InitTest(&argc, &argv, true);
int ret = 0; int ret = 0;
@ -129,8 +127,6 @@ int main(int argc, char** argv) {
FLAGS_test_case.c_str()); FLAGS_test_case.c_str());
ret = 1; ret = 1;
} }
client.Reset(nullptr);
grpc_shutdown();
return ret; return ret;
} }

@ -218,13 +218,11 @@ void RunServer() {
static void sigint_handler(int x) { got_sigint = true; } static void sigint_handler(int x) { got_sigint = true; }
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_init();
grpc::testing::InitTest(&argc, &argv, true); grpc::testing::InitTest(&argc, &argv, true);
signal(SIGINT, sigint_handler); signal(SIGINT, sigint_handler);
GPR_ASSERT(FLAGS_port != 0); GPR_ASSERT(FLAGS_port != 0);
RunServer(); RunServer();
grpc_shutdown();
return 0; return 0;
} }

@ -69,7 +69,6 @@ using grpc::testing::RpcType;
using grpc::testing::ResourceUsage; using grpc::testing::ResourceUsage;
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_init();
grpc::testing::InitTest(&argc, &argv, true); grpc::testing::InitTest(&argc, &argv, true);
RpcType rpc_type; RpcType rpc_type;
@ -104,6 +103,5 @@ int main(int argc, char** argv) {
ReportLatency(result); ReportLatency(result);
ReportTimes(result); ReportTimes(result);
grpc_shutdown();
return 0; return 0;
} }

@ -136,14 +136,11 @@ static void RunQPS() {
} // namespace grpc } // namespace grpc
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_init();
using namespace grpc::testing; using namespace grpc::testing;
RunSynchronousStreamingPingPong(); RunSynchronousStreamingPingPong();
RunSynchronousUnaryPingPong(); RunSynchronousUnaryPingPong();
RunAsyncUnaryPingPong(); RunAsyncUnaryPingPong();
RunQPS(); RunQPS();
grpc_shutdown();
return 0; return 0;
} }

@ -64,13 +64,11 @@ static void RunServer() {
} // namespace grpc } // namespace grpc
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_init();
grpc::testing::InitTest(&argc, &argv, true); grpc::testing::InitTest(&argc, &argv, true);
signal(SIGINT, sigint_handler); signal(SIGINT, sigint_handler);
grpc::testing::RunServer(); grpc::testing::RunServer();
grpc_shutdown();
return 0; return 0;
} }

@ -123,9 +123,6 @@ TEST_F(CliCallTest, SimpleRpc) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_test_init(argc, argv); grpc_test_init(argc, argv);
grpc_init();
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
int result = RUN_ALL_TESTS(); return RUN_ALL_TESTS();
grpc_shutdown();
return result;
} }

@ -79,8 +79,6 @@ DEFINE_string(output_binary_file, "output.bin",
"Path to output file to write serialized response."); "Path to output file to write serialized response.");
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_init();
grpc::testing::InitTest(&argc, &argv, true); grpc::testing::InitTest(&argc, &argv, true);
if (argc < 4 || grpc::string(argv[1]) != "call") { if (argc < 4 || grpc::string(argv[1]) != "call") {
@ -127,7 +125,5 @@ int main(int argc, char** argv) {
output_file << response; output_file << response;
} }
channel.reset();
grpc_shutdown();
return 0; return 0;
} }

@ -96,6 +96,7 @@
<ClInclude Include="..\..\include\grpc++\generic_stub.h" /> <ClInclude Include="..\..\include\grpc++\generic_stub.h" />
<ClInclude Include="..\..\include\grpc++\impl\call.h" /> <ClInclude Include="..\..\include\grpc++\impl\call.h" />
<ClInclude Include="..\..\include\grpc++\impl\client_unary_call.h" /> <ClInclude Include="..\..\include\grpc++\impl\client_unary_call.h" />
<ClInclude Include="..\..\include\grpc++\impl\grpc_library.h" />
<ClInclude Include="..\..\include\grpc++\impl\internal_stub.h" /> <ClInclude Include="..\..\include\grpc++\impl\internal_stub.h" />
<ClInclude Include="..\..\include\grpc++\impl\rpc_method.h" /> <ClInclude Include="..\..\include\grpc++\impl\rpc_method.h" />
<ClInclude Include="..\..\include\grpc++\impl\rpc_service_method.h" /> <ClInclude Include="..\..\include\grpc++\impl\rpc_service_method.h" />

@ -120,6 +120,9 @@
<ClInclude Include="..\..\include\grpc++\impl\client_unary_call.h"> <ClInclude Include="..\..\include\grpc++\impl\client_unary_call.h">
<Filter>include\grpc++\impl</Filter> <Filter>include\grpc++\impl</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="..\..\include\grpc++\impl\grpc_library.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc++\impl\internal_stub.h"> <ClInclude Include="..\..\include\grpc++\impl\internal_stub.h">
<Filter>include\grpc++\impl</Filter> <Filter>include\grpc++\impl</Filter>
</ClInclude> </ClInclude>

Loading…
Cancel
Save