Merge github.com:grpc/grpc into naming-crisis

pull/3035/head
Craig Tiller 10 years ago
commit 5d4d1ee34e
  1. 88
      BUILD
  2. 208
      Makefile
  3. 96
      build.json
  4. 10
      examples/pubsub/main.cc
  5. 2
      examples/pubsub/publisher.cc
  6. 5
      examples/pubsub/publisher.h
  7. 8
      examples/pubsub/publisher_test.cc
  8. 2
      examples/pubsub/subscriber.cc
  9. 5
      examples/pubsub/subscriber.h
  10. 6
      examples/pubsub/subscriber_test.cc
  11. 90
      include/grpc++/channel.h
  12. 18
      include/grpc++/client_context.h
  13. 9
      include/grpc++/completion_queue.h
  14. 7
      include/grpc++/create_channel.h
  15. 8
      include/grpc++/credentials.h
  16. 10
      include/grpc++/generic/async_generic_service.h
  17. 15
      include/grpc++/generic/generic_stub.h
  18. 18
      include/grpc++/impl/call.h
  19. 9
      include/grpc++/impl/client_unary_call.h
  20. 4
      include/grpc++/impl/proto_utils.h
  21. 14
      include/grpc++/impl/rpc_method.h
  22. 24
      include/grpc++/impl/rpc_service_method.h
  23. 4
      include/grpc++/impl/service_type.h
  24. 2
      include/grpc++/impl/sync.h
  25. 2
      include/grpc++/impl/thd.h
  26. 21
      include/grpc++/server.h
  27. 10
      include/grpc++/server_builder.h
  28. 6
      include/grpc++/server_context.h
  29. 2
      include/grpc++/server_credentials.h
  30. 364
      include/grpc++/support/async_stream.h
  31. 14
      include/grpc++/support/async_unary_call.h
  32. 8
      include/grpc++/support/auth_context.h
  33. 12
      include/grpc++/support/byte_buffer.h
  34. 8
      include/grpc++/support/channel_arguments.h
  35. 6
      include/grpc++/support/config.h
  36. 6
      include/grpc++/support/config_protobuf.h
  37. 8
      include/grpc++/support/slice.h
  38. 10
      include/grpc++/support/status.h
  39. 6
      include/grpc++/support/status_code_enum.h
  40. 6
      include/grpc++/support/stub_options.h
  41. 392
      include/grpc++/support/sync_stream.h
  42. 8
      include/grpc++/support/time.h
  43. 26
      include/grpc/compression.h
  44. 11
      include/grpc/grpc.h
  45. 14
      include/grpc/grpc_security.h
  46. 4
      src/compiler/config.h
  47. 79
      src/compiler/cpp_generator.cc
  48. 2
      src/compiler/python_generator.cc
  49. 63
      src/core/channel/channel_args.c
  50. 20
      src/core/channel/channel_args.h
  51. 7
      src/core/iomgr/pollset.h
  52. 2
      src/core/iomgr/pollset_multipoller_with_epoll.c
  53. 2
      src/core/iomgr/pollset_multipoller_with_poll_posix.c
  54. 15
      src/core/iomgr/pollset_posix.c
  55. 6
      src/core/iomgr/pollset_posix.h
  56. 10
      src/core/iomgr/pollset_windows.c
  57. 2
      src/core/security/google_default_credentials.c
  58. 20
      src/core/security/security_connector.c
  59. 28
      src/core/security/server_auth_filter.c
  60. 16
      src/core/surface/completion_queue.c
  61. 12
      src/core/transport/chttp2/stream_encoder.c
  62. 15
      src/cpp/client/channel.cc
  63. 80
      src/cpp/client/channel.h
  64. 3
      src/cpp/client/channel_arguments.cc
  65. 4
      src/cpp/client/client_context.cc
  66. 19
      src/cpp/client/create_channel.cc
  67. 14
      src/cpp/client/create_channel_internal.cc
  68. 24
      src/cpp/client/create_channel_internal.h
  69. 5
      src/cpp/client/generic_stub.cc
  70. 18
      src/cpp/client/insecure_credentials.cc
  71. 4
      src/cpp/client/secure_channel_arguments.cc
  72. 12
      src/cpp/client/secure_credentials.cc
  73. 4
      src/cpp/client/secure_credentials.h
  74. 2
      src/cpp/common/auth_property_iterator.cc
  75. 5
      src/cpp/common/call.cc
  76. 2
      src/cpp/common/completion_queue.cc
  77. 2
      src/cpp/common/create_auth_context.h
  78. 2
      src/cpp/common/insecure_create_auth_context.cc
  79. 2
      src/cpp/common/secure_auth_context.h
  80. 2
      src/cpp/common/secure_create_auth_context.cc
  81. 2
      src/cpp/proto/proto_utils.cc
  82. 2
      src/cpp/server/async_generic_service.cc
  83. 3
      src/cpp/server/create_default_thread_pool.cc
  84. 3
      src/cpp/server/dynamic_thread_pool.cc
  85. 16
      src/cpp/server/dynamic_thread_pool.h
  86. 2
      src/cpp/server/fixed_size_thread_pool.cc
  87. 14
      src/cpp/server/fixed_size_thread_pool.h
  88. 4
      src/cpp/server/secure_server_credentials.h
  89. 106
      src/cpp/server/server.cc
  90. 16
      src/cpp/server/server_builder.cc
  91. 2
      src/cpp/server/server_context.cc
  92. 6
      src/cpp/server/thread_pool_interface.h
  93. 2
      src/cpp/util/byte_buffer.cc
  94. 2
      src/cpp/util/slice.cc
  95. 2
      src/cpp/util/status.cc
  96. 4
      src/cpp/util/time.cc
  97. 1
      src/csharp/.gitignore
  98. 1
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  99. 7
      src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
  100. 222
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
  101. Some files were not shown because too many files have changed in this diff Show More

88
BUILD

@ -675,8 +675,11 @@ cc_library(
"src/cpp/client/secure_credentials.h",
"src/cpp/common/secure_auth_context.h",
"src/cpp/server/secure_server_credentials.h",
"src/cpp/client/channel.h",
"src/cpp/client/create_channel_internal.h",
"src/cpp/common/create_auth_context.h",
"src/cpp/server/dynamic_thread_pool.h",
"src/cpp/server/fixed_size_thread_pool.h",
"src/cpp/server/thread_pool_interface.h",
"src/cpp/client/secure_channel_arguments.cc",
"src/cpp/client/secure_credentials.cc",
"src/cpp/common/auth_property_iterator.cc",
@ -687,10 +690,10 @@ cc_library(
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/create_channel_internal.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",
"src/cpp/client/insecure_credentials.cc",
"src/cpp/client/internal_stub.cc",
"src/cpp/common/call.cc",
"src/cpp/common/completion_queue.cc",
"src/cpp/common/rpc_method.cc",
@ -710,25 +713,16 @@ cc_library(
"src/cpp/util/time.cc",
],
hdrs = [
"include/grpc++/async_generic_service.h",
"include/grpc++/async_unary_call.h",
"include/grpc++/auth_context.h",
"include/grpc++/byte_buffer.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
"include/grpc++/channel.h",
"include/grpc++/client_context.h",
"include/grpc++/completion_queue.h",
"include/grpc++/config.h",
"include/grpc++/config_protobuf.h",
"include/grpc++/create_channel.h",
"include/grpc++/credentials.h",
"include/grpc++/dynamic_thread_pool.h",
"include/grpc++/fixed_size_thread_pool.h",
"include/grpc++/generic_stub.h",
"include/grpc++/generic/async_generic_service.h",
"include/grpc++/generic/generic_stub.h",
"include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/internal_stub.h",
"include/grpc++/impl/proto_utils.h",
"include/grpc++/impl/rpc_method.h",
"include/grpc++/impl/rpc_service_method.h",
@ -744,13 +738,19 @@ cc_library(
"include/grpc++/server_builder.h",
"include/grpc++/server_context.h",
"include/grpc++/server_credentials.h",
"include/grpc++/slice.h",
"include/grpc++/status.h",
"include/grpc++/status_code_enum.h",
"include/grpc++/stream.h",
"include/grpc++/stub_options.h",
"include/grpc++/thread_pool_interface.h",
"include/grpc++/time.h",
"include/grpc++/support/async_stream.h",
"include/grpc++/support/async_unary_call.h",
"include/grpc++/support/auth_context.h",
"include/grpc++/support/byte_buffer.h",
"include/grpc++/support/channel_arguments.h",
"include/grpc++/support/config.h",
"include/grpc++/support/config_protobuf.h",
"include/grpc++/support/slice.h",
"include/grpc++/support/status.h",
"include/grpc++/support/status_code_enum.h",
"include/grpc++/support/stub_options.h",
"include/grpc++/support/sync_stream.h",
"include/grpc++/support/time.h",
],
includes = [
"include",
@ -767,17 +767,20 @@ cc_library(
cc_library(
name = "grpc++_unsecure",
srcs = [
"src/cpp/client/channel.h",
"src/cpp/client/create_channel_internal.h",
"src/cpp/common/create_auth_context.h",
"src/cpp/server/dynamic_thread_pool.h",
"src/cpp/server/fixed_size_thread_pool.h",
"src/cpp/server/thread_pool_interface.h",
"src/cpp/common/insecure_create_auth_context.cc",
"src/cpp/client/channel.cc",
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/create_channel_internal.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",
"src/cpp/client/insecure_credentials.cc",
"src/cpp/client/internal_stub.cc",
"src/cpp/common/call.cc",
"src/cpp/common/completion_queue.cc",
"src/cpp/common/rpc_method.cc",
@ -797,25 +800,16 @@ cc_library(
"src/cpp/util/time.cc",
],
hdrs = [
"include/grpc++/async_generic_service.h",
"include/grpc++/async_unary_call.h",
"include/grpc++/auth_context.h",
"include/grpc++/byte_buffer.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
"include/grpc++/channel.h",
"include/grpc++/client_context.h",
"include/grpc++/completion_queue.h",
"include/grpc++/config.h",
"include/grpc++/config_protobuf.h",
"include/grpc++/create_channel.h",
"include/grpc++/credentials.h",
"include/grpc++/dynamic_thread_pool.h",
"include/grpc++/fixed_size_thread_pool.h",
"include/grpc++/generic_stub.h",
"include/grpc++/generic/async_generic_service.h",
"include/grpc++/generic/generic_stub.h",
"include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/internal_stub.h",
"include/grpc++/impl/proto_utils.h",
"include/grpc++/impl/rpc_method.h",
"include/grpc++/impl/rpc_service_method.h",
@ -831,13 +825,19 @@ cc_library(
"include/grpc++/server_builder.h",
"include/grpc++/server_context.h",
"include/grpc++/server_credentials.h",
"include/grpc++/slice.h",
"include/grpc++/status.h",
"include/grpc++/status_code_enum.h",
"include/grpc++/stream.h",
"include/grpc++/stub_options.h",
"include/grpc++/thread_pool_interface.h",
"include/grpc++/time.h",
"include/grpc++/support/async_stream.h",
"include/grpc++/support/async_unary_call.h",
"include/grpc++/support/auth_context.h",
"include/grpc++/support/byte_buffer.h",
"include/grpc++/support/channel_arguments.h",
"include/grpc++/support/config.h",
"include/grpc++/support/config_protobuf.h",
"include/grpc++/support/slice.h",
"include/grpc++/support/status.h",
"include/grpc++/support/status_code_enum.h",
"include/grpc++/support/stub_options.h",
"include/grpc++/support/sync_stream.h",
"include/grpc++/support/time.h",
],
includes = [
"include",
@ -854,8 +854,8 @@ cc_library(
cc_library(
name = "grpc_plugin_support",
srcs = [
"include/grpc++/config.h",
"include/grpc++/config_protobuf.h",
"include/grpc++/support/config.h",
"include/grpc++/support/config_protobuf.h",
"src/compiler/config.h",
"src/compiler/cpp_generator.h",
"src/compiler/cpp_generator_helpers.h",

File diff suppressed because one or more lines are too long

@ -30,25 +30,16 @@
{
"name": "grpc++_base",
"public_headers": [
"include/grpc++/async_generic_service.h",
"include/grpc++/async_unary_call.h",
"include/grpc++/auth_context.h",
"include/grpc++/byte_buffer.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
"include/grpc++/channel.h",
"include/grpc++/client_context.h",
"include/grpc++/completion_queue.h",
"include/grpc++/config.h",
"include/grpc++/config_protobuf.h",
"include/grpc++/create_channel.h",
"include/grpc++/credentials.h",
"include/grpc++/dynamic_thread_pool.h",
"include/grpc++/fixed_size_thread_pool.h",
"include/grpc++/generic_stub.h",
"include/grpc++/generic/async_generic_service.h",
"include/grpc++/generic/generic_stub.h",
"include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/internal_stub.h",
"include/grpc++/impl/proto_utils.h",
"include/grpc++/impl/rpc_method.h",
"include/grpc++/impl/rpc_service_method.h",
@ -64,27 +55,36 @@
"include/grpc++/server_builder.h",
"include/grpc++/server_context.h",
"include/grpc++/server_credentials.h",
"include/grpc++/slice.h",
"include/grpc++/status.h",
"include/grpc++/status_code_enum.h",
"include/grpc++/stream.h",
"include/grpc++/stub_options.h",
"include/grpc++/thread_pool_interface.h",
"include/grpc++/time.h"
"include/grpc++/support/async_stream.h",
"include/grpc++/support/async_unary_call.h",
"include/grpc++/support/auth_context.h",
"include/grpc++/support/byte_buffer.h",
"include/grpc++/support/channel_arguments.h",
"include/grpc++/support/config.h",
"include/grpc++/support/config_protobuf.h",
"include/grpc++/support/slice.h",
"include/grpc++/support/status.h",
"include/grpc++/support/status_code_enum.h",
"include/grpc++/support/stub_options.h",
"include/grpc++/support/sync_stream.h",
"include/grpc++/support/time.h"
],
"headers": [
"src/cpp/client/channel.h",
"src/cpp/common/create_auth_context.h"
"src/cpp/client/create_channel_internal.h",
"src/cpp/common/create_auth_context.h",
"src/cpp/server/dynamic_thread_pool.h",
"src/cpp/server/fixed_size_thread_pool.h",
"src/cpp/server/thread_pool_interface.h"
],
"src": [
"src/cpp/client/channel.cc",
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/create_channel_internal.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",
"src/cpp/client/insecure_credentials.cc",
"src/cpp/client/internal_stub.cc",
"src/cpp/common/call.cc",
"src/cpp/common/completion_queue.cc",
"src/cpp/common/rpc_method.cc",
@ -697,8 +697,8 @@
"build": "protoc",
"language": "c++",
"headers": [
"include/grpc++/config.h",
"include/grpc++/config_protobuf.h",
"include/grpc++/support/config.h",
"include/grpc++/support/config_protobuf.h",
"src/compiler/config.h",
"src/compiler/cpp_generator.h",
"src/compiler/cpp_generator_helpers.h",
@ -1365,6 +1365,20 @@
"gpr"
]
},
{
"name": "grpc_channel_args_test",
"build": "test",
"language": "c",
"src": [
"test/core/channel/channel_args_test.c"
],
"deps": [
"grpc_test_util",
"grpc",
"gpr_test_util",
"gpr"
]
},
{
"name": "grpc_channel_stack_test",
"build": "test",
@ -2129,21 +2143,6 @@
"gpr"
]
},
{
"name": "dynamic_thread_pool_test",
"build": "test",
"language": "c++",
"src": [
"test/cpp/server/dynamic_thread_pool_test.cc"
],
"deps": [
"grpc_test_util",
"grpc++",
"grpc",
"gpr_test_util",
"gpr"
]
},
{
"name": "end2end_test",
"build": "test",
@ -2160,21 +2159,6 @@
"gpr"
]
},
{
"name": "fixed_size_thread_pool_test",
"build": "test",
"language": "c++",
"src": [
"test/cpp/server/fixed_size_thread_pool_test.cc"
],
"deps": [
"grpc_test_util",
"grpc++",
"grpc",
"gpr_test_util",
"gpr"
]
},
{
"name": "generic_end2end_test",
"build": "test",
@ -2624,13 +2608,9 @@
"grpc++_test_util",
"grpc_test_util",
"grpc++",
"grpc_zookeeper",
"grpc",
"gpr_test_util",
"gpr"
],
"external_deps": [
"zookeeper"
]
},
{

@ -37,18 +37,16 @@
#include <string>
#include <thread>
#include <gflags/gflags.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <gflags/gflags.h>
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/channel.h>
#include <grpc++/create_channel.h>
#include <grpc++/credentials.h>
#include <grpc++/status.h>
#include "test/cpp/util/test_config.h"
#include "examples/pubsub/publisher.h"
#include "examples/pubsub/subscriber.h"
#include "test/cpp/util/test_config.h"
DEFINE_int32(server_port, 443, "Server port.");
DEFINE_string(server_host, "pubsub-staging.googleapis.com",
@ -72,7 +70,7 @@ int main(int argc, char** argv) {
ss << FLAGS_server_host << ":" << FLAGS_server_port;
std::shared_ptr<grpc::Credentials> creds = grpc::GoogleDefaultCredentials();
std::shared_ptr<grpc::ChannelInterface> channel =
std::shared_ptr<grpc::Channel> channel =
grpc::CreateChannel(ss.str(), creds, grpc::ChannelArguments());
grpc::examples::pubsub::Publisher publisher(channel);

@ -50,7 +50,7 @@ namespace grpc {
namespace examples {
namespace pubsub {
Publisher::Publisher(std::shared_ptr<ChannelInterface> channel)
Publisher::Publisher(std::shared_ptr<Channel> channel)
: stub_(PublisherService::NewStub(channel)) {}
void Publisher::Shutdown() { stub_.reset(); }

@ -34,8 +34,7 @@
#ifndef GRPC_EXAMPLES_PUBSUB_PUBLISHER_H
#define GRPC_EXAMPLES_PUBSUB_PUBLISHER_H
#include <grpc++/channel_interface.h>
#include <grpc++/status.h>
#include <grpc++/channel.h>
#include "examples/pubsub/pubsub.grpc.pb.h"
@ -45,7 +44,7 @@ namespace pubsub {
class Publisher {
public:
Publisher(std::shared_ptr<ChannelInterface> channel);
Publisher(std::shared_ptr<Channel> channel);
void Shutdown();
Status CreateTopic(const grpc::string& topic);

@ -31,22 +31,20 @@
*
*/
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
#include <grpc++/status.h>
#include <gtest/gtest.h>
#include "examples/pubsub/publisher.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
using grpc::ChannelInterface;
using grpc::Channel;
namespace grpc {
namespace testing {
@ -124,7 +122,7 @@ class PublisherTest : public ::testing::Test {
std::unique_ptr<Server> server_;
PublisherServiceImpl service_;
std::shared_ptr<ChannelInterface> channel_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::examples::pubsub::Publisher> publisher_;
};

@ -48,7 +48,7 @@ namespace grpc {
namespace examples {
namespace pubsub {
Subscriber::Subscriber(std::shared_ptr<ChannelInterface> channel)
Subscriber::Subscriber(std::shared_ptr<Channel> channel)
: stub_(SubscriberService::NewStub(channel)) {}
void Subscriber::Shutdown() { stub_.reset(); }

@ -34,8 +34,7 @@
#ifndef GRPC_EXAMPLES_PUBSUB_SUBSCRIBER_H
#define GRPC_EXAMPLES_PUBSUB_SUBSCRIBER_H
#include <grpc++/channel_interface.h>
#include <grpc++/status.h>
#include <grpc++/channel.h>
#include "examples/pubsub/pubsub.grpc.pb.h"
@ -45,7 +44,7 @@ namespace pubsub {
class Subscriber {
public:
Subscriber(std::shared_ptr<ChannelInterface> channel);
Subscriber(std::shared_ptr<Channel> channel);
void Shutdown();
Status CreateSubscription(const grpc::string& topic,

@ -31,15 +31,13 @@
*
*/
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
#include <grpc++/status.h>
#include <gtest/gtest.h>
#include "examples/pubsub/subscriber.h"
@ -122,7 +120,7 @@ class SubscriberTest : public ::testing::Test {
std::unique_ptr<Server> server_;
SubscriberServiceImpl service_;
std::shared_ptr<ChannelInterface> channel_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::examples::pubsub::Subscriber> subscriber_;
};

@ -31,36 +31,49 @@
*
*/
#ifndef GRPCXX_CHANNEL_INTERFACE_H
#define GRPCXX_CHANNEL_INTERFACE_H
#ifndef GRPCXX_CHANNEL_H
#define GRPCXX_CHANNEL_H
#include <memory>
#include <grpc/grpc.h>
#include <grpc++/status.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/support/config.h>
struct grpc_call;
struct grpc_channel;
namespace grpc {
class Call;
class CallOpBuffer;
class ClientContext;
class CallOpSetInterface;
class ChannelArguments;
class CompletionQueue;
class RpcMethod;
class Credentials;
class SecureCredentials;
class ChannelInterface : public CallHook,
public std::enable_shared_from_this<ChannelInterface> {
public:
virtual ~ChannelInterface() {}
template <class R>
class ClientReader;
template <class W>
class ClientWriter;
template <class R, class W>
class ClientReaderWriter;
template <class R>
class ClientAsyncReader;
template <class W>
class ClientAsyncWriter;
template <class R, class W>
class ClientAsyncReaderWriter;
template <class R>
class ClientAsyncResponseReader;
virtual void* RegisterMethod(const char* method_name) = 0;
virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) = 0;
class Channel GRPC_FINAL : public GrpcLibrary,
public CallHook,
public std::enable_shared_from_this<Channel> {
public:
~Channel();
// Get the current channel state. If the channel is in IDLE and try_to_connect
// is set to true, try to connect.
virtual grpc_connectivity_state GetState(bool try_to_connect) = 0;
grpc_connectivity_state GetState(bool try_to_connect);
// Return the tag on cq when the channel state is changed or deadline expires.
// GetState needs to called to get the current state.
@ -80,13 +93,46 @@ class ChannelInterface : public CallHook,
}
private:
virtual void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) = 0;
virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) = 0;
template <class R>
friend class ::grpc::ClientReader;
template <class W>
friend class ::grpc::ClientWriter;
template <class R, class W>
friend class ::grpc::ClientReaderWriter;
template <class R>
friend class ::grpc::ClientAsyncReader;
template <class W>
friend class ::grpc::ClientAsyncWriter;
template <class R, class W>
friend class ::grpc::ClientAsyncReaderWriter;
template <class R>
friend class ::grpc::ClientAsyncResponseReader;
template <class InputMessage, class OutputMessage>
friend Status BlockingUnaryCall(Channel* channel, const RpcMethod& method,
ClientContext* context,
const InputMessage& request,
OutputMessage* result);
friend class ::grpc::RpcMethod;
friend std::shared_ptr<Channel> CreateChannelInternal(
const grpc::string& host, grpc_channel* c_channel);
Channel(const grpc::string& host, grpc_channel* c_channel);
Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq);
void PerformOpsOnCall(CallOpSetInterface* ops, Call* call);
void* RegisterMethod(const char* method);
void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline, CompletionQueue* cq,
void* tag);
bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline);
const grpc::string host_;
grpc_channel* const c_channel_; // owned
};
} // namespace grpc
#endif // GRPCXX_CHANNEL_INTERFACE_H
#endif // GRPCXX_CHANNEL_H

@ -42,16 +42,16 @@
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc++/auth_context.h>
#include <grpc++/config.h>
#include <grpc++/status.h>
#include <grpc++/time.h>
#include <grpc++/support/auth_context.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status.h>
#include <grpc++/support/time.h>
struct census_context;
namespace grpc {
class ChannelInterface;
class Channel;
class CompletionQueue;
class Credentials;
class RpcMethod;
@ -215,20 +215,18 @@ class ClientContext {
template <class R>
friend class ::grpc::ClientAsyncResponseReader;
template <class InputMessage, class OutputMessage>
friend Status BlockingUnaryCall(ChannelInterface* channel,
const RpcMethod& method,
friend Status BlockingUnaryCall(Channel* channel, const RpcMethod& method,
ClientContext* context,
const InputMessage& request,
OutputMessage* result);
grpc_call* call() { return call_; }
void set_call(grpc_call* call,
const std::shared_ptr<ChannelInterface>& channel);
void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel);
grpc::string authority() { return authority_; }
bool initial_metadata_received_;
std::shared_ptr<ChannelInterface> channel_;
std::shared_ptr<Channel> channel_;
grpc_call* call_;
gpr_timespec deadline_;
grpc::string authority_;

@ -36,8 +36,8 @@
#include <grpc/support/time.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/status.h>
#include <grpc++/time.h>
#include <grpc++/support/status.h>
#include <grpc++/support/time.h>
struct grpc_completion_queue;
@ -65,7 +65,7 @@ template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
class UnknownMethodHandler;
class ChannelInterface;
class Channel;
class ClientContext;
class CompletionQueue;
class RpcMethod;
@ -143,8 +143,7 @@ class CompletionQueue : public GrpcLibrary {
friend class ::grpc::Server;
friend class ::grpc::ServerContext;
template <class InputMessage, class OutputMessage>
friend Status BlockingUnaryCall(ChannelInterface* channel,
const RpcMethod& method,
friend Status BlockingUnaryCall(Channel* channel, const RpcMethod& method,
ClientContext* context,
const InputMessage& request,
OutputMessage* result);

@ -36,15 +36,14 @@
#include <memory>
#include <grpc++/config.h>
#include <grpc++/credentials.h>
#include <grpc++/support/channel_arguments.h>
#include <grpc++/support/config.h>
namespace grpc {
class ChannelArguments;
class ChannelInterface;
// If creds does not hold an object or is invalid, a lame channel is returned.
std::shared_ptr<ChannelInterface> CreateChannel(
std::shared_ptr<Channel> CreateChannel(
const grpc::string& target, const std::shared_ptr<Credentials>& creds,
const ChannelArguments& args);

@ -36,12 +36,12 @@
#include <memory>
#include <grpc++/config.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/support/config.h>
namespace grpc {
class ChannelArguments;
class ChannelInterface;
class Channel;
class SecureCredentials;
class Credentials : public GrpcLibrary {
@ -57,11 +57,11 @@ class Credentials : public GrpcLibrary {
virtual SecureCredentials* AsSecureCredentials() = 0;
private:
friend std::shared_ptr<ChannelInterface> CreateChannel(
friend std::shared_ptr<Channel> CreateChannel(
const grpc::string& target, const std::shared_ptr<Credentials>& creds,
const ChannelArguments& args);
virtual std::shared_ptr<ChannelInterface> CreateChannel(
virtual std::shared_ptr<Channel> CreateChannel(
const grpc::string& target, const ChannelArguments& args) = 0;
};

@ -31,11 +31,11 @@
*
*/
#ifndef GRPCXX_ASYNC_GENERIC_SERVICE_H
#define GRPCXX_ASYNC_GENERIC_SERVICE_H
#ifndef GRPCXX_GENERIC_ASYNC_GENERIC_SERVICE_H
#define GRPCXX_GENERIC_ASYNC_GENERIC_SERVICE_H
#include <grpc++/byte_buffer.h>
#include <grpc++/stream.h>
#include <grpc++/support/byte_buffer.h>
#include <grpc++/support/async_stream.h>
struct grpc_server;
@ -75,4 +75,4 @@ class AsyncGenericService GRPC_FINAL {
} // namespace grpc
#endif // GRPCXX_ASYNC_GENERIC_SERVICE_H
#endif // GRPCXX_GENERIC_ASYNC_GENERIC_SERVICE_H

@ -31,11 +31,11 @@
*
*/
#ifndef GRPCXX_GENERIC_STUB_H
#define GRPCXX_GENERIC_STUB_H
#ifndef GRPCXX_GENERIC_GENERIC_STUB_H
#define GRPCXX_GENERIC_GENERIC_STUB_H
#include <grpc++/byte_buffer.h>
#include <grpc++/stream.h>
#include <grpc++/support/async_stream.h>
#include <grpc++/support/byte_buffer.h>
namespace grpc {
@ -47,8 +47,7 @@ typedef ClientAsyncReaderWriter<ByteBuffer, ByteBuffer>
// by name.
class GenericStub GRPC_FINAL {
public:
explicit GenericStub(std::shared_ptr<ChannelInterface> channel)
: channel_(channel) {}
explicit GenericStub(std::shared_ptr<Channel> channel) : channel_(channel) {}
// begin a call to a named method
std::unique_ptr<GenericClientAsyncReaderWriter> Call(
@ -56,9 +55,9 @@ class GenericStub GRPC_FINAL {
void* tag);
private:
std::shared_ptr<ChannelInterface> channel_;
std::shared_ptr<Channel> channel_;
};
} // namespace grpc
#endif // GRPCXX_GENERIC_STUB_H
#endif // GRPCXX_GENERIC_GENERIC_STUB_H

@ -34,18 +34,17 @@
#ifndef GRPCXX_IMPL_CALL_H
#define GRPCXX_IMPL_CALL_H
#include <grpc/support/alloc.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
#include <grpc++/status.h>
#include <grpc++/impl/serialization_traits.h>
#include <functional>
#include <memory>
#include <map>
#include <cstring>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/impl/serialization_traits.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status.h>
struct grpc_call;
struct grpc_op;
@ -541,8 +540,7 @@ class CallOpSet : public CallOpSetInterface,
template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
class SneakyCallOpSet GRPC_FINAL
: public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
class SneakyCallOpSet : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
public:
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
typedef CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> Base;

@ -34,21 +34,20 @@
#ifndef GRPCXX_IMPL_CLIENT_UNARY_CALL_H
#define GRPCXX_IMPL_CLIENT_UNARY_CALL_H
#include <grpc++/config.h>
#include <grpc++/status.h>
#include <grpc++/impl/call.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status.h>
namespace grpc {
class ChannelInterface;
class Channel;
class ClientContext;
class CompletionQueue;
class RpcMethod;
// Wrapper that performs a blocking unary call
template <class InputMessage, class OutputMessage>
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
Status BlockingUnaryCall(Channel* channel, const RpcMethod& method,
ClientContext* context, const InputMessage& request,
OutputMessage* result) {
CompletionQueue cq;

@ -38,8 +38,8 @@
#include <grpc/grpc.h>
#include <grpc++/impl/serialization_traits.h>
#include <grpc++/config_protobuf.h>
#include <grpc++/status.h>
#include <grpc++/support/config_protobuf.h>
#include <grpc++/support/status.h>
namespace grpc {

@ -34,6 +34,10 @@
#ifndef GRPCXX_IMPL_RPC_METHOD_H
#define GRPCXX_IMPL_RPC_METHOD_H
#include <memory>
#include <grpc++/channel.h>
namespace grpc {
class RpcMethod {
@ -45,8 +49,14 @@ class RpcMethod {
BIDI_STREAMING
};
RpcMethod(const char* name, RpcType type, void* channel_tag)
: name_(name), method_type_(type), channel_tag_(channel_tag) {}
RpcMethod(const char* name, RpcType type)
: name_(name), method_type_(type), channel_tag_(NULL) {}
RpcMethod(const char* name, RpcType type,
const std::shared_ptr<Channel>& channel)
: name_(name),
method_type_(type),
channel_tag_(channel->RegisterMethod(name)) {}
const char* name() const { return name_; }
RpcType method_type() const { return method_type_; }

@ -39,10 +39,10 @@
#include <memory>
#include <vector>
#include <grpc++/config.h>
#include <grpc++/impl/rpc_method.h>
#include <grpc++/status.h>
#include <grpc++/stream.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status.h>
#include <grpc++/support/sync_stream.h>
namespace grpc {
class ServerContext;
@ -211,13 +211,19 @@ class BidiStreamingHandler : public MethodHandler {
// Handle unknown method by returning UNIMPLEMENTED error.
class UnknownMethodHandler : public MethodHandler {
public:
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
template <class T>
static void FillOps(ServerContext* context, T* ops) {
Status status(StatusCode::UNIMPLEMENTED, "");
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
if (!context->sent_initial_metadata_) {
ops->SendInitialMetadata(context->initial_metadata_);
context->sent_initial_metadata_ = true;
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
ops->ServerSendStatus(context->trailing_metadata_, status);
}
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
FillOps(param.server_context, &ops);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
@ -229,7 +235,7 @@ class RpcServiceMethod : public RpcMethod {
// Takes ownership of the handler
RpcServiceMethod(const char* name, RpcMethod::RpcType type,
MethodHandler* handler)
: RpcMethod(name, type, nullptr), handler_(handler) {}
: RpcMethod(name, type), handler_(handler) {}
MethodHandler* handler() { return handler_.get(); }

@ -34,10 +34,10 @@
#ifndef GRPCXX_IMPL_SERVICE_TYPE_H
#define GRPCXX_IMPL_SERVICE_TYPE_H
#include <grpc++/config.h>
#include <grpc++/impl/serialization_traits.h>
#include <grpc++/server.h>
#include <grpc++/status.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status.h>
namespace grpc {

@ -34,7 +34,7 @@
#ifndef GRPCXX_IMPL_SYNC_H
#define GRPCXX_IMPL_SYNC_H
#include <grpc++/config.h>
#include <grpc++/support/config.h>
#ifdef GRPC_CXX0X_NO_THREAD
#include <grpc++/impl/sync_no_cxx11.h>

@ -34,7 +34,7 @@
#ifndef GRPCXX_IMPL_THD_H
#define GRPCXX_IMPL_THD_H
#include <grpc++/config.h>
#include <grpc++/support/config.h>
#ifdef GRPC_CXX0X_NO_THREAD
#include <grpc++/impl/thd_no_cxx11.h>

@ -38,11 +38,11 @@
#include <memory>
#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/impl/sync.h>
#include <grpc++/status.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status.h>
struct grpc_server;
@ -98,7 +98,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
// Add a listening port. Can be called multiple times.
int AddListeningPort(const grpc::string& addr, ServerCredentials* creds);
// Start the server.
bool Start();
bool Start(ServerCompletionQueue** cqs, size_t num_cqs);
void HandleQueueClosed();
void RunRpc();
@ -112,7 +112,8 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
public:
BaseAsyncRequest(Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag);
CompletionQueue* call_cq, void* tag,
bool delete_on_finalize);
virtual ~BaseAsyncRequest();
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
@ -123,6 +124,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const call_cq_;
void* const tag_;
const bool delete_on_finalize_;
grpc_call* call_;
grpc_metadata_array initial_metadata_array_;
};
@ -184,12 +186,13 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
Message* const request_;
};
class GenericAsyncRequest GRPC_FINAL : public BaseAsyncRequest {
class GenericAsyncRequest : public BaseAsyncRequest {
public:
GenericAsyncRequest(Server* server, GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag);
ServerCompletionQueue* notification_cq, void* tag,
bool delete_on_finalize);
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
@ -197,6 +200,10 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
grpc_call_details call_details_;
};
class UnimplementedAsyncRequestContext;
class UnimplementedAsyncRequest;
class UnimplementedAsyncResponse;
template <class Message>
void RequestAsyncCall(void* registered_method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
@ -221,7 +228,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
ServerCompletionQueue* notification_cq,
void* tag) {
new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
tag);
tag, true);
}
const int max_message_size_;

@ -37,7 +37,7 @@
#include <memory>
#include <vector>
#include <grpc++/config.h>
#include <grpc++/support/config.h>
namespace grpc {
@ -96,13 +96,9 @@ class ServerBuilder {
std::shared_ptr<ServerCredentials> creds,
int* selected_port = nullptr);
// Set the thread pool used for running appliation rpc handlers.
// Does not take ownership.
void SetThreadPool(ThreadPoolInterface* thread_pool);
// Add a completion queue for handling asynchronous services
// Caller is required to keep this completion queue live until calling
// BuildAndStart()
// Caller is required to keep this completion queue live until
// the server is destroyed.
std::unique_ptr<ServerCompletionQueue> AddCompletionQueue();
// Return a running server which is ready for processing rpcs.

@ -39,9 +39,9 @@
#include <grpc/compression.h>
#include <grpc/support/time.h>
#include <grpc++/auth_context.h>
#include <grpc++/config.h>
#include <grpc++/time.h>
#include <grpc++/support/auth_context.h>
#include <grpc++/support/config.h>
#include <grpc++/support/time.h>
struct gpr_timespec;
struct grpc_metadata;

@ -37,7 +37,7 @@
#include <memory>
#include <vector>
#include <grpc++/config.h>
#include <grpc++/support/config.h>
struct grpc_server;

@ -31,362 +31,20 @@
*
*/
#ifndef GRPCXX_STREAM_H
#define GRPCXX_STREAM_H
#ifndef GRPCXX_SUPPORT_ASYNC_STREAM_H
#define GRPCXX_SUPPORT_ASYNC_STREAM_H
#include <grpc++/channel_interface.h>
#include <grpc/support/log.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/server_context.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/status.h>
#include <grpc/support/log.h>
#include <grpc++/server_context.h>
#include <grpc++/support/status.h>
namespace grpc {
// Common interface for all client side streaming.
class ClientStreamingInterface {
public:
virtual ~ClientStreamingInterface() {}
// Wait until the stream finishes, and return the final status. When the
// client side declares it has no more message to send, either implicitly or
// by calling WritesDone, it needs to make sure there is no more message to
// be received from the server, either implicitly or by getting a false from
// a Read().
// This function will return either:
// - when all incoming messages have been read and the server has returned
// status
// - OR when the server has returned a non-OK status
virtual Status Finish() = 0;
};
// An interface that yields a sequence of R messages.
template <class R>
class ReaderInterface {
public:
virtual ~ReaderInterface() {}
// Blocking read a message and parse to msg. Returns true on success.
// The method returns false when there will be no more incoming messages,
// either because the other side has called WritesDone or the stream has
// failed (or been cancelled).
virtual bool Read(R* msg) = 0;
};
// An interface that can be fed a sequence of W messages.
template <class W>
class WriterInterface {
public:
virtual ~WriterInterface() {}
// Blocking write msg to the stream. Returns true on success.
// Returns false when the stream has been closed.
virtual bool Write(const W& msg, const WriteOptions& options) = 0;
inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
};
template <class R>
class ClientReaderInterface : public ClientStreamingInterface,
public ReaderInterface<R> {
public:
virtual void WaitForInitialMetadata() = 0;
};
template <class R>
class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
template <class W>
ClientReader(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const W& request)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
GPR_ASSERT(ops.SendMessage(request).ok());
ops.ClientSendClose();
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
// can only be accessed after this call returns. Should only be called before
// the first read. Calling this method is optional, and if it is not called
// the metadata will be available in ClientContext after the first read.
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpSet<CallOpRecvInitialMetadata> ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
ops.RecvInitialMetadata(context_);
}
ops.RecvMessage(msg);
call_.PerformOps(&ops);
return cq_.Pluck(&ops) && ops.got_message;
}
Status Finish() GRPC_OVERRIDE {
CallOpSet<CallOpClientRecvStatus> ops;
Status status;
ops.ClientRecvStatus(context_, &status);
call_.PerformOps(&ops);
GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
private:
ClientContext* context_;
CompletionQueue cq_;
Call call_;
};
template <class W>
class ClientWriterInterface : public ClientStreamingInterface,
public WriterInterface<W> {
public:
virtual bool WritesDone() = 0;
};
template <class W>
class ClientWriter : public ClientWriterInterface<W> {
public:
// Blocking create a stream.
template <class R>
ClientWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, R* response)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
finish_ops_.RecvMessage(response);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
CallOpSet<CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
// Read the final response and wait for the final status.
Status Finish() GRPC_OVERRIDE {
Status status;
finish_ops_.ClientRecvStatus(context_, &status);
call_.PerformOps(&finish_ops_);
GPR_ASSERT(cq_.Pluck(&finish_ops_));
return status;
}
private:
ClientContext* context_;
CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
CompletionQueue cq_;
Call call_;
};
// Client-side interface for bi-directional streaming.
template <class W, class R>
class ClientReaderWriterInterface : public ClientStreamingInterface,
public WriterInterface<W>,
public ReaderInterface<R> {
public:
virtual void WaitForInitialMetadata() = 0;
virtual bool WritesDone() = 0;
};
template <class W, class R>
class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
public:
// Blocking create a stream.
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
// can only be accessed after this call returns. Should only be called before
// the first read. Calling this method is optional, and if it is not called
// the metadata will be available in ClientContext after the first read.
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpSet<CallOpRecvInitialMetadata> ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
ops.RecvInitialMetadata(context_);
}
ops.RecvMessage(msg);
call_.PerformOps(&ops);
return cq_.Pluck(&ops) && ops.got_message;
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) return false;
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
CallOpSet<CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
Status Finish() GRPC_OVERRIDE {
CallOpSet<CallOpClientRecvStatus> ops;
Status status;
ops.ClientRecvStatus(context_, &status);
call_.PerformOps(&ops);
GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
private:
ClientContext* context_;
CompletionQueue cq_;
Call call_;
};
template <class R>
class ServerReader GRPC_FINAL : public ReaderInterface<R> {
public:
ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
}
private:
Call* const call_;
ServerContext* const ctx_;
};
template <class W>
class ServerWriter GRPC_FINAL : public WriterInterface<W> {
public:
ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops);
}
private:
Call* const call_;
ServerContext* const ctx_;
};
// Server-side interface for bi-directional streaming.
template <class W, class R>
class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
public ReaderInterface<R> {
public:
ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops);
}
private:
Call* const call_;
ServerContext* const ctx_;
};
// Async interfaces
// Common interface for all client side streaming.
class ClientAsyncStreamingInterface {
@ -425,7 +83,7 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
public:
// Create a stream and write the first request out.
template <class W>
ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
ClientAsyncReader(Channel* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
const W& request, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
@ -484,7 +142,7 @@ template <class W>
class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
public:
template <class R>
ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
ClientAsyncWriter(Channel* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
R* response, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
@ -549,7 +207,7 @@ template <class W, class R>
class ClientAsyncReaderWriter GRPC_FINAL
: public ClientAsyncReaderWriterInterface<W, R> {
public:
ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq,
ClientAsyncReaderWriter(Channel* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
@ -761,6 +419,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
}
private:
friend class ::grpc::Server;
void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
Call call_;
@ -773,4 +433,4 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
} // namespace grpc
#endif // GRPCXX_STREAM_H
#endif // GRPCXX_SUPPORT_ASYNC_STREAM_H

@ -31,17 +31,17 @@
*
*/
#ifndef GRPCXX_ASYNC_UNARY_CALL_H
#define GRPCXX_ASYNC_UNARY_CALL_H
#ifndef GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H
#define GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H
#include <grpc++/channel_interface.h>
#include <grpc/support/log.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/server_context.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/status.h>
#include <grpc/support/log.h>
#include <grpc++/support/status.h>
namespace grpc {
@ -58,7 +58,7 @@ class ClientAsyncResponseReader GRPC_FINAL
: public ClientAsyncResponseReaderInterface<R> {
public:
template <class W>
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
ClientAsyncResponseReader(Channel* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
const W& request)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
@ -152,4 +152,4 @@ class ServerAsyncResponseWriter GRPC_FINAL
} // namespace grpc
#endif // GRPCXX_ASYNC_UNARY_CALL_H
#endif // GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H

@ -31,13 +31,13 @@
*
*/
#ifndef GRPCXX_AUTH_CONTEXT_H
#define GRPCXX_AUTH_CONTEXT_H
#ifndef GRPCXX_SUPPORT_AUTH_CONTEXT_H
#define GRPCXX_SUPPORT_AUTH_CONTEXT_H
#include <iterator>
#include <vector>
#include <grpc++/config.h>
#include <grpc++/support/config.h>
struct grpc_auth_context;
struct grpc_auth_property;
@ -92,4 +92,4 @@ class AuthContext {
} // namespace grpc
#endif // GRPCXX_AUTH_CONTEXT_H
#endif // GRPCXX_SUPPORT_AUTH_CONTEXT_H

@ -31,16 +31,16 @@
*
*/
#ifndef GRPCXX_BYTE_BUFFER_H
#define GRPCXX_BYTE_BUFFER_H
#ifndef GRPCXX_SUPPORT_BYTE_BUFFER_H
#define GRPCXX_SUPPORT_BYTE_BUFFER_H
#include <grpc/grpc.h>
#include <grpc/byte_buffer.h>
#include <grpc/support/log.h>
#include <grpc++/config.h>
#include <grpc++/slice.h>
#include <grpc++/status.h>
#include <grpc++/impl/serialization_traits.h>
#include <grpc++/support/config.h>
#include <grpc++/support/slice.h>
#include <grpc++/support/status.h>
#include <vector>
@ -101,4 +101,4 @@ class SerializationTraits<ByteBuffer, void> {
} // namespace grpc
#endif // GRPCXX_BYTE_BUFFER_H
#endif // GRPCXX_SUPPORT_BYTE_BUFFER_H

@ -31,15 +31,15 @@
*
*/
#ifndef GRPCXX_CHANNEL_ARGUMENTS_H
#define GRPCXX_CHANNEL_ARGUMENTS_H
#ifndef GRPCXX_SUPPORT_CHANNEL_ARGUMENTS_H
#define GRPCXX_SUPPORT_CHANNEL_ARGUMENTS_H
#include <vector>
#include <list>
#include <grpc++/config.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc++/support/config.h>
namespace grpc {
namespace testing {
@ -90,4 +90,4 @@ class ChannelArguments {
} // namespace grpc
#endif // GRPCXX_CHANNEL_ARGUMENTS_H
#endif // GRPCXX_SUPPORT_CHANNEL_ARGUMENTS_H

@ -31,8 +31,8 @@
*
*/
#ifndef GRPCXX_CONFIG_H
#define GRPCXX_CONFIG_H
#ifndef GRPCXX_SUPPORT_CONFIG_H
#define GRPCXX_SUPPORT_CONFIG_H
#if !defined(GRPC_NO_AUTODETECT_PLATFORM)
@ -113,4 +113,4 @@ typedef GRPC_CUSTOM_STRING string;
} // namespace grpc
#endif // GRPCXX_CONFIG_H
#endif // GRPCXX_SUPPORT_CONFIG_H

@ -31,8 +31,8 @@
*
*/
#ifndef GRPCXX_CONFIG_PROTOBUF_H
#define GRPCXX_CONFIG_PROTOBUF_H
#ifndef GRPCXX_SUPPORT_CONFIG_PROTOBUF_H
#define GRPCXX_SUPPORT_CONFIG_PROTOBUF_H
#ifndef GRPC_CUSTOM_PROTOBUF_INT64
#include <google/protobuf/stubs/common.h>
@ -69,4 +69,4 @@ typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream;
} // namespace protobuf
} // namespace grpc
#endif // GRPCXX_CONFIG_PROTOBUF_H
#endif // GRPCXX_SUPPORT_CONFIG_PROTOBUF_H

@ -31,11 +31,11 @@
*
*/
#ifndef GRPCXX_SLICE_H
#define GRPCXX_SLICE_H
#ifndef GRPCXX_SUPPORT_SLICE_H
#define GRPCXX_SUPPORT_SLICE_H
#include <grpc/support/slice.h>
#include <grpc++/config.h>
#include <grpc++/support/config.h>
namespace grpc {
@ -71,4 +71,4 @@ class Slice GRPC_FINAL {
} // namespace grpc
#endif // GRPCXX_SLICE_H
#endif // GRPCXX_SUPPORT_SLICE_H

@ -31,11 +31,11 @@
*
*/
#ifndef GRPCXX_STATUS_H
#define GRPCXX_STATUS_H
#ifndef GRPCXX_SUPPORT_STATUS_H
#define GRPCXX_SUPPORT_STATUS_H
#include <grpc++/status_code_enum.h>
#include <grpc++/config.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status_code_enum.h>
namespace grpc {
@ -61,4 +61,4 @@ class Status {
} // namespace grpc
#endif // GRPCXX_STATUS_H
#endif // GRPCXX_SUPPORT_STATUS_H

@ -31,8 +31,8 @@
*
*/
#ifndef GRPCXX_STATUS_CODE_ENUM_H
#define GRPCXX_STATUS_CODE_ENUM_H
#ifndef GRPCXX_SUPPORT_STATUS_CODE_ENUM_H
#define GRPCXX_SUPPORT_STATUS_CODE_ENUM_H
namespace grpc {
@ -156,4 +156,4 @@ enum StatusCode {
} // namespace grpc
#endif // GRPCXX_STATUS_CODE_ENUM_H
#endif // GRPCXX_SUPPORT_STATUS_CODE_ENUM_H

@ -31,8 +31,8 @@
*
*/
#ifndef GRPCXX_STUB_OPTIONS_H
#define GRPCXX_STUB_OPTIONS_H
#ifndef GRPCXX_SUPPORT_STUB_OPTIONS_H
#define GRPCXX_SUPPORT_STUB_OPTIONS_H
namespace grpc {
@ -40,4 +40,4 @@ class StubOptions {};
} // namespace grpc
#endif // GRPCXX_STUB_OPTIONS_H
#endif // GRPCXX_SUPPORT_STUB_OPTIONS_H

@ -0,0 +1,392 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPCXX_SUPPORT_SYNC_STREAM_H
#define GRPCXX_SUPPORT_SYNC_STREAM_H
#include <grpc/support/log.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/server_context.h>
#include <grpc++/support/status.h>
namespace grpc {
// Common interface for all client side streaming.
class ClientStreamingInterface {
public:
virtual ~ClientStreamingInterface() {}
// Wait until the stream finishes, and return the final status. When the
// client side declares it has no more message to send, either implicitly or
// by calling WritesDone, it needs to make sure there is no more message to
// be received from the server, either implicitly or by getting a false from
// a Read().
// This function will return either:
// - when all incoming messages have been read and the server has returned
// status
// - OR when the server has returned a non-OK status
virtual Status Finish() = 0;
};
// An interface that yields a sequence of R messages.
template <class R>
class ReaderInterface {
public:
virtual ~ReaderInterface() {}
// Blocking read a message and parse to msg. Returns true on success.
// The method returns false when there will be no more incoming messages,
// either because the other side has called WritesDone or the stream has
// failed (or been cancelled).
virtual bool Read(R* msg) = 0;
};
// An interface that can be fed a sequence of W messages.
template <class W>
class WriterInterface {
public:
virtual ~WriterInterface() {}
// Blocking write msg to the stream. Returns true on success.
// Returns false when the stream has been closed.
virtual bool Write(const W& msg, const WriteOptions& options) = 0;
inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
};
template <class R>
class ClientReaderInterface : public ClientStreamingInterface,
public ReaderInterface<R> {
public:
virtual void WaitForInitialMetadata() = 0;
};
template <class R>
class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
template <class W>
ClientReader(Channel* channel, const RpcMethod& method,
ClientContext* context, const W& request)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
GPR_ASSERT(ops.SendMessage(request).ok());
ops.ClientSendClose();
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
// can only be accessed after this call returns. Should only be called before
// the first read. Calling this method is optional, and if it is not called
// the metadata will be available in ClientContext after the first read.
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpSet<CallOpRecvInitialMetadata> ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
ops.RecvInitialMetadata(context_);
}
ops.RecvMessage(msg);
call_.PerformOps(&ops);
return cq_.Pluck(&ops) && ops.got_message;
}
Status Finish() GRPC_OVERRIDE {
CallOpSet<CallOpClientRecvStatus> ops;
Status status;
ops.ClientRecvStatus(context_, &status);
call_.PerformOps(&ops);
GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
private:
ClientContext* context_;
CompletionQueue cq_;
Call call_;
};
template <class W>
class ClientWriterInterface : public ClientStreamingInterface,
public WriterInterface<W> {
public:
virtual bool WritesDone() = 0;
};
template <class W>
class ClientWriter : public ClientWriterInterface<W> {
public:
// Blocking create a stream.
template <class R>
ClientWriter(Channel* channel, const RpcMethod& method,
ClientContext* context, R* response)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
finish_ops_.RecvMessage(response);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
CallOpSet<CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
// Read the final response and wait for the final status.
Status Finish() GRPC_OVERRIDE {
Status status;
finish_ops_.ClientRecvStatus(context_, &status);
call_.PerformOps(&finish_ops_);
GPR_ASSERT(cq_.Pluck(&finish_ops_));
return status;
}
private:
ClientContext* context_;
CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
CompletionQueue cq_;
Call call_;
};
// Client-side interface for bi-directional streaming.
template <class W, class R>
class ClientReaderWriterInterface : public ClientStreamingInterface,
public WriterInterface<W>,
public ReaderInterface<R> {
public:
virtual void WaitForInitialMetadata() = 0;
virtual bool WritesDone() = 0;
};
template <class W, class R>
class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
public:
// Blocking create a stream.
ClientReaderWriter(Channel* channel, const RpcMethod& method,
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
// can only be accessed after this call returns. Should only be called before
// the first read. Calling this method is optional, and if it is not called
// the metadata will be available in ClientContext after the first read.
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpSet<CallOpRecvInitialMetadata> ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
ops.RecvInitialMetadata(context_);
}
ops.RecvMessage(msg);
call_.PerformOps(&ops);
return cq_.Pluck(&ops) && ops.got_message;
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) return false;
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
CallOpSet<CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
Status Finish() GRPC_OVERRIDE {
CallOpSet<CallOpClientRecvStatus> ops;
Status status;
ops.ClientRecvStatus(context_, &status);
call_.PerformOps(&ops);
GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
private:
ClientContext* context_;
CompletionQueue cq_;
Call call_;
};
template <class R>
class ServerReader GRPC_FINAL : public ReaderInterface<R> {
public:
ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
}
private:
Call* const call_;
ServerContext* const ctx_;
};
template <class W>
class ServerWriter GRPC_FINAL : public WriterInterface<W> {
public:
ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops);
}
private:
Call* const call_;
ServerContext* const ctx_;
};
// Server-side interface for bi-directional streaming.
template <class W, class R>
class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
public ReaderInterface<R> {
public:
ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops);
}
private:
Call* const call_;
ServerContext* const ctx_;
};
} // namespace grpc
#endif // GRPCXX_SUPPORT_SYNC_STREAM_H

@ -31,10 +31,10 @@
*
*/
#ifndef GRPCXX_TIME_H
#define GRPCXX_TIME_H
#ifndef GRPCXX_SUPPORT_TIME_H
#define GRPCXX_SUPPORT_TIME_H
#include <grpc++/config.h>
#include <grpc++/support/config.h>
namespace grpc {
@ -107,4 +107,4 @@ class TimePoint<std::chrono::system_clock::time_point> {
#endif // !GRPC_CXX0X_NO_CHRONO
#endif // GRPCXX_TIME_H
#endif // GRPCXX_SUPPORT_TIME_H

@ -36,12 +36,15 @@
#include <stdlib.h>
#include <grpc/support/port_platform.h>
#ifdef __cplusplus
extern "C" {
#endif
/** To be used in channel arguments */
#define GRPC_COMPRESSION_ALGORITHM_ARG "grpc.compression_algorithm"
#define GRPC_COMPRESSION_ALGORITHM_STATE_ARG "grpc.compression_algorithm_state"
/* The various compression algorithms supported by GRPC */
typedef enum {
@ -60,6 +63,11 @@ typedef enum {
GRPC_COMPRESS_LEVEL_COUNT
} grpc_compression_level;
typedef struct grpc_compression_options {
gpr_uint32 enabled_algorithms_bitset; /**< All algs are enabled by default */
grpc_compression_algorithm default_compression_algorithm; /**< for channel */
} grpc_compression_options;
/** Parses the first \a name_length bytes of \a name as a
* grpc_compression_algorithm instance, updating \a algorithm. Returns 1 upon
* success, 0 otherwise. */
@ -67,9 +75,7 @@ int grpc_compression_algorithm_parse(const char *name, size_t name_length,
grpc_compression_algorithm *algorithm);
/** Updates \a name with the encoding name corresponding to a valid \a
* algorithm. Note that the string returned through \a name upon success is
* statically allocated and shouldn't be freed. Returns 1 upon success, 0
* otherwise. */
* algorithm. Returns 1 upon success, 0 otherwise. */
int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm,
char **name);
@ -85,6 +91,20 @@ grpc_compression_level grpc_compression_level_for_algorithm(
grpc_compression_algorithm grpc_compression_algorithm_for_level(
grpc_compression_level level);
void grpc_compression_options_init(grpc_compression_options *opts);
/** Mark \a algorithm as enabled in \a opts. */
void grpc_compression_options_enable_algorithm(
grpc_compression_options *opts, grpc_compression_algorithm algorithm);
/** Mark \a algorithm as disabled in \a opts. */
void grpc_compression_options_disable_algorithm(
grpc_compression_options *opts, grpc_compression_algorithm algorithm);
/** Returns true if \a algorithm is marked as enabled in \a opts. */
int grpc_compression_options_is_algorithm_enabled(
const grpc_compression_options *opts, grpc_compression_algorithm algorithm);
#ifdef __cplusplus
}
#endif

@ -589,9 +589,14 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *call,
THREAD SAFETY: grpc_call_destroy is thread-compatible */
void grpc_call_destroy(grpc_call *call);
/** Request notification of a new call. 'cq_for_notification' must
have been registered to the server via
grpc_server_register_completion_queue. */
/** Request notification of a new call.
Once a call is received, a notification tagged with \a tag_new is added to
\a cq_for_notification. \a call, \a details and \a request_metadata are
updated with the appropriate call information. \a cq_bound_to_call is bound
to \a call, and batch operation notifications for that call will be posted
to \a cq_bound_to_call.
Note that \a cq_for_notification must have been registered to the server via
\a grpc_server_register_completion_queue. */
grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call **call, grpc_call_details *details,
grpc_metadata_array *request_metadata,

@ -275,12 +275,18 @@ int grpc_auth_context_set_peer_identity_property_name(grpc_auth_context *ctx,
/* --- Auth Metadata Processing --- */
/* Callback function that is called when the metadata processing is done.
success is 1 if processing succeeded, 0 otherwise.
Consumed metadata will be removed from the set of metadata available on the
call. */
- Consumed metadata will be removed from the set of metadata available on the
call. consumed_md may be NULL if no metadata has been consumed.
- Response metadata will be set on the response. response_md may be NULL.
- status is GRPC_STATUS_OK for success or a specific status for an error.
Common error status for auth metadata processing is either
GRPC_STATUS_UNAUTHENTICATED in case of an authentication failure or
GRPC_STATUS PERMISSION_DENIED in case of an authorization failure.
- error_details gives details about the error. May be NULL. */
typedef void (*grpc_process_auth_metadata_done_cb)(
void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
int success);
const grpc_metadata *response_md, size_t num_response_md,
grpc_status_code status, const char *error_details);
/* Pluggable server-side metadata processor object. */
typedef struct {

@ -34,8 +34,8 @@
#ifndef SRC_COMPILER_CONFIG_H
#define SRC_COMPILER_CONFIG_H
#include <grpc++/config.h>
#include <grpc++/config_protobuf.h>
#include <grpc++/support/config.h>
#include <grpc++/support/config_protobuf.h>
#ifndef GRPC_CUSTOM_DESCRIPTOR
#include <google/protobuf/descriptor.h>

@ -112,18 +112,18 @@ grpc::string GetHeaderPrologue(const grpc::protobuf::FileDescriptor *file,
grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
const Parameters &params) {
grpc::string temp =
"#include <grpc++/impl/internal_stub.h>\n"
"#include <grpc++/support/async_stream.h>\n"
"#include <grpc++/impl/rpc_method.h>\n"
"#include <grpc++/impl/proto_utils.h>\n"
"#include <grpc++/impl/service_type.h>\n"
"#include <grpc++/async_unary_call.h>\n"
"#include <grpc++/status.h>\n"
"#include <grpc++/stream.h>\n"
"#include <grpc++/stub_options.h>\n"
"#include <grpc++/support/async_unary_call.h>\n"
"#include <grpc++/support/status.h>\n"
"#include <grpc++/support/stub_options.h>\n"
"#include <grpc++/support/sync_stream.h>\n"
"\n"
"namespace grpc {\n"
"class CompletionQueue;\n"
"class ChannelInterface;\n"
"class Channel;\n"
"class RpcService;\n"
"class ServerCompletionQueue;\n"
"class ServerContext;\n"
@ -554,17 +554,17 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
printer->Outdent();
printer->Print("};\n");
printer->Print(
"class Stub GRPC_FINAL : public StubInterface,"
" public ::grpc::InternalStub {\n public:\n");
"class Stub GRPC_FINAL : public StubInterface"
" {\n public:\n");
printer->Indent();
printer->Print(
"Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);\n");
printer->Print("Stub(const std::shared_ptr< ::grpc::Channel>& channel);\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethod(printer, service->method(i), vars, true);
}
printer->Outdent();
printer->Print("\n private:\n");
printer->Indent();
printer->Print("std::shared_ptr< ::grpc::Channel> channel_;\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethod(printer, service->method(i), vars, false);
}
@ -575,7 +575,7 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
printer->Print("};\n");
printer->Print(
"static std::unique_ptr<Stub> NewStub(const std::shared_ptr< "
"::grpc::ChannelInterface>& channel, "
"::grpc::Channel>& channel, "
"const ::grpc::StubOptions& options = ::grpc::StubOptions());\n");
printer->Print("\n");
@ -702,12 +702,13 @@ grpc::string GetSourceIncludes(const grpc::protobuf::FileDescriptor *file,
grpc::protobuf::io::Printer printer(&output_stream, '$');
std::map<grpc::string, grpc::string> vars;
printer.Print(vars, "#include <grpc++/async_unary_call.h>\n");
printer.Print(vars, "#include <grpc++/channel_interface.h>\n");
printer.Print(vars, "#include <grpc++/channel.h>\n");
printer.Print(vars, "#include <grpc++/impl/client_unary_call.h>\n");
printer.Print(vars, "#include <grpc++/impl/rpc_service_method.h>\n");
printer.Print(vars, "#include <grpc++/impl/service_type.h>\n");
printer.Print(vars, "#include <grpc++/stream.h>\n");
printer.Print(vars, "#include <grpc++/support/async_unary_call.h>\n");
printer.Print(vars, "#include <grpc++/support/async_stream.h>\n");
printer.Print(vars, "#include <grpc++/support/sync_stream.h>\n");
if (!file->package().empty()) {
std::vector<grpc::string> parts =
@ -738,7 +739,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) {\n");
printer->Print(*vars,
" return ::grpc::BlockingUnaryCall(channel(), "
" return ::grpc::BlockingUnaryCall(channel_.get(), "
"rpcmethod_$Method$_, "
"context, request, response);\n"
"}\n\n");
@ -751,7 +752,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
printer->Print(*vars,
" return new "
"::grpc::ClientAsyncResponseReader< $Response$>("
"channel(), cq, "
"channel_.get(), cq, "
"rpcmethod_$Method$_, "
"context, request);\n"
"}\n\n");
@ -762,7 +763,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, $Response$* response) {\n");
printer->Print(*vars,
" return new ::grpc::ClientWriter< $Request$>("
"channel(), "
"channel_.get(), "
"rpcmethod_$Method$_, "
"context, response);\n"
"}\n\n");
@ -773,7 +774,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" return new ::grpc::ClientAsyncWriter< $Request$>("
"channel(), cq, "
"channel_.get(), cq, "
"rpcmethod_$Method$_, "
"context, response, tag);\n"
"}\n\n");
@ -785,7 +786,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, const $Request$& request) {\n");
printer->Print(*vars,
" return new ::grpc::ClientReader< $Response$>("
"channel(), "
"channel_.get(), "
"rpcmethod_$Method$_, "
"context, request);\n"
"}\n\n");
@ -796,7 +797,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" return new ::grpc::ClientAsyncReader< $Response$>("
"channel(), cq, "
"channel_.get(), cq, "
"rpcmethod_$Method$_, "
"context, request, tag);\n"
"}\n\n");
@ -808,7 +809,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
printer->Print(*vars,
" return new ::grpc::ClientReaderWriter< "
"$Request$, $Response$>("
"channel(), "
"channel_.get(), "
"rpcmethod_$Method$_, "
"context);\n"
"}\n\n");
@ -820,7 +821,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
printer->Print(*vars,
" return new "
"::grpc::ClientAsyncReaderWriter< $Request$, $Response$>("
"channel(), cq, "
"channel_.get(), cq, "
"rpcmethod_$Method$_, "
"context, tag);\n"
"}\n\n");
@ -964,20 +965,19 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
}
printer->Print(*vars, "};\n\n");
printer->Print(
*vars,
"std::unique_ptr< $ns$$Service$::Stub> $ns$$Service$::NewStub("
"const std::shared_ptr< ::grpc::ChannelInterface>& channel, "
"const ::grpc::StubOptions& options) {\n"
" std::unique_ptr< $ns$$Service$::Stub> stub(new "
"$ns$$Service$::Stub(channel));\n"
" return stub;\n"
"}\n\n");
printer->Print(*vars,
"std::unique_ptr< $ns$$Service$::Stub> $ns$$Service$::NewStub("
"const std::shared_ptr< ::grpc::Channel>& channel, "
"const ::grpc::StubOptions& options) {\n"
" std::unique_ptr< $ns$$Service$::Stub> stub(new "
"$ns$$Service$::Stub(channel));\n"
" return stub;\n"
"}\n\n");
printer->Print(*vars,
"$ns$$Service$::Stub::Stub(const std::shared_ptr< "
"::grpc::ChannelInterface>& channel)\n");
"::grpc::Channel>& channel)\n");
printer->Indent();
printer->Print(": ::grpc::InternalStub(channel)");
printer->Print(": channel_(channel)");
for (int i = 0; i < service->method_count(); ++i) {
const grpc::protobuf::MethodDescriptor *method = service->method(i);
(*vars)["Method"] = method->name();
@ -991,13 +991,12 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
} else {
(*vars)["StreamingType"] = "BIDI_STREAMING";
}
printer->Print(
*vars,
", rpcmethod_$Method$_("
"$prefix$$Service$_method_names[$Idx$], "
"::grpc::RpcMethod::$StreamingType$, "
"channel->RegisterMethod($prefix$$Service$_method_names[$Idx$])"
")\n");
printer->Print(*vars,
", rpcmethod_$Method$_("
"$prefix$$Service$_method_names[$Idx$], "
"::grpc::RpcMethod::$StreamingType$, "
"channel"
")\n");
}
printer->Print("{}\n\n");
printer->Outdent();

@ -42,7 +42,7 @@
#include <tuple>
#include <vector>
#include <grpc++/config.h>
#include <grpc++/support/config.h>
#include "src/compiler/config.h"
#include "src/compiler/generator_helpers.h"
#include "src/compiler/python_generator.h"

@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include <string.h>
@ -146,3 +147,65 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm(
tmp.value.integer = algorithm;
return grpc_channel_args_copy_and_add(a, &tmp, 1);
}
/** Returns 1 if the argument for compression algorithm's enabled states bitset
* was found in \a a, returning the arg's value in \a states. Otherwise, returns
* 0. */
static int find_compression_algorithm_states_bitset(
const grpc_channel_args *a, int **states_arg) {
if (a != NULL) {
size_t i;
for (i = 0; i < a->num_args; ++i) {
if (a->args[i].type == GRPC_ARG_INTEGER &&
!strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) {
*states_arg = &a->args[i].value.integer;
return 1; /* GPR_TRUE */
}
}
}
return 0; /* GPR_FALSE */
}
grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
grpc_channel_args **a,
grpc_compression_algorithm algorithm,
int state) {
int *states_arg;
grpc_channel_args *result = *a;
const int states_arg_found =
find_compression_algorithm_states_bitset(*a, &states_arg);
if (states_arg_found) {
if (state != 0) {
GPR_BITSET(states_arg, algorithm);
} else {
GPR_BITCLEAR(states_arg, algorithm);
}
} else {
/* create a new arg */
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG;
/* all enabled by default */
tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
if (state != 0) {
GPR_BITSET(&tmp.value.integer, algorithm);
} else {
GPR_BITCLEAR(&tmp.value.integer, algorithm);
}
result = grpc_channel_args_copy_and_add(*a, &tmp, 1);
grpc_channel_args_destroy(*a);
*a = result;
}
return result;
}
int grpc_channel_args_compression_algorithm_get_states(
const grpc_channel_args *a) {
int *states_arg;
if (find_compression_algorithm_states_bitset(a, &states_arg)) {
return *states_arg;
} else {
return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */
}
}

@ -67,4 +67,24 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
grpc_channel_args *grpc_channel_args_set_compression_algorithm(
grpc_channel_args *a, grpc_compression_algorithm algorithm);
/** Sets the support for the given compression algorithm. By default, all
* compression algorithms are enabled. It's an error to disable an algorithm set
* by grpc_channel_args_set_compression_algorithm.
*
* Returns an instance will the updated algorithm states. The \a a pointer is
* modified to point to the returned instance (which may be different from the
* input value of \a a). */
grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
grpc_channel_args **a,
grpc_compression_algorithm algorithm,
int enabled);
/** Returns the bitset representing the support state (true for enabled, false
* for disabled) for compression algorithms.
*
* The i-th bit of the returned bitset corresponds to the i-th entry in the
* grpc_compression_algorithm enum. */
int grpc_channel_args_compression_algorithm_get_states(
const grpc_channel_args *a);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */

@ -74,10 +74,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will
not be released by grpc_pollset_work AFTER worker has been destroyed.
Returns true if some work has been done, and false if the deadline
expired. */
int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline);
Tries not to block past deadline. */
void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec now, gpr_timespec deadline);
/* Break one polling thread out of polling work for this pollset.
If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.

@ -181,7 +181,7 @@ static void multipoll_with_epoll_pollset_maybe_work(
pfds[1].events = POLLIN;
pfds[1].revents = 0;
poll_rv = poll(pfds, 2, timeout_ms);
poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
if (poll_rv < 0) {
if (errno != EINTR) {

@ -144,7 +144,7 @@ static void multipoll_with_poll_pollset_maybe_work(
POLLOUT, &watchers[i]);
}
r = poll(pfds, pfd_count, timeout);
r = grpc_poll_function(pfds, pfd_count, timeout);
for (i = 1; i < pfd_count; i++) {
grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN,

@ -38,7 +38,6 @@
#include "src/core/iomgr/pollset_posix.h"
#include <errno.h>
#include <poll.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
@ -57,6 +56,8 @@
GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);
grpc_poll_function_type grpc_poll_function = poll;
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next;
worker->next->prev = worker->prev;
@ -89,6 +90,7 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
}
void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
/* pollset->mu already held */
if (specific_worker != NULL) {
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
for (specific_worker = p->root_worker.next;
@ -168,14 +170,10 @@ static void finish_shutdown(grpc_pollset *pollset) {
pollset->shutdown_done_cb(pollset->shutdown_done_arg);
}
int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline) {
void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec now, gpr_timespec deadline) {
/* pollset->mu already held */
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
int added_worker = 0;
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
}
/* this must happen before we (potentially) drop pollset->mu */
worker->next = worker->prev = NULL;
/* TODO(ctiller): pool these */
@ -217,7 +215,6 @@ done:
gpr_mu_lock(&pollset->mu);
}
}
return 1;
}
void grpc_pollset_shutdown(grpc_pollset *pollset,
@ -456,7 +453,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
/* poll fd count (argument 2) is shortened by one if we have no events
to poll on - such that it only includes the kicker */
r = poll(pfd, nfds, timeout);
r = grpc_poll_function(pfd, nfds, timeout);
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
if (fd) {

@ -34,6 +34,8 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
#include <poll.h>
#include <grpc/support/sync.h>
#include "src/core/iomgr/wakeup_fd_posix.h"
@ -118,4 +120,8 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds,
* be locked) */
int grpc_pollset_has_workers(grpc_pollset *pollset);
/* override to allow tests to hook poll() usage */
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */

@ -99,14 +99,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
}
int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline) {
gpr_timespec now;
void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec now, gpr_timespec deadline) {
int added_worker = 0;
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(now, deadline) > 0) {
return 0 /* GPR_FALSE */;
}
worker->next = worker->prev = NULL;
gpr_cv_init(&worker->cv);
if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
@ -127,7 +122,6 @@ done:
if (added_worker) {
remove_worker(pollset, worker);
}
return 1 /* GPR_TRUE */;
}
void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {

@ -115,7 +115,7 @@ static int is_stack_running_on_compute_engine(void) {
gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
while (!detector.is_done) {
grpc_pollset_worker worker;
grpc_pollset_work(&detector.pollset, &worker,
grpc_pollset_work(&detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));

@ -575,6 +575,16 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
if (!check_request_metadata_creds(request_metadata_creds)) {
goto error;
}
if (config->pem_root_certs == NULL) {
pem_root_certs_size = grpc_get_default_ssl_roots(&pem_root_certs);
if (pem_root_certs == NULL || pem_root_certs_size == 0) {
gpr_log(GPR_ERROR, "Could not get default pem root certs.");
goto error;
}
} else {
pem_root_certs = config->pem_root_certs;
pem_root_certs_size = config->pem_root_certs_size;
}
c = gpr_malloc(sizeof(grpc_ssl_channel_security_connector));
memset(c, 0, sizeof(grpc_ssl_channel_security_connector));
@ -590,16 +600,6 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
if (overridden_target_name != NULL) {
c->overridden_target_name = gpr_strdup(overridden_target_name);
}
if (config->pem_root_certs == NULL) {
pem_root_certs_size = grpc_get_default_ssl_roots(&pem_root_certs);
if (pem_root_certs == NULL || pem_root_certs_size == 0) {
gpr_log(GPR_ERROR, "Could not get default pem root certs.");
goto error;
}
} else {
pem_root_certs = config->pem_root_certs;
pem_root_certs_size = config->pem_root_certs_size;
}
result = tsi_create_ssl_client_handshaker_factory(
config->pem_private_key, config->pem_private_key_size,
config->pem_cert_chain, config->pem_cert_chain_size, pem_root_certs,

@ -104,24 +104,34 @@ static grpc_mdelem *remove_consumed_md(void *user_data, grpc_mdelem *md) {
return md;
}
static void on_md_processing_done(void *user_data,
const grpc_metadata *consumed_md,
size_t num_consumed_md, int success) {
static void on_md_processing_done(
void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
const grpc_metadata *response_md, size_t num_response_md,
grpc_status_code status, const char *error_details) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (success) {
/* TODO(jboeuf): Implement support for response_md. */
if (response_md != NULL && num_response_md > 0) {
gpr_log(GPR_INFO,
"response_md in auth metadata processing not supported for now. "
"Ignoring...");
}
if (status == GRPC_STATUS_OK) {
calld->consumed_md = consumed_md;
calld->num_consumed_md = num_consumed_md;
grpc_metadata_batch_filter(&calld->md_op->data.metadata, remove_consumed_md,
elem);
calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
calld->on_done_recv->cb(calld->on_done_recv->cb_arg, 1);
} else {
gpr_slice message = gpr_slice_from_copied_string(
"Authentication metadata processing failed.");
gpr_slice message;
error_details = error_details != NULL
? error_details
: "Authentication metadata processing failed.";
message = gpr_slice_from_copied_string(error_details);
grpc_sopb_reset(calld->recv_ops);
grpc_transport_stream_op_add_close(&calld->transport_op,
GRPC_STATUS_UNAUTHENTICATED, &message);
grpc_transport_stream_op_add_close(&calld->transport_op, status, &message);
grpc_call_next_op(elem, &calld->transport_op);
}
}

@ -170,6 +170,9 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
grpc_pollset_worker worker;
int first_loop = 1;
gpr_timespec now;
GPR_ASSERT(!reserved);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@ -196,12 +199,15 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
first_loop = 0;
grpc_pollset_work(&cc->pollset, &worker, now, deadline);
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
@ -239,6 +245,9 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker worker;
gpr_timespec now;
int first_loop = 1;
GPR_ASSERT(!reserved);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@ -281,13 +290,16 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
del_plucker(cc, tag, &worker);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
first_loop = 0;
grpc_pollset_work(&cc->pollset, &worker, now, deadline);
del_plucker(cc, tag, &worker);
}
done:

@ -66,6 +66,8 @@ typedef struct {
size_t header_idx;
/* was the last frame emitted a header? (if yes, we'll need a CONTINUATION */
gpr_uint8 last_was_header;
/* have we seen a regular (non-colon-prefixed) header yet? */
gpr_uint8 seen_regular_header;
/* output stream id */
gpr_uint32 stream_id;
gpr_slice_buffer *output;
@ -361,6 +363,15 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c,
gpr_uint32 indices_key;
int should_add_elem;
GPR_ASSERT (GPR_SLICE_LENGTH(elem->key->slice) > 0);
if (GPR_SLICE_START_PTR(elem->key->slice)[0] != ':') { /* regular header */
st->seen_regular_header = 1;
} else if (st->seen_regular_header != 0) { /* reserved header */
gpr_log(GPR_ERROR,
"Reserved header (colon-prefixed) happening after regular ones.");
abort();
}
inc_filter(HASH_FRAGMENT_1(elem_hash), &c->filter_elems_sum, c->filter_elems);
/* is this elem currently in the decoders table? */
@ -566,6 +577,7 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
st.cur_frame_type = NONE;
st.last_was_header = 0;
st.seen_regular_header = 0;
st.stream_id = stream_id;
st.output = output;

@ -31,29 +31,26 @@
*
*/
#include "src/cpp/client/channel.h"
#include <grpc++/channel.h>
#include <memory>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice.h>
#include "src/core/profiling/timers.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
#include <grpc++/credentials.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/rpc_method.h>
#include <grpc++/status.h>
#include <grpc++/time.h>
#include <grpc++/support/channel_arguments.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status.h>
#include <grpc++/support/time.h>
#include "src/core/profiling/timers.h"
namespace grpc {
Channel::Channel(grpc_channel* channel) : c_channel_(channel) {}
Channel::Channel(const grpc::string& host, grpc_channel* channel)
: host_(host), c_channel_(channel) {}

@ -1,80 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_INTERNAL_CPP_CLIENT_CHANNEL_H
#define GRPC_INTERNAL_CPP_CLIENT_CHANNEL_H
#include <memory>
#include <grpc++/channel_interface.h>
#include <grpc++/config.h>
#include <grpc++/impl/grpc_library.h>
struct grpc_channel;
namespace grpc {
class Call;
class CallOpSetInterface;
class ChannelArguments;
class CompletionQueue;
class Credentials;
class StreamContextInterface;
class Channel GRPC_FINAL : public GrpcLibrary, public ChannelInterface {
public:
explicit Channel(grpc_channel* c_channel);
Channel(const grpc::string& host, grpc_channel* c_channel);
~Channel() GRPC_OVERRIDE;
void* RegisterMethod(const char* method) GRPC_OVERRIDE;
Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) GRPC_OVERRIDE;
void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
grpc_connectivity_state GetState(bool try_to_connect) GRPC_OVERRIDE;
private:
void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline, CompletionQueue* cq,
void* tag) GRPC_OVERRIDE;
bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) GRPC_OVERRIDE;
const grpc::string host_;
grpc_channel* const c_channel_; // owned
};
} // namespace grpc
#endif // GRPC_INTERNAL_CPP_CLIENT_CHANNEL_H

@ -31,10 +31,9 @@
*
*/
#include <grpc++/channel_arguments.h>
#include <grpc++/support/channel_arguments.h>
#include <grpc/support/log.h>
#include "src/core/channel/channel_args.h"
namespace grpc {

@ -38,7 +38,7 @@
#include <grpc/support/string_util.h>
#include <grpc++/credentials.h>
#include <grpc++/server_context.h>
#include <grpc++/time.h>
#include <grpc++/support/time.h>
#include "src/core/channel/compress_filter.h"
#include "src/cpp/common/create_auth_context.h"
@ -71,7 +71,7 @@ void ClientContext::AddMetadata(const grpc::string& meta_key,
}
void ClientContext::set_call(grpc_call* call,
const std::shared_ptr<ChannelInterface>& channel) {
const std::shared_ptr<Channel>& channel) {
GPR_ASSERT(call_ == nullptr);
call_ = call;
channel_ = channel;

@ -34,15 +34,16 @@
#include <memory>
#include <sstream>
#include "src/cpp/client/channel.h"
#include <grpc++/channel_interface.h>
#include <grpc++/channel_arguments.h>
#include <grpc++/channel.h>
#include <grpc++/create_channel.h>
#include <grpc++/support/channel_arguments.h>
#include "src/cpp/client/create_channel_internal.h"
namespace grpc {
class ChannelArguments;
std::shared_ptr<ChannelInterface> CreateChannel(
std::shared_ptr<Channel> CreateChannel(
const grpc::string& target, const std::shared_ptr<Credentials>& creds,
const ChannelArguments& args) {
ChannelArguments cp_args = args;
@ -50,10 +51,10 @@ std::shared_ptr<ChannelInterface> CreateChannel(
user_agent_prefix << "grpc-c++/" << grpc_version_string();
cp_args.SetString(GRPC_ARG_PRIMARY_USER_AGENT_STRING,
user_agent_prefix.str());
return creds ? creds->CreateChannel(target, cp_args)
: std::shared_ptr<ChannelInterface>(
new Channel(grpc_lame_client_channel_create(
NULL, GRPC_STATUS_INVALID_ARGUMENT,
"Invalid credentials.")));
return creds
? creds->CreateChannel(target, cp_args)
: CreateChannelInternal("", grpc_lame_client_channel_create(
NULL, GRPC_STATUS_INVALID_ARGUMENT,
"Invalid credentials."));
}
} // namespace grpc

@ -31,6 +31,16 @@
*
*/
#include <grpc++/impl/internal_stub.h>
#include <memory>
namespace grpc {} // namespace grpc
#include <grpc++/channel.h>
struct grpc_channel;
namespace grpc {
std::shared_ptr<Channel> CreateChannelInternal(const grpc::string& host,
grpc_channel* c_channel) {
return std::shared_ptr<Channel>(new Channel(host, c_channel));
}
} // namespace grpc

@ -31,27 +31,21 @@
*
*/
#ifndef GRPCXX_IMPL_INTERNAL_STUB_H
#define GRPCXX_IMPL_INTERNAL_STUB_H
#ifndef GRPC_INTERNAL_CPP_CLIENT_CREATE_CHANNEL_INTERNAL_H
#define GRPC_INTERNAL_CPP_CLIENT_CREATE_CHANNEL_INTERNAL_H
#include <memory>
#include <grpc++/channel_interface.h>
#include <grpc++/support/config.h>
namespace grpc {
class InternalStub {
public:
InternalStub(const std::shared_ptr<ChannelInterface>& channel)
: channel_(channel) {}
virtual ~InternalStub() {}
struct grpc_channel;
ChannelInterface* channel() { return channel_.get(); }
namespace grpc {
class Channel;
private:
const std::shared_ptr<ChannelInterface> channel_;
};
std::shared_ptr<Channel> CreateChannelInternal(const grpc::string& host,
grpc_channel* c_channel);
} // namespace grpc
#endif // GRPCXX_IMPL_INTERNAL_STUB_H
#endif // GRPC_INTERNAL_CPP_CLIENT_CREATE_CHANNEL_INTERNAL_H

@ -31,7 +31,7 @@
*
*/
#include <grpc++/generic_stub.h>
#include <grpc++/generic/generic_stub.h>
#include <grpc++/impl/rpc_method.h>
@ -44,8 +44,7 @@ std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call(
return std::unique_ptr<GenericClientAsyncReaderWriter>(
new GenericClientAsyncReaderWriter(
channel_.get(), cq,
RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING, nullptr),
context, tag));
RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag));
}
} // namespace grpc

@ -31,25 +31,27 @@
*
*/
#include <grpc++/credentials.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc++/channel_arguments.h>
#include <grpc++/config.h>
#include <grpc++/credentials.h>
#include "src/cpp/client/channel.h"
#include <grpc++/channel.h>
#include <grpc++/support/channel_arguments.h>
#include <grpc++/support/config.h>
#include "src/cpp/client/create_channel_internal.h"
namespace grpc {
namespace {
class InsecureCredentialsImpl GRPC_FINAL : public Credentials {
public:
std::shared_ptr<grpc::ChannelInterface> CreateChannel(
std::shared_ptr<grpc::Channel> CreateChannel(
const string& target, const grpc::ChannelArguments& args) GRPC_OVERRIDE {
grpc_channel_args channel_args;
args.SetChannelArgs(&channel_args);
return std::shared_ptr<ChannelInterface>(new Channel(
grpc_insecure_channel_create(target.c_str(), &channel_args, nullptr)));
return CreateChannelInternal(
"",
grpc_insecure_channel_create(target.c_str(), &channel_args, nullptr));
}
// InsecureCredentials should not be applied to a call.

@ -31,9 +31,9 @@
*
*/
#include <grpc++/channel_arguments.h>
#include <grpc/grpc_security.h>
#include <grpc++/support/channel_arguments.h>
#include <grpc/grpc_security.h>
#include "src/core/channel/channel_args.h"
namespace grpc {

@ -32,21 +32,21 @@
*/
#include <grpc/support/log.h>
#include <grpc++/channel_arguments.h>
#include <grpc++/channel.h>
#include <grpc++/impl/grpc_library.h>
#include "src/cpp/client/channel.h"
#include <grpc++/support/channel_arguments.h>
#include "src/cpp/client/create_channel_internal.h"
#include "src/cpp/client/secure_credentials.h"
namespace grpc {
std::shared_ptr<grpc::ChannelInterface> SecureCredentials::CreateChannel(
std::shared_ptr<grpc::Channel> SecureCredentials::CreateChannel(
const string& target, const grpc::ChannelArguments& args) {
grpc_channel_args channel_args;
args.SetChannelArgs(&channel_args);
return std::shared_ptr<ChannelInterface>(new Channel(
return CreateChannelInternal(
args.GetSslTargetNameOverride(),
grpc_secure_channel_create(c_creds_, target.c_str(), &channel_args)));
grpc_secure_channel_create(c_creds_, target.c_str(), &channel_args));
}
bool SecureCredentials::ApplyToCall(grpc_call* call) {

@ -36,7 +36,7 @@
#include <grpc/grpc_security.h>
#include <grpc++/config.h>
#include <grpc++/support/config.h>
#include <grpc++/credentials.h>
namespace grpc {
@ -48,7 +48,7 @@ class SecureCredentials GRPC_FINAL : public Credentials {
grpc_credentials* GetRawCreds() { return c_creds_; }
bool ApplyToCall(grpc_call* call) GRPC_OVERRIDE;
std::shared_ptr<grpc::ChannelInterface> CreateChannel(
std::shared_ptr<grpc::Channel> CreateChannel(
const string& target, const grpc::ChannelArguments& args) GRPC_OVERRIDE;
SecureCredentials* AsSecureCredentials() GRPC_OVERRIDE { return this; }

@ -31,7 +31,7 @@
*
*/
#include <grpc++/auth_context.h>
#include <grpc++/support/auth_context.h>
#include <grpc/grpc_security.h>

@ -34,10 +34,9 @@
#include <grpc++/impl/call.h>
#include <grpc/support/alloc.h>
#include <grpc++/byte_buffer.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/channel_interface.h>
#include <grpc++/support/byte_buffer.h>
#include "src/core/profiling/timers.h"
namespace grpc {

@ -36,7 +36,7 @@
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc++/time.h>
#include <grpc++/support/time.h>
namespace grpc {

@ -33,7 +33,7 @@
#include <memory>
#include <grpc/grpc.h>
#include <grpc++/auth_context.h>
#include <grpc++/support/auth_context.h>
namespace grpc {

@ -33,7 +33,7 @@
#include <memory>
#include <grpc/grpc.h>
#include <grpc++/auth_context.h>
#include <grpc++/support/auth_context.h>
namespace grpc {

@ -34,7 +34,7 @@
#ifndef GRPC_INTERNAL_CPP_COMMON_SECURE_AUTH_CONTEXT_H
#define GRPC_INTERNAL_CPP_COMMON_SECURE_AUTH_CONTEXT_H
#include <grpc++/auth_context.h>
#include <grpc++/support/auth_context.h>
struct grpc_auth_context;

@ -34,7 +34,7 @@
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc++/auth_context.h>
#include <grpc++/support/auth_context.h>
#include "src/cpp/common/secure_auth_context.h"
namespace grpc {

@ -32,7 +32,6 @@
*/
#include <grpc++/impl/proto_utils.h>
#include <grpc++/config.h>
#include <grpc/grpc.h>
#include <grpc/byte_buffer.h>
@ -40,6 +39,7 @@
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/support/port_platform.h>
#include <grpc++/support/config.h>
const int kMaxBufferLength = 8192;

@ -31,7 +31,7 @@
*
*/
#include <grpc++/async_generic_service.h>
#include <grpc++/generic/async_generic_service.h>
#include <grpc++/server.h>

@ -32,7 +32,8 @@
*/
#include <grpc/support/cpu.h>
#include <grpc++/dynamic_thread_pool.h>
#include "src/cpp/server/dynamic_thread_pool.h"
#ifndef GRPC_CUSTOM_DEFAULT_THREAD_POOL

@ -33,7 +33,8 @@
#include <grpc++/impl/sync.h>
#include <grpc++/impl/thd.h>
#include <grpc++/dynamic_thread_pool.h>
#include "src/cpp/server/dynamic_thread_pool.h"
namespace grpc {
DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool)

@ -31,18 +31,18 @@
*
*/
#ifndef GRPCXX_DYNAMIC_THREAD_POOL_H
#define GRPCXX_DYNAMIC_THREAD_POOL_H
#ifndef GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H
#define GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H
#include <grpc++/config.h>
#include <list>
#include <memory>
#include <queue>
#include <grpc++/impl/sync.h>
#include <grpc++/impl/thd.h>
#include <grpc++/thread_pool_interface.h>
#include <grpc++/support/config.h>
#include <list>
#include <memory>
#include <queue>
#include "src/cpp/server/thread_pool_interface.h"
namespace grpc {
@ -80,4 +80,4 @@ class DynamicThreadPool GRPC_FINAL : public ThreadPoolInterface {
} // namespace grpc
#endif // GRPCXX_DYNAMIC_THREAD_POOL_H
#endif // GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H

@ -33,7 +33,7 @@
#include <grpc++/impl/sync.h>
#include <grpc++/impl/thd.h>
#include <grpc++/fixed_size_thread_pool.h>
#include "src/cpp/server/fixed_size_thread_pool.h"
namespace grpc {

@ -31,17 +31,17 @@
*
*/
#ifndef GRPCXX_FIXED_SIZE_THREAD_POOL_H
#define GRPCXX_FIXED_SIZE_THREAD_POOL_H
#ifndef GRPC_INTERNAL_CPP_FIXED_SIZE_THREAD_POOL_H
#define GRPC_INTERNAL_CPP_FIXED_SIZE_THREAD_POOL_H
#include <grpc++/config.h>
#include <queue>
#include <vector>
#include <grpc++/impl/sync.h>
#include <grpc++/impl/thd.h>
#include <grpc++/thread_pool_interface.h>
#include <grpc++/support/config.h>
#include <queue>
#include <vector>
#include "src/cpp/server/thread_pool_interface.h"
namespace grpc {
@ -64,4 +64,4 @@ class FixedSizeThreadPool GRPC_FINAL : public ThreadPoolInterface {
} // namespace grpc
#endif // GRPCXX_FIXED_SIZE_THREAD_POOL_H
#endif // GRPC_INTERNAL_CPP_FIXED_SIZE_THREAD_POOL_H

@ -34,10 +34,10 @@
#ifndef GRPC_INTERNAL_CPP_SERVER_SECURE_SERVER_CREDENTIALS_H
#define GRPC_INTERNAL_CPP_SERVER_SECURE_SERVER_CREDENTIALS_H
#include <grpc/grpc_security.h>
#include <grpc++/server_credentials.h>
#include <grpc/grpc_security.h>
namespace grpc {
class SecureServerCredentials GRPC_FINAL : public ServerCredentials {

@ -32,24 +32,71 @@
*/
#include <grpc++/server.h>
#include <utility>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc++/completion_queue.h>
#include <grpc++/async_generic_service.h>
#include <grpc++/generic/async_generic_service.h>
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
#include <grpc++/thread_pool_interface.h>
#include <grpc++/time.h>
#include <grpc++/support/time.h>
#include "src/core/profiling/timers.h"
#include "src/cpp/server/thread_pool_interface.h"
namespace grpc {
class Server::UnimplementedAsyncRequestContext {
protected:
UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
GenericServerContext server_context_;
GenericServerAsyncReaderWriter generic_stream_;
};
class Server::UnimplementedAsyncRequest GRPC_FINAL
: public UnimplementedAsyncRequestContext,
public GenericAsyncRequest {
public:
UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
: GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
NULL, false),
server_(server),
cq_(cq) {}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
ServerContext* context() { return &server_context_; }
GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
private:
Server* const server_;
ServerCompletionQueue* const cq_;
};
typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
UnimplementedAsyncResponseOp;
class Server::UnimplementedAsyncResponse GRPC_FINAL
: public UnimplementedAsyncResponseOp {
public:
UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
~UnimplementedAsyncResponse() { delete request_; }
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status);
delete this;
return r;
}
private:
UnimplementedAsyncRequest* const request_;
};
class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
@ -297,18 +344,23 @@ int Server::AddListeningPort(const grpc::string& addr,
return creds->AddPortToServer(addr, server_);
}
bool Server::Start() {
bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
GPR_ASSERT(!started_);
started_ = true;
grpc_server_start(server_);
if (!has_generic_service_) {
unknown_method_.reset(new RpcServiceMethod(
"unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
// Use of emplace_back with just constructor arguments is not accepted here
// by gcc-4.4 because it can't match the anonymous nullptr with a proper
// constructor implicitly. Construct the object and use push_back.
sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
if (!sync_methods_->empty()) {
unknown_method_.reset(new RpcServiceMethod(
"unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
// Use of emplace_back with just constructor arguments is not accepted
// here by gcc-4.4 because it can't match the anonymous nullptr with a
// proper constructor implicitly. Construct the object and use push_back.
sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
}
for (size_t i = 0; i < num_cqs; i++) {
new UnimplementedAsyncRequest(this, cqs[i]);
}
}
// Start processing rpcs.
if (!sync_methods_->empty()) {
@ -370,12 +422,14 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
Server::BaseAsyncRequest::BaseAsyncRequest(
Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
bool delete_on_finalize)
: server_(server),
context_(context),
stream_(stream),
call_cq_(call_cq),
tag_(tag),
delete_on_finalize_(delete_on_finalize),
call_(nullptr) {
memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
}
@ -402,14 +456,16 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
// just the pointers inside call are copied here
stream_->BindCall(&call);
*tag = tag_;
delete this;
if (delete_on_finalize_) {
delete this;
}
return true;
}
Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
: BaseAsyncRequest(server, context, stream, call_cq, tag) {}
: BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
void Server::RegisteredAsyncRequest::IssueRequest(
void* registered_method, grpc_byte_buffer** payload,
@ -423,8 +479,9 @@ void Server::RegisteredAsyncRequest::IssueRequest(
Server::GenericAsyncRequest::GenericAsyncRequest(
Server* server, GenericServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
: BaseAsyncRequest(server, context, stream, call_cq, tag) {
ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
: BaseAsyncRequest(server, context, stream, call_cq, tag,
delete_on_finalize) {
grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
GPR_ASSERT(call_cq);
@ -445,6 +502,25 @@ bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
return BaseAsyncRequest::FinalizeResult(tag, status);
}
bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
bool* status) {
if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
new UnimplementedAsyncRequest(server_, cq_);
new UnimplementedAsyncResponse(this);
} else {
delete this;
}
return false;
}
Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
UnimplementedAsyncRequest* request)
: request_(request) {
Status status(StatusCode::UNIMPLEMENTED, "");
UnknownMethodHandler::FillOps(request_->context(), this);
request_->stream()->call_.PerformOps(this);
}
void Server::ScheduleCallback() {
{
grpc::unique_lock<grpc::mutex> lock(mu_);

@ -37,8 +37,8 @@
#include <grpc/support/log.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/server.h>
#include <grpc++/thread_pool_interface.h>
#include <grpc++/fixed_size_thread_pool.h>
#include "src/cpp/server/thread_pool_interface.h"
#include "src/cpp/server/fixed_size_thread_pool.h"
namespace grpc {
@ -89,10 +89,6 @@ void ServerBuilder::AddListeningPort(const grpc::string& addr,
ports_.push_back(port);
}
void ServerBuilder::SetThreadPool(ThreadPoolInterface* thread_pool) {
thread_pool_ = thread_pool;
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
bool thread_pool_owned = false;
if (!async_services_.empty() && !services_.empty()) {
@ -103,12 +99,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
thread_pool_ = CreateDefaultThreadPool();
thread_pool_owned = true;
}
// Async services only, create a thread pool to handle requests to unknown
// services.
if (!thread_pool_ && !generic_service_ && !async_services_.empty()) {
thread_pool_ = new FixedSizeThreadPool(1);
thread_pool_owned = true;
}
std::unique_ptr<Server> server(
new Server(thread_pool_, thread_pool_owned, max_message_size_));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
@ -138,7 +128,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
*port->selected_port = r;
}
}
if (!server->Start()) {
if (!server->Start(&cqs_[0], cqs_.size())) {
return nullptr;
}
return server;

@ -38,7 +38,7 @@
#include <grpc/support/log.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/sync.h>
#include <grpc++/time.h>
#include <grpc++/support/time.h>
#include "src/core/channel/compress_filter.h"
#include "src/cpp/common/create_auth_context.h"

@ -31,8 +31,8 @@
*
*/
#ifndef GRPCXX_THREAD_POOL_INTERFACE_H
#define GRPCXX_THREAD_POOL_INTERFACE_H
#ifndef GRPC_INTERNAL_CPP_THREAD_POOL_INTERFACE_H
#define GRPC_INTERNAL_CPP_THREAD_POOL_INTERFACE_H
#include <functional>
@ -51,4 +51,4 @@ ThreadPoolInterface* CreateDefaultThreadPool();
} // namespace grpc
#endif // GRPCXX_THREAD_POOL_INTERFACE_H
#endif // GRPC_INTERNAL_CPP_THREAD_POOL_INTERFACE_H

@ -32,7 +32,7 @@
*/
#include <grpc/byte_buffer_reader.h>
#include <grpc++/byte_buffer.h>
#include <grpc++/support/byte_buffer.h>
namespace grpc {

@ -31,7 +31,7 @@
*
*/
#include <grpc++/slice.h>
#include <grpc++/support/slice.h>
namespace grpc {

@ -31,7 +31,7 @@
*
*/
#include <grpc++/status.h>
#include <grpc++/support/status.h>
namespace grpc {

@ -31,12 +31,12 @@
*
*/
#include <grpc++/config.h>
#include <grpc++/support/config.h>
#ifndef GRPC_CXX0X_NO_CHRONO
#include <grpc/support/time.h>
#include <grpc++/time.h>
#include <grpc++/support/time.h>
using std::chrono::duration_cast;
using std::chrono::nanoseconds;

@ -5,4 +5,5 @@ test-results
packages
Grpc.v12.suo
TestResult.xml
/TestResults
*.nupkg

@ -65,6 +65,7 @@
</Compile>
<Compile Include="ClientBaseTest.cs" />
<Compile Include="ShutdownTest.cs" />
<Compile Include="Internal\AsyncCallTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="ClientServerTest.cs" />
<Compile Include="ServerTest.cs" />

@ -53,7 +53,7 @@ namespace Grpc.Core.Tests
{
var env1 = GrpcEnvironment.AddRef();
var env2 = GrpcEnvironment.AddRef();
Assert.IsTrue(object.ReferenceEquals(env1, env2));
Assert.AreSame(env1, env2);
GrpcEnvironment.Release();
GrpcEnvironment.Release();
}
@ -61,18 +61,21 @@ namespace Grpc.Core.Tests
[Test]
public void InitializeAfterShutdown()
{
Assert.AreEqual(0, GrpcEnvironment.GetRefCount());
var env1 = GrpcEnvironment.AddRef();
GrpcEnvironment.Release();
var env2 = GrpcEnvironment.AddRef();
GrpcEnvironment.Release();
Assert.IsFalse(object.ReferenceEquals(env1, env2));
Assert.AreNotSame(env1, env2);
}
[Test]
public void ReleaseWithoutAddRef()
{
Assert.AreEqual(0, GrpcEnvironment.GetRefCount());
Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release());
}

@ -0,0 +1,222 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
public class AsyncCallTest
{
Channel channel;
FakeNativeCall fakeCall;
AsyncCall<string, string> asyncCall;
[SetUp]
public void Init()
{
channel = new Channel("localhost", Credentials.Insecure);
fakeCall = new FakeNativeCall();
var callDetails = new CallInvocationDetails<string, string>(channel, "someMethod", null, Marshallers.StringMarshaller, Marshallers.StringMarshaller, new CallOptions());
asyncCall = new AsyncCall<string, string>(callDetails, fakeCall);
}
[TearDown]
public void Cleanup()
{
channel.ShutdownAsync().Wait();
}
[Test]
public void AsyncUnary_CompletionSuccess()
{
var resultTask = asyncCall.UnaryCallAsync("abc");
fakeCall.UnaryResponseClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), new byte[] { 1, 2, 3 }, new Metadata());
Assert.IsTrue(resultTask.IsCompleted);
Assert.IsTrue(fakeCall.IsDisposed);
Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
}
[Test]
public void AsyncUnary_CompletionFailure()
{
var resultTask = asyncCall.UnaryCallAsync("abc");
fakeCall.UnaryResponseClientHandler(false, new ClientSideStatus(new Status(StatusCode.Internal, ""), null), new byte[] { 1, 2, 3 }, new Metadata());
Assert.IsTrue(resultTask.IsCompleted);
Assert.IsTrue(fakeCall.IsDisposed);
Assert.AreEqual(StatusCode.Internal, asyncCall.GetStatus().StatusCode);
Assert.IsNull(asyncCall.GetTrailers());
var ex = Assert.Throws<RpcException>(() => resultTask.GetAwaiter().GetResult());
Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
}
internal class FakeNativeCall : INativeCall
{
public UnaryResponseClientHandler UnaryResponseClientHandler
{
get;
set;
}
public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler
{
get;
set;
}
public ReceivedMessageHandler ReceivedMessageHandler
{
get;
set;
}
public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
{
get;
set;
}
public SendCompletionHandler SendCompletionHandler
{
get;
set;
}
public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler
{
get;
set;
}
public bool IsCancelled
{
get;
set;
}
public bool IsDisposed
{
get;
set;
}
public void Cancel()
{
IsCancelled = true;
}
public void CancelWithStatus(Status status)
{
IsCancelled = true;
}
public string GetPeer()
{
return "PEER";
}
public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
UnaryResponseClientHandler = callback;
}
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
throw new NotImplementedException();
}
public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
{
UnaryResponseClientHandler = callback;
}
public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
ReceivedStatusOnClientHandler = callback;
}
public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
{
ReceivedStatusOnClientHandler = callback;
}
public void StartReceiveMessage(ReceivedMessageHandler callback)
{
ReceivedMessageHandler = callback;
}
public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
{
ReceivedResponseHeadersHandler = callback;
}
public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
{
SendCompletionHandler = callback;
}
public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
SendCompletionHandler = callback;
}
public void StartSendCloseFromClient(SendCompletionHandler callback)
{
SendCompletionHandler = callback;
}
public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
SendCompletionHandler = callback;
}
public void StartServerSide(ReceivedCloseOnServerHandler callback)
{
ReceivedCloseOnServerHandler = callback;
}
public void Dispose()
{
IsDisposed = true;
}
}
}
}

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save